# -*- coding: utf-8 -*- # @Time : 2024/6/7 # @Author : 魏志亮 import traceback from os import * import pandas as pd from service.common_connect import trans from service.trans_conf_service import create_wave_table from utils.file.trans_methods import split_array from utils.log.trans_log import trans_print def get_min_sec_conf(field_code, trans_type) -> dict: query_sql = "SELECT * FROM trans_conf where wind_code = %s and type = %s and status = 1" res = trans.execute(query_sql, (field_code, trans_type)) if type(res) == tuple or type(res) == str: return None return res[0] def get_min_sec_conf_test(field_code, trans_type) -> dict: query_sql = "SELECT * FROM trans_conf where wind_name = %s and type = %s and status = 1" res = trans.execute(query_sql, (field_code, trans_type)) print(res) if type(res) == tuple or type(res) == str: return None return res[0] def get_fault_warn_conf(field_code, trans_type) -> dict: types = list() if trans_type == 'fault': types.append(1) elif trans_type == 'warn': types.append(2) else: trans_print(f"未找到{trans_type}告警/故障的配置") raise ValueError(f"未找到{trans_type}告警/故障的配置") types.append(3) query_sql = "SELECT * FROM warn_fault_conf where wind_code = %s and type in %s and status = 1" res = trans.execute(query_sql, (field_code, types)) print(res) if type(res) == tuple or type(res) == str: return None return res[0] def get_wave_conf(field_code) -> dict: query_sql = "SELECT * FROM wave_conf where wind_code = %s and status = 1" res = trans.execute(query_sql, (field_code)) print(res) if type(res) == tuple or type(res) == str: return None return res[0] def creat_min_sec_table(table_name, trans_type, use_tidb=False): exists_table_sql = f""" select count(1) as count from information_schema.tables where table_schema = '{trans.database}' and table_name = '{table_name}' """ count = trans.execute(exists_table_sql)[0]['count'] if count > 0: trans_print(f"{table_name}已存在") if trans_type == 'second': add_key = 'KEY `year_month` (`year_month`)' key = '`year_month`' else: add_key = 'KEY `year` (`year`)' key = '`year`' if count == 0: create_sql = f""" CREATE TABLE IF NOT EXISTS `{table_name}` ( `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号', `wind_turbine_name` VARCHAR(20) DEFAULT NULL COMMENT '风机原始名称', `time_stamp` datetime NOT NULL COMMENT '时间戳', `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率', `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速', `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速', `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速', `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1', `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2', `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3', `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置', `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向', `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度', `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值', `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温', `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度', `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度', `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度', `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度', `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动', `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动', `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩', `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩', `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数', `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数', `unusable` DOUBLE DEFAULT NULL COMMENT '不可利用', `power_curve_available` DOUBLE DEFAULT NULL COMMENT '功率曲线可用', `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速', `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)', `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度', `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度', `main_bearing_temperature_2` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度2', `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度', `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度', `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度', `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度', `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度', `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度', `wind_turbine_status` DOUBLE DEFAULT NULL COMMENT '风机状态1', `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2', `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度', `grid_a_phase_current` DOUBLE DEFAULT NULL COMMENT '电网A相电流', `grid_b_phase_current` DOUBLE DEFAULT NULL COMMENT '电网B相电流', `grid_c_phase_current` DOUBLE DEFAULT NULL COMMENT '电网C相电流', `reactive_power` DOUBLE DEFAULT NULL COMMENT '无功功率', `lab` int DEFAULT NULL COMMENT '-1:停机 0:好点 1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电', `year` INT (4) DEFAULT NULL COMMENT '年', `month` INT (2) DEFAULT NULL COMMENT '月', `day` INT (2) DEFAULT NULL COMMENT '日', `year_month` int(6) DEFAULT NULL COMMENT '年-月', `param1` DOUBLE DEFAULT NULL COMMENT '预留1', `param2` DOUBLE DEFAULT NULL COMMENT '预留2', `param3` DOUBLE DEFAULT NULL COMMENT '预留3', `param4` DOUBLE DEFAULT NULL COMMENT '预留4', `param5` DOUBLE DEFAULT NULL COMMENT '预留5', `param6` VARCHAR (20) DEFAULT NULL COMMENT '预留6', `param7` VARCHAR (20) DEFAULT NULL COMMENT '预留7', `param8` VARCHAR (20) DEFAULT NULL COMMENT '预留8', `param9` VARCHAR (20) DEFAULT NULL COMMENT '预留9', `param10` VARCHAR (20) DEFAULT NULL COMMENT '预留10', KEY `time_stamp` (`time_stamp`), KEY `wind_turbine_number` (`wind_turbine_number`), {add_key} ) """ # if not use_tidb: create_sql = create_sql + f""" PARTITION BY LIST COLUMNS ({key}, `wind_turbine_number`) ( PARTITION pDefault VALUES IN ((000000, 'wind_turbine_number')) ) """ trans.execute(create_sql) def add_partation(table_name: str, date_str: str, wind_turbine_number): p_name = f'p{date_str}_{wind_turbine_number}' add_sql = f""" alter table {table_name} add partition ( partition {p_name} VALUES IN (({date_str}, '{wind_turbine_number}')) ) """ trans.execute(add_sql) def remove_partation(table_name: str, date_str: str, wind_turbine_number): p_name = f'p{date_str}_{wind_turbine_number}' remove_sql = f""" alter table {table_name} DROP PARTITION {p_name} """ trans.execute(remove_sql) def add_or_remove_partation(table_name: str, date_str: str, wind_turbine_number): p_name = f'p{date_str}_{wind_turbine_number}' query_partation = f""" SELECT count(1) as count from information_schema.`PARTITIONS` t where t.TABLE_SCHEMA = '{trans.database}' and t.TABLE_NAME = '{table_name}' and t.PARTITION_NAME = '{p_name}' """ count = trans.execute(query_partation)[0]['count'] if count == 0: add_partation(table_name, date_str, wind_turbine_number) else: remove_partation(table_name, date_str, wind_turbine_number) add_partation(table_name, date_str, wind_turbine_number) def drop_exists_data(table_name, wind_turbine_number, min_date, max_date): # sql = f"# delete from {table_name} where wind_turbine_number = '{wind_turbine_number}' and time_stamp between '{min_date}' and '{max_date}'" sql = f""" BATCH ON `time_stamp`, `wind_turbine_number` LIMIT 1000 DELETE FROM `{table_name}` WHERE `rated_at` >= "{min_date}" AND `rated_at` <= "{max_date}" AND `wind_turbine_number` = "{wind_turbine_number}"; """ count = trans.execute(sql) trans_print(f"删除数据{count}条,{table_name},{wind_turbine_number},{min_date},{max_date}") def save_scada_file_to_db(table_name, file: str, wind_turbine_number, date_str, batch_count=100000, use_tidb=False): base_name = path.basename(file) df = pd.read_csv(file) # if use_tidb: # min_date = df['time_stamp'].min() # max_date = df['time_stamp'].max() # # drop_exists_data(table_name, wind_turbine_number, min_date, max_date) # else: # add_or_remove_partation(table_name, date_str, wind_turbine_number) add_or_remove_partation(table_name, date_str, wind_turbine_number) try: trans_print(f"保存{table_name},{base_name},{wind_turbine_number},数据:{df.shape[0]}") trans.execute_df_save(df, table_name, batch_count) trans_print(f"保存到{table_name},{base_name},{wind_turbine_number} 成功,总条数:{df.shape[0]}") except Exception as e: trans_print(traceback.format_exc()) message = base_name + str(e) raise Exception(message) def save_file_to_db(table_name: str, file: str, batch_count=100000): base_name = path.basename(file) try: df = pd.read_csv(file) trans_print(f"保存{table_name},总条数:{df.shape[0]}") trans.execute_df_save(df, table_name, batch_count) trans_print(f"保存到{table_name}成功,总条数:{df.shape[0]}") except Exception as e: trans_print(traceback.format_exc()) message = base_name + str(e) raise Exception(message) def save_df_to_db(table_name: str, df: pd.DataFrame(), batch_count=100000): try: trans_print(f"保存{table_name},总条数:{df.shape[0]}") trans.execute_df_save(df, table_name, batch_count) trans_print(f"保存到{table_name}成功,总条数:{df.shape[0]}") except Exception as e: trans_print(traceback.format_exc()) raise Exception(str(e)) def batch_statistics(table_name): query_sql = f"select count(1) as total_count ,min(t.time_stamp) as min_date ,max(t.time_stamp) as max_date from `{table_name}` t " try: res = trans.execute(query_sql) return res[0] except: trans_print(traceback.format_exc()) return None def create_warn_fault_table(table_name): sql = f""" CREATE TABLE `{table_name}` ( `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号', `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号', `begin_time` datetime DEFAULT NULL COMMENT '开始时间', `end_time` datetime DEFAULT NULL COMMENT '结束时间', `time_diff` int DEFAULT NULL COMMENT '处理耗时,单位秒', `fault_id` varchar(20) DEFAULT NULL COMMENT '报警或者故障ID', `fault_code` varchar(50) DEFAULT NULL COMMENT '报警或者故障CODE', `fault_detail` varchar(255) DEFAULT NULL COMMENT '错误描述', `fault_level` varchar(20) DEFAULT NULL COMMENT '报警等级', `fault_type` varchar(20) DEFAULT NULL COMMENT '报警类型', `stop_status` varchar(20) DEFAULT NULL COMMENT '刹车状态', KEY `wind_turbine_number` (`wind_turbine_number`), KEY `begin_time` (`begin_time`), KEY `end_time` (`end_time`) ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4 """ trans.execute(sql) def drop_table(table_name): drop_sql = f"DROP TABLE `{table_name}`" try: trans.execute(drop_sql) except: pass def get_or_create_wave_table(table_name): create_table = False query_sql = f"select 1 from `{table_name}` limit 1" try: trans.execute(query_sql) except: create_table = True if create_table: create_wave_table(table_name) def get_wave_data(table_name, min_data, max_data): query_sql = f""" select id,wind_turbine_number,wind_turbine_name,time_stamp,sampling_frequency,mesure_point_name from `{table_name}` where time_stamp >= '{min_data}' and time_stamp <= '{max_data}' """ return trans.read_sql_to_df(query_sql) def delete_exist_wave_data(table_name, ids): all_arrays = split_array(ids, 1000) for array in all_arrays: ids_str = ",".join(['%s'] * len(array)) delete_sql = f"delete from `{table_name}` where id in ({ids_str})" trans.execute(delete_sql, array) def get_trans_exec_code(id, query_type): query_sql = f"SELECT * from batch_exec_code t where t.id = '{id}' and type='{query_type}' and t.`status` = 1 limit 1" res = trans.execute(query_sql) if type(res) == tuple or type(res) == str: return None exec_code = res[0]['exec_code'] trans_print("任务ID", id, '类型', type, '获取到执行代码:', exec_code) return exec_code if __name__ == '__main__': delete_exist_wave_data('SKF001_wave', [1, 2, 3])