123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- # -*- coding: utf-8 -*-
- # @Time : 2024/6/7
- # @Author : 魏志亮
- import traceback
- from os import *
- import pandas as pd
- from service.common_connect import trans
- from service.trans_conf_service import create_wave_table
- from utils.file.trans_methods import split_array
- from utils.log.trans_log import trans_print
def get_min_sec_conf(field_code, trans_type) -> dict:
    """Look up the active ``trans_conf`` row for a field code and type.

    Args:
        field_code: Value matched against the ``wind_code`` column.
        trans_type: Value matched against the ``type`` column.

    Returns:
        The first row of the query result, or ``None`` when
        ``trans.execute`` hands back a tuple or a string.
        NOTE(review): presumably a tuple/str result means "no rows" or an
        error message -- confirm against ``trans.execute``.
    """
    rows = trans.execute(
        "SELECT * FROM trans_conf where wind_code = %s and type = %s and status = 1",
        (field_code, trans_type),
    )
    # Exact type match (not isinstance): only plain tuple/str results are rejected.
    if type(rows) in (tuple, str):
        return None
    return rows[0]
def get_min_sec_conf_test(field_code, trans_type) -> dict:
    """Look up the active ``trans_conf`` row by ``wind_name`` and type.

    Same query shape as the ``wind_code`` lookup, but the first argument is
    matched against the ``wind_name`` column. The raw result is printed to
    stdout before it is inspected.

    Args:
        field_code: Value matched against the ``wind_name`` column.
        trans_type: Value matched against the ``type`` column.

    Returns:
        The first row of the query result, or ``None`` when
        ``trans.execute`` hands back a tuple or a string.
    """
    rows = trans.execute(
        "SELECT * FROM trans_conf where wind_name = %s and type = %s and status = 1",
        (field_code, trans_type),
    )
    print(rows)
    # Exact type match (not isinstance): only plain tuple/str results are rejected.
    if type(rows) in (tuple, str):
        return None
    return rows[0]
def get_fault_warn_conf(field_code, trans_type) -> dict:
    """Look up the active ``warn_fault_conf`` row for a field code.

    ``'fault'`` selects type 1 and ``'warn'`` selects type 2; type 3 is
    always included in the ``IN`` filter as well.

    Args:
        field_code: Value matched against the ``wind_code`` column.
        trans_type: Either ``'fault'`` or ``'warn'``.

    Returns:
        The first row of the query result, or ``None`` when
        ``trans.execute`` hands back a tuple or a string.

    Raises:
        ValueError: If ``trans_type`` is neither ``'fault'`` nor ``'warn'``.
    """
    if trans_type == 'fault':
        primary_type = 1
    elif trans_type == 'warn':
        primary_type = 2
    else:
        trans_print(f"未找到{trans_type}告警/故障的配置")
        raise ValueError(f"未找到{trans_type}告警/故障的配置")

    # Type 3 is queried together with the requested type.
    types = [primary_type, 3]

    rows = trans.execute(
        "SELECT * FROM warn_fault_conf where wind_code = %s and type in %s and status = 1",
        (field_code, types),
    )
    print(rows)
    if type(rows) in (tuple, str):
        return None
    return rows[0]
def get_wave_conf(field_code) -> dict:
    """Look up the active ``wave_conf`` row for a field code.

    Args:
        field_code: Value matched against the ``wind_code`` column.

    Returns:
        The first row of the query result, or ``None`` when
        ``trans.execute`` hands back a tuple or a string.
    """
    query_sql = "SELECT * FROM wave_conf where wind_code = %s and status = 1"
    # The parameters must be a real 1-tuple: ``(field_code)`` is just the bare
    # value, which some DB-API drivers would iterate character by character.
    res = trans.execute(query_sql, (field_code,))
    print(res)
    if type(res) in (tuple, str):
        return None
    return res[0]
