123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- # -*- coding: utf-8 -*-
- # @Time : 2024/6/7
- # @Author : 魏志亮
- import traceback
- import pandas as pd
- from service.common_connect import trans
- from utils.log.trans_log import logger
- def create_tmp_table(table_name):
- create_sql = f"""
- CREATE TABLE `{table_name}` (
- `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
- `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '风机原始名称',
- `time_stamp` datetime NOT NULL COMMENT '时间戳',
- `active_power` double DEFAULT NULL COMMENT '有功功率',
- `rotor_speed` double DEFAULT NULL COMMENT '风轮转速',
- `generator_speed` double DEFAULT NULL COMMENT '发电机转速',
- `wind_velocity` double DEFAULT NULL COMMENT '风速',
- `pitch_angle_blade_1` double DEFAULT NULL COMMENT '桨距角1',
- `pitch_angle_blade_2` double DEFAULT NULL COMMENT '桨距角2',
- `pitch_angle_blade_3` double DEFAULT NULL COMMENT '桨距角3',
- `cabin_position` double DEFAULT NULL COMMENT '机舱位置',
- `true_wind_direction` double DEFAULT NULL COMMENT '绝对风向',
- `yaw_error1` double DEFAULT NULL COMMENT '对风角度',
- `set_value_of_active_power` double DEFAULT NULL COMMENT '有功功率设定值',
- `gearbox_oil_temperature` double DEFAULT NULL COMMENT '齿轮箱油温',
- `generatordrive_end_bearing_temperature` double DEFAULT NULL COMMENT '发电机驱动端轴承温度',
- `generatornon_drive_end_bearing_temperature` double DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
- `cabin_temperature` double DEFAULT NULL COMMENT '机舱内温度',
- `twisted_cable_angle` double DEFAULT NULL COMMENT '扭缆角度',
- `front_back_vibration_of_the_cabin` double DEFAULT NULL COMMENT '机舱前后振动',
- `side_to_side_vibration_of_the_cabin` double DEFAULT NULL COMMENT '机舱左右振动',
- `actual_torque` double DEFAULT NULL COMMENT '实际力矩',
- `given_torque` double DEFAULT NULL COMMENT '给定力矩',
- `clockwise_yaw_count` double DEFAULT NULL COMMENT '顺时针偏航次数',
- `counterclockwise_yaw_count` double DEFAULT NULL COMMENT '逆时针偏航次数',
- `unusable` double DEFAULT NULL COMMENT '不可利用',
- `power_curve_available` double DEFAULT NULL COMMENT '功率曲线可用',
- `required_gearbox_speed` double DEFAULT NULL COMMENT '齿轮箱转速',
- `inverter_speed_master_control` double DEFAULT NULL COMMENT '变频器转速(主控)',
- `outside_cabin_temperature` double DEFAULT NULL COMMENT '环境温度',
- `main_bearing_temperature` double DEFAULT NULL COMMENT '主轴承轴承温度',
- `gearbox_high_speed_shaft_bearing_temperature` double DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
- `gearboxmedium_speed_shaftbearing_temperature` double DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
- `gearbox_low_speed_shaft_bearing_temperature` double DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
- `generator_winding1_temperature` double DEFAULT NULL COMMENT '发电机绕组1温度',
- `generator_winding2_temperature` double DEFAULT NULL COMMENT '发电机绕组2温度',
- `generator_winding3_temperature` double DEFAULT NULL COMMENT '发电机绕组3温度',
- `wind_turbine_status` double DEFAULT NULL COMMENT '风机状态1',
- `wind_turbine_status2` double DEFAULT NULL COMMENT '风机状态2',
- `turbulence_intensity` double DEFAULT NULL COMMENT '湍流强度'
- )
- """
- trans.execute(create_sql)
- def boolean_table_exists(table_name):
- table_sql = f"""
- select count(1) as count from information_schema.tables where table_name = '{table_name}'
- """
- data = trans.execute(table_sql)[0]
- if int(data['count']) == 0:
- return False
- return True
- def add_partition(table_name, pname, date_str):
- try:
- sql = f"""
- ALTER TABLE {table_name} REORGANIZE PARTITION pmax INTO (
- PARTITION {pname} VALUES LESS THAN ('{date_str}'),
- PARTITION pmax VALUES LESS THAN (MAXVALUE)
- );
- """
- trans.execute(sql)
- logger.info(f"添加{table_name}分区{pname}成功")
- except:
- logger.error(traceback.format_exc())
- def delelet_partition(table_name, pmonth):
- pname = f'p{str(pmonth.year) + str(pmonth.month).zfill(2)}'
- exists_partition_sql = f"""
- SELECT count(1) from INFORMATION_SCHEMA.`PARTITIONS` t where t.TABLE_NAME = '{table_name}' and t.PARTITION_NAME = '{pname}'
- """
- data = trans.execute(exists_partition_sql)[0]
- if data > 0:
- del_sql = f"""
- ALTER TABLE {table_name} DROP PARTITION {pname}
- """
- trans.execute(del_sql)
- logger.info(f"删除{table_name}分区{pname}成功")
- else:
- logger.info(f"删除{table_name}分区{pname}不存在")
- def get_all_partitioned_tables() -> list:
- all_tables_sql = """
- SELECT t.TABLE_NAME FROM INFORMATION_SCHEMA.`TABLES` t where t.CREATE_OPTIONS = 'partitioned'
- """
- return trans.execute(all_tables_sql)
- def save_df_to_db(table_name: str, df: pd.DataFrame(), batch_count=1000):
- try:
- if 'index' in df.columns:
- del df['index']
- trans.execute_df_save(df, table_name, batch_count)
- except Exception as e:
- logger.error(traceback.format_exc())
- raise Exception(str(e))
- def load_data_local(table_name, df):
- columns_str = 'wind_turbine_number,wind_turbine_name,time_stamp,active_power,rotor_speed,generator_speed,wind_velocity,pitch_angle_blade_1,pitch_angle_blade_2,pitch_angle_blade_3,cabin_position,true_wind_direction,yaw_error1,set_value_of_active_power,gearbox_oil_temperature,generatordrive_end_bearing_temperature,generatornon_drive_end_bearing_temperature,cabin_temperature,twisted_cable_angle,front_back_vibration_of_the_cabin,side_to_side_vibration_of_the_cabin,actual_torque,given_torque,clockwise_yaw_count,counterclockwise_yaw_count,unusable,power_curve_available,required_gearbox_speed,inverter_speed_master_control,outside_cabin_temperature,main_bearing_temperature,gearbox_high_speed_shaft_bearing_temperature,gearboxmedium_speed_shaftbearing_temperature,gearbox_low_speed_shaft_bearing_temperature,generator_winding1_temperature,generator_winding2_temperature,generator_winding3_temperature,wind_turbine_status,wind_turbine_status2,turbulence_intensity,lab,year,month,day,year_month'
- cols = columns_str.split(',')
- print(cols)
- df = df[cols]
- # trans.execute_df_save(df, table_name, batch_count)
- trans.safe_load_data_local(df, table_name)
- def drop_table(table_name):
- drop_sql = f"DROP TABLE `{table_name}`"
- try:
- trans.execute(drop_sql)
- except:
- logger.error(traceback.format_exc())
- def get_yesterday_tables(yesterday):
- query_sql = f"""
- select * from wind_farm_day_count where add_date = '{yesterday}' and sync_status = 0
- """
- return trans.execute(query_sql)
- def update_sync(id):
- update_sql = f"update wind_farm_day_count set sync_status = 1 where id = {id}"
- trans.execute(update_sql)
- def update_wind_farm_day_count(wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time,
- sync_status=0):
- select_sql = f"SELECT * from wind_farm_day_count WHERE `wind_farm_code` = '{wind_farm_code}' and `add_date` = '{add_date}' and `type` = '{trans_type}' "
- result = trans.execute(select_sql)
- if result:
- id = result[0]['id']
- update_sql = f"update wind_farm_day_count set count = count + {count}, latest_data_time = '{latest_data_time}' where id = {id}"
- trans.execute(update_sql)
- else:
- insert_sql = f"""
- INSERT INTO `wind_farm_day_count` (
- `wind_farm_code`,
- `wind_farm_name`,
- `add_date`,
- `type`,
- `count`,
- `sync_status`,
- `del_status`,
- `latest_data_time`
- )
- VALUES
- (
- '{wind_farm_code}',
- '{wind_farm_name}',
- '{add_date}',
- '{trans_type}',
- '{count}',
- '{sync_status}',
- '0',
- '{latest_data_time}'
- )
- """
- trans.execute(insert_sql)
- def get_sys_conf_by_key(type, param_key, default_value=None):
- sql = f"SELECT * from sys_conf t where t.type ='{type}' and t.param_key = '{param_key}' and status = 1"
- datas = trans.execute(sql)
- if isinstance(datas, tuple):
- return default_value
- else:
- return datas[0]['param_value']
- def get_sys_conf(type) -> dict:
- sql = f"SELECT * from sys_conf t where t.type ='{type}' and status = 1"
- datas = trans.execute(sql)
- if isinstance(datas, tuple):
- return {}
- else:
- result_dict = dict()
- for data in datas:
- result_dict[data['param_key']] = data['param_value']
- return result_dict
- def read_data_from_table(table_name):
- df = pd.read_sql_table(con=trans.get_engine(), table_name=table_name)
- return df
- def update_warn_fault_exist_data(changzhan, seq_no, end_time):
- updata_tables = [f'{changzhan}_warn', f'{changzhan}_fault']
- for table_name in updata_tables:
- update_sql = f"""
- update {table_name} set end_time = '{end_time}', time_diff = TIMESTAMPDIFF(SECOND, begin_time,'{end_time}')
- where seq_no = {seq_no} and end_time is null
- """
- trans.execute(update_sql)
- logger.info(f"更新{changzhan}故障顺序号{seq_no}成功")
- def update_expired_data(table_type, exists_date):
- update_expired_sql = f"""
- update wind_farm_day_count set del_status = 1 where type = '{table_type}' and add_date < '{exists_date}'
- """
- trans.execute(update_expired_sql)
- logger.info(f"删除类型{table_type},截止日期{exists_date}成功")
- def exists_windno_seq_fault(changzhan):
- types = ['warn', 'fault']
- result_dict = dict(list())
- for type in types:
- result_dict[type] = list()
- query_sql = f"select * from {changzhan}_{type} where time_diff is null"
- fault_datas = trans.execute(query_sql)
- if isinstance(fault_datas, tuple):
- result_dict[type] = []
- else:
- for data in fault_datas:
- # t.wind_turbine_name,t.seq_no,t.fault_code
- result_dict[type].append(f"{data['wind_turbine_name']}_{data['seq_no']}_{data['fault_code']}")
- return result_dict
- def update_warn_fault_update_time(date_str, max_time):
- sql = f"update wind_farm_day_count set update_time = '{max_time}' where add_date = '{date_str}' and type in ('warn','fault')"
- trans.execute(sql)
- logger.info(f"更新{date_str}的故障报警到当前时间")
- if __name__ == '__main__':
- update_wind_farm_day_count('1', '1', '2025-04010', 100)
|