def creat_min_sec_table(table_name, trans_type, use_tidb=False):
    """Create the minute/second SCADA data table if it does not exist yet.

    The table is LIST COLUMNS partitioned on a date column plus
    ``wind_turbine_number``. The date column is ``year_month`` when
    ``trans_type`` is ``'second'`` and ``year`` otherwise; the same column
    also gets a secondary index.

    Args:
        table_name: Name of the table to check for and create.
        trans_type: ``'second'`` selects ``year_month`` partitioning; any
            other value selects ``year`` partitioning.
        use_tidb: Currently unused; the partition clause is always appended.
    """
    exists_table_sql = f"""
    select count(1) as count from information_schema.tables where table_schema = '{trans.database}' and table_name = '{table_name}'
    """
    count = trans.execute(exists_table_sql)[0]['count']
    if count > 0:
        trans_print(f"{table_name}已存在")

    partition_column = 'year_month' if trans_type == 'second' else 'year'
    add_key = f'KEY `{partition_column}` (`{partition_column}`)'
    key = f'`{partition_column}`'

    if count == 0:
        table_sql = f"""
        CREATE TABLE
        IF NOT EXISTS `{table_name}` (
        `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
        `wind_turbine_name` VARCHAR(20) DEFAULT NULL COMMENT '风机原始名称',
        `time_stamp` datetime NOT NULL COMMENT '时间戳',
        `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
        `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
        `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
        `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
        `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
        `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
        `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
        `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
        `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
        `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
        `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
        `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
        `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
        `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
        `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
        `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
        `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
        `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
        `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
        `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
        `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
        `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
        `unusable` DOUBLE DEFAULT NULL COMMENT '不可利用',
        `power_curve_available` DOUBLE DEFAULT NULL COMMENT '功率曲线可用',
        `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
        `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
        `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
        `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
        `main_bearing_temperature_2` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度2',
        `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
        `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
        `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
        `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
        `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
        `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
        `wind_turbine_status` DOUBLE DEFAULT NULL COMMENT '风机状态1',
        `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
        `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
        `grid_a_phase_current` DOUBLE DEFAULT NULL COMMENT '电网A相电流',
        `grid_b_phase_current` DOUBLE DEFAULT NULL COMMENT '电网B相电流',
        `grid_c_phase_current` DOUBLE DEFAULT NULL COMMENT '电网C相电流',
        `reactive_power` DOUBLE DEFAULT NULL COMMENT '无功功率',
        `lab` int DEFAULT NULL COMMENT '-1:停机 0:好点 1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电',
        `year` INT (4) DEFAULT NULL COMMENT '年',
        `month` INT (2) DEFAULT NULL COMMENT '月',
        `day` INT (2) DEFAULT NULL COMMENT '日',
        `year_month` int(6) DEFAULT NULL COMMENT '年-月',
        `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
        `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
        `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
        `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
        `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
        `param6` VARCHAR (20) DEFAULT NULL COMMENT '预留6',
        `param7` VARCHAR (20) DEFAULT NULL COMMENT '预留7',
        `param8` VARCHAR (20) DEFAULT NULL COMMENT '预留8',
        `param9` VARCHAR (20) DEFAULT NULL COMMENT '预留9',
        `param10` VARCHAR (20) DEFAULT NULL COMMENT '预留10',
        KEY `time_stamp` (`time_stamp`),
        KEY `wind_turbine_number` (`wind_turbine_number`),
        {add_key}
        )
        """
        # The partition clause is appended unconditionally; a former
        # "if not use_tidb" switch around it is disabled.
        partition_sql = f"""
        PARTITION BY LIST COLUMNS ({key}, `wind_turbine_number`) (
        PARTITION pDefault VALUES IN ((000000, 'wind_turbine_number'))
        )
        """
        trans.execute(table_sql + partition_sql)
def add_partation(table_name: str, date_str: str, wind_turbine_number):
    """Add a LIST partition for one (date, turbine) pair.

    The partition is named ``p{date_str}_{wind_turbine_number}`` and holds
    the single value pair ``(date_str, wind_turbine_number)``.

    Args:
        table_name: Partitioned table to alter.
        date_str: Date value for the partition; inserted unquoted into the SQL.
        wind_turbine_number: Turbine number; inserted as a quoted string.
    """
    partition_name = f'p{date_str}_{wind_turbine_number}'
    trans.execute(f"""
    alter table {table_name} add partition (
    partition {partition_name} VALUES IN (({date_str}, '{wind_turbine_number}'))
    )
    """)
def remove_partation(table_name: str, date_str: str, wind_turbine_number):
    """Drop the partition named ``p{date_str}_{wind_turbine_number}``.

    Args:
        table_name: Partitioned table to alter.
        date_str: Date part of the partition name.
        wind_turbine_number: Turbine part of the partition name.
    """
    partition_name = f'p{date_str}_{wind_turbine_number}'
    trans.execute(f"""
    alter table {table_name} DROP PARTITION {partition_name}
    """)
def add_or_remove_partation(table_name: str, date_str: str, wind_turbine_number):
    """Ensure a fresh partition exists for one (date, turbine) pair.

    Looks the partition up in ``information_schema.PARTITIONS``. If it
    already exists it is dropped first; in either case the partition is
    then added.

    Args:
        table_name: Partitioned table to inspect and alter.
        date_str: Date part of the partition name and value.
        wind_turbine_number: Turbine part of the partition name and value.
    """
    partition_name = f'p{date_str}_{wind_turbine_number}'
    lookup_sql = f"""
    SELECT count(1) as count from information_schema.`PARTITIONS` t
    where t.TABLE_SCHEMA = '{trans.database}'
    and t.TABLE_NAME = '{table_name}'
    and t.PARTITION_NAME = '{partition_name}'
    """
    existing = trans.execute(lookup_sql)[0]['count']

    # An existing partition is dropped before being re-added.
    if existing != 0:
        remove_partation(table_name, date_str, wind_turbine_number)
    add_partation(table_name, date_str, wind_turbine_number)
def drop_exists_data(table_name, wind_turbine_number, min_date, max_date):
    """Delete one turbine's rows within a time range using a batched DELETE.

    Issues a ``BATCH ON ... LIMIT 1000 DELETE`` statement (TiDB
    non-transactional DML) and logs whatever ``trans.execute`` returns.

    Args:
        table_name: Table to delete from.
        wind_turbine_number: Only rows of this turbine are deleted.
        min_date: Inclusive lower bound on ``time_stamp``.
        max_date: Inclusive upper bound on ``time_stamp``.
    """
    # The range filter must use `time_stamp`: the tables this module creates
    # have no `rated_at` column. BATCH ON takes a single shard column, so only
    # the indexed `time_stamp` is named there.
    sql = f"""
    BATCH ON `time_stamp` LIMIT 1000
    DELETE FROM `{table_name}`
    WHERE `time_stamp` >= "{min_date}"
    AND `time_stamp` <= "{max_date}"
    AND `wind_turbine_number` = "{wind_turbine_number}";
    """
    count = trans.execute(sql)
    trans_print(f"删除数据{count}条,{table_name},{wind_turbine_number},{min_date},{max_date}")
def save_scada_file_to_db(table_name, file: str, wind_turbine_number, date_str, batch_count=100000, use_tidb=False):
    """Load a SCADA CSV file and write it into a freshly reset partition.

    The CSV is read first, then the (date, turbine) partition is dropped and
    re-created, and finally the rows are saved through
    ``trans.execute_df_save``.

    Args:
        table_name: Target table.
        file: Path of the CSV file to load.
        wind_turbine_number: Turbine number used for the partition.
        date_str: Date value used for the partition.
        batch_count: Batch size forwarded to ``trans.execute_df_save``.
        use_tidb: Currently unused; the TiDB-specific branch is disabled.

    Raises:
        Exception: If saving fails; the message is the file's base name
            followed by the original error text.
    """
    base_name = path.basename(file)
    frame = pd.read_csv(file)

    # Partition reset always runs; the alternative delete-by-range path for
    # TiDB is not wired in.
    add_or_remove_partation(table_name, date_str, wind_turbine_number)

    try:
        trans_print(f"保存{table_name},{base_name},{wind_turbine_number},数据:{frame.shape[0]}")
        trans.execute_df_save(frame, table_name, batch_count)
        trans_print(f"保存到{table_name},{base_name},{wind_turbine_number} 成功,总条数:{frame.shape[0]}")
    except Exception as e:
        trans_print(traceback.format_exc())
        raise Exception(base_name + str(e))
def save_file_to_db(table_name: str, file: str, batch_count=100000):
    """Load a CSV file and save its rows into ``table_name``.

    Args:
        table_name: Target table.
        file: Path of the CSV file to load.
        batch_count: Batch size forwarded to ``trans.execute_df_save``.

    Raises:
        Exception: If reading or saving fails; the message is the file's
            base name followed by the original error text.
    """
    base_name = path.basename(file)
    try:
        frame = pd.read_csv(file)
        trans_print(f"保存{table_name},总条数:{frame.shape[0]}")
        trans.execute_df_save(frame, table_name, batch_count)
        trans_print(f"保存到{table_name}成功,总条数:{frame.shape[0]}")
    except Exception as e:
        trans_print(traceback.format_exc())
        raise Exception(base_name + str(e))
def save_df_to_db(table_name: str, df: pd.DataFrame, batch_count=100000):
    """Save a DataFrame into ``table_name`` in batches.

    Args:
        table_name: Target table.
        df: Rows to save.
        batch_count: Batch size forwarded to ``trans.execute_df_save``.

    Raises:
        Exception: If saving fails; carries the original error text and is
            chained to the original exception.
    """
    try:
        trans_print(f"保存{table_name},总条数:{df.shape[0]}")
        trans.execute_df_save(df, table_name, batch_count)
        trans_print(f"保存到{table_name}成功,总条数:{df.shape[0]}")
    except Exception as e:
        trans_print(traceback.format_exc())
        # Chain explicitly so the root cause stays attached to the re-raise.
        raise Exception(str(e)) from e
def batch_statistics(table_name):
    """Return row count and time range statistics for a table.

    Args:
        table_name: Table to summarise.

    Returns:
        The first result row, with ``total_count``, ``min_date`` and
        ``max_date`` columns, or ``None`` if the query or the row access
        fails (the traceback is logged).
    """
    query_sql = f"select count(1) as total_count ,min(t.time_stamp) as min_date ,max(t.time_stamp) as max_date from `{table_name}` t "
    try:
        res = trans.execute(query_sql)
        return res[0]
    except Exception:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still propagate.
        trans_print(traceback.format_exc())
        return None
def create_warn_fault_table(table_name):
    """Create a warning/fault record table with the given name.

    The statement has no ``IF NOT EXISTS`` clause. The table uses the MyISAM
    engine with utf8mb4 and is indexed on turbine number, begin time and
    end time.

    Args:
        table_name: Name of the table to create.
    """
    create_sql = f"""
    CREATE TABLE `{table_name}` (
    `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
    `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号',
    `begin_time` datetime DEFAULT NULL COMMENT '开始时间',
    `end_time` datetime DEFAULT NULL COMMENT '结束时间',
    `time_diff` int DEFAULT NULL COMMENT '处理耗时,单位秒',
    `fault_id` varchar(20) DEFAULT NULL COMMENT '报警或者故障ID',
    `fault_code` varchar(50) DEFAULT NULL COMMENT '报警或者故障CODE',
    `fault_detail` varchar(255) DEFAULT NULL COMMENT '错误描述',
    `fault_level` varchar(20) DEFAULT NULL COMMENT '报警等级',
    `fault_type` varchar(20) DEFAULT NULL COMMENT '报警类型',
    `stop_status` varchar(20) DEFAULT NULL COMMENT '刹车状态',
    KEY `wind_turbine_number` (`wind_turbine_number`),
    KEY `begin_time` (`begin_time`),
    KEY `end_time` (`end_time`)
    ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4
    """
    trans.execute(create_sql)
def drop_table(table_name):
    """Drop a table on a best-effort basis.

    Failures (for example, the table not existing) do not propagate; the
    traceback is logged instead of being silently discarded.

    Args:
        table_name: Name of the table to drop.
    """
    drop_sql = f"DROP TABLE `{table_name}`"
    try:
        trans.execute(drop_sql)
    except Exception:
        # Still best-effort, but leave a trace so real failures are visible.
        trans_print(traceback.format_exc())
def get_or_create_wave_table(table_name):
    """Create the wave table if a probe query against it fails.

    Runs ``select 1 ... limit 1`` against the table; any exception from that
    probe is taken to mean the table is missing, and ``create_wave_table``
    is called.

    Args:
        table_name: Name of the wave table to probe and, if needed, create.
    """
    query_sql = f"select 1 from `{table_name}` limit 1"
    try:
        trans.execute(query_sql)
    except Exception:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit are not mistaken for a missing table.
        create_table = True
    else:
        create_table = False

    # Created outside the handler so a creation error is not reported as
    # having occurred while handling the probe failure.
    if create_table:
        create_wave_table(table_name)
def get_wave_data(table_name, min_data, max_data):
    """Read wave record metadata within an inclusive time range.

    Args:
        table_name: Wave table to query.
        min_data: Inclusive lower bound on ``time_stamp``.
        max_data: Inclusive upper bound on ``time_stamp``.

    Returns:
        Whatever ``trans.read_sql_to_df`` returns for the query, which
        selects id, turbine number/name, time stamp, sampling frequency and
        measure point name.
    """
    wave_sql = f"""
    select id,wind_turbine_number,wind_turbine_name,time_stamp,sampling_frequency,mesure_point_name from `{table_name}` where time_stamp >= '{min_data}' and time_stamp <= '{max_data}'
    """
    result = trans.read_sql_to_df(wave_sql)
    return result
def delete_exist_wave_data(table_name, ids):
    """Delete wave rows by id, in chunks of up to 1000 ids per statement.

    Args:
        table_name: Wave table to delete from.
        ids: Ids of the rows to delete; split with ``split_array``.
    """
    for chunk in split_array(ids, 1000):
        # One %s placeholder per id so the values are bound by the driver.
        placeholders = ",".join('%s' for _ in chunk)
        trans.execute(f"delete from `{table_name}` where id in ({placeholders})", chunk)
def get_trans_exec_code(id, query_type):
    """Fetch the active execution code for a task id and type.

    Args:
        id: Value matched against ``batch_exec_code.id``.
        query_type: Value matched against the ``type`` column.

    Returns:
        The ``exec_code`` value of the first matching row, or ``None`` when
        the result is a tuple, a string, or empty.
    """
    # Bound parameters, matching the other configuration lookups in this module.
    query_sql = "SELECT * from batch_exec_code t where t.id = %s and type = %s and t.`status` = 1 limit 1"
    res = trans.execute(query_sql, (id, query_type))
    if isinstance(res, (tuple, str)) or not res:
        return None
    exec_code = res[0]['exec_code']
    # Log the requested type, not the builtin ``type``.
    trans_print("任务ID", id, '类型', query_type, '获取到执行代码:', exec_code)
    return exec_code
if __name__ == '__main__':
    # Manual run: deletes the rows with ids 1, 2 and 3 from the SKF001_wave table.
    demo_table = 'SKF001_wave'
    demo_ids = [1, 2, 3]
    delete_exist_wave_data(demo_table, demo_ids)
|