trans_service.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/6/7
  3. # @Author : 魏志亮
  4. import traceback
  5. import pandas as pd
  6. from service.common_connect import trans
  7. from utils.log.trans_log import logger
  8. def create_tmp_table(table_name):
  9. create_sql = f"""
  10. CREATE TABLE `{table_name}` (
  11. `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
  12. `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '风机原始名称',
  13. `time_stamp` datetime NOT NULL COMMENT '时间戳',
  14. `active_power` double DEFAULT NULL COMMENT '有功功率',
  15. `rotor_speed` double DEFAULT NULL COMMENT '风轮转速',
  16. `generator_speed` double DEFAULT NULL COMMENT '发电机转速',
  17. `wind_velocity` double DEFAULT NULL COMMENT '风速',
  18. `pitch_angle_blade_1` double DEFAULT NULL COMMENT '桨距角1',
  19. `pitch_angle_blade_2` double DEFAULT NULL COMMENT '桨距角2',
  20. `pitch_angle_blade_3` double DEFAULT NULL COMMENT '桨距角3',
  21. `cabin_position` double DEFAULT NULL COMMENT '机舱位置',
  22. `true_wind_direction` double DEFAULT NULL COMMENT '绝对风向',
  23. `yaw_error1` double DEFAULT NULL COMMENT '对风角度',
  24. `set_value_of_active_power` double DEFAULT NULL COMMENT '有功功率设定值',
  25. `gearbox_oil_temperature` double DEFAULT NULL COMMENT '齿轮箱油温',
  26. `generatordrive_end_bearing_temperature` double DEFAULT NULL COMMENT '发电机驱动端轴承温度',
  27. `generatornon_drive_end_bearing_temperature` double DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
  28. `cabin_temperature` double DEFAULT NULL COMMENT '机舱内温度',
  29. `twisted_cable_angle` double DEFAULT NULL COMMENT '扭缆角度',
  30. `front_back_vibration_of_the_cabin` double DEFAULT NULL COMMENT '机舱前后振动',
  31. `side_to_side_vibration_of_the_cabin` double DEFAULT NULL COMMENT '机舱左右振动',
  32. `actual_torque` double DEFAULT NULL COMMENT '实际力矩',
  33. `given_torque` double DEFAULT NULL COMMENT '给定力矩',
  34. `clockwise_yaw_count` double DEFAULT NULL COMMENT '顺时针偏航次数',
  35. `counterclockwise_yaw_count` double DEFAULT NULL COMMENT '逆时针偏航次数',
  36. `unusable` double DEFAULT NULL COMMENT '不可利用',
  37. `power_curve_available` double DEFAULT NULL COMMENT '功率曲线可用',
  38. `required_gearbox_speed` double DEFAULT NULL COMMENT '齿轮箱转速',
  39. `inverter_speed_master_control` double DEFAULT NULL COMMENT '变频器转速(主控)',
  40. `outside_cabin_temperature` double DEFAULT NULL COMMENT '环境温度',
  41. `main_bearing_temperature` double DEFAULT NULL COMMENT '主轴承轴承温度',
  42. `gearbox_high_speed_shaft_bearing_temperature` double DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
  43. `gearboxmedium_speed_shaftbearing_temperature` double DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
  44. `gearbox_low_speed_shaft_bearing_temperature` double DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
  45. `generator_winding1_temperature` double DEFAULT NULL COMMENT '发电机绕组1温度',
  46. `generator_winding2_temperature` double DEFAULT NULL COMMENT '发电机绕组2温度',
  47. `generator_winding3_temperature` double DEFAULT NULL COMMENT '发电机绕组3温度',
  48. `wind_turbine_status` double DEFAULT NULL COMMENT '风机状态1',
  49. `wind_turbine_status2` double DEFAULT NULL COMMENT '风机状态2',
  50. `turbulence_intensity` double DEFAULT NULL COMMENT '湍流强度'
  51. )
  52. """
  53. trans.execute(create_sql)
  54. def boolean_table_exists(table_name):
  55. table_sql = f"""
  56. select count(1) as count from information_schema.tables where table_name = '{table_name}'
  57. """
  58. data = trans.execute(table_sql)[0]
  59. if int(data['count']) == 0:
  60. return False
  61. return True
  62. def add_partition(table_name, pname, date_str):
  63. try:
  64. sql = f"""
  65. ALTER TABLE {table_name} REORGANIZE PARTITION pmax INTO (
  66. PARTITION {pname} VALUES LESS THAN ('{date_str}'),
  67. PARTITION pmax VALUES LESS THAN (MAXVALUE)
  68. );
  69. """
  70. trans.execute(sql)
  71. logger.info(f"添加{table_name}分区{pname}成功")
  72. except:
  73. logger.error(traceback.format_exc())
  74. def delelet_partition(table_name, pmonth):
  75. pname = f'p{str(pmonth.year) + str(pmonth.month).zfill(2)}'
  76. exists_partition_sql = f"""
  77. SELECT count(1) from INFORMATION_SCHEMA.`PARTITIONS` t where t.TABLE_NAME = '{table_name}' and t.PARTITION_NAME = '{pname}'
  78. """
  79. data = trans.execute(exists_partition_sql)[0]
  80. if data > 0:
  81. del_sql = f"""
  82. ALTER TABLE {table_name} DROP PARTITION {pname}
  83. """
  84. trans.execute(del_sql)
  85. logger.info(f"删除{table_name}分区{pname}成功")
  86. else:
  87. logger.info(f"删除{table_name}分区{pname}不存在")
  88. def get_all_partitioned_tables() -> list:
  89. all_tables_sql = """
  90. SELECT t.TABLE_NAME FROM INFORMATION_SCHEMA.`TABLES` t where t.CREATE_OPTIONS = 'partitioned'
  91. """
  92. return trans.execute(all_tables_sql)
  93. def save_df_to_db(table_name: str, df: pd.DataFrame(), batch_count=1000):
  94. try:
  95. if 'index' in df.columns:
  96. del df['index']
  97. trans.execute_df_save(df, table_name, batch_count)
  98. except Exception as e:
  99. logger.error(traceback.format_exc())
  100. raise Exception(str(e))
  101. def load_data_local(table_name, df):
  102. columns_str = 'wind_turbine_number,wind_turbine_name,time_stamp,active_power,rotor_speed,generator_speed,wind_velocity,pitch_angle_blade_1,pitch_angle_blade_2,pitch_angle_blade_3,cabin_position,true_wind_direction,yaw_error1,set_value_of_active_power,gearbox_oil_temperature,generatordrive_end_bearing_temperature,generatornon_drive_end_bearing_temperature,cabin_temperature,twisted_cable_angle,front_back_vibration_of_the_cabin,side_to_side_vibration_of_the_cabin,actual_torque,given_torque,clockwise_yaw_count,counterclockwise_yaw_count,unusable,power_curve_available,required_gearbox_speed,inverter_speed_master_control,outside_cabin_temperature,main_bearing_temperature,gearbox_high_speed_shaft_bearing_temperature,gearboxmedium_speed_shaftbearing_temperature,gearbox_low_speed_shaft_bearing_temperature,generator_winding1_temperature,generator_winding2_temperature,generator_winding3_temperature,wind_turbine_status,wind_turbine_status2,turbulence_intensity,lab,year,month,day,year_month'
  103. cols = columns_str.split(',')
  104. print(cols)
  105. df = df[cols]
  106. # trans.execute_df_save(df, table_name, batch_count)
  107. trans.safe_load_data_local(df, table_name)
  108. def drop_table(table_name):
  109. drop_sql = f"DROP TABLE `{table_name}`"
  110. try:
  111. trans.execute(drop_sql)
  112. except:
  113. logger.error(traceback.format_exc())
  114. def get_yesterday_tables(yesterday):
  115. query_sql = f"""
  116. select * from wind_farm_day_count where add_date = '{yesterday}' and sync_status = 0
  117. """
  118. return trans.execute(query_sql)
  119. def update_sync(id):
  120. update_sql = f"update wind_farm_day_count set sync_status = 1 where id = {id}"
  121. trans.execute(update_sql)
  122. def update_wind_farm_day_count(wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time,
  123. sync_status=0):
  124. select_sql = f"SELECT * from wind_farm_day_count WHERE `wind_farm_code` = '{wind_farm_code}' and `add_date` = '{add_date}' and `type` = '{trans_type}' "
  125. result = trans.execute(select_sql)
  126. if result:
  127. id = result[0]['id']
  128. update_sql = f"update wind_farm_day_count set count = count + {count}, latest_data_time = '{latest_data_time}' where id = {id}"
  129. trans.execute(update_sql)
  130. else:
  131. insert_sql = f"""
  132. INSERT INTO `wind_farm_day_count` (
  133. `wind_farm_code`,
  134. `wind_farm_name`,
  135. `add_date`,
  136. `type`,
  137. `count`,
  138. `sync_status`,
  139. `del_status`,
  140. `latest_data_time`
  141. )
  142. VALUES
  143. (
  144. '{wind_farm_code}',
  145. '{wind_farm_name}',
  146. '{add_date}',
  147. '{trans_type}',
  148. '{count}',
  149. '{sync_status}',
  150. '0',
  151. '{latest_data_time}'
  152. )
  153. """
  154. trans.execute(insert_sql)
  155. def get_sys_conf_by_key(type, param_key, default_value=None):
  156. sql = f"SELECT * from sys_conf t where t.type ='{type}' and t.param_key = '{param_key}' and status = 1"
  157. datas = trans.execute(sql)
  158. if isinstance(datas, tuple):
  159. return default_value
  160. else:
  161. return datas[0]['param_value']
  162. def get_sys_conf(type) -> dict:
  163. sql = f"SELECT * from sys_conf t where t.type ='{type}' and status = 1"
  164. datas = trans.execute(sql)
  165. if isinstance(datas, tuple):
  166. return {}
  167. else:
  168. result_dict = dict()
  169. for data in datas:
  170. result_dict[data['param_key']] = data['param_value']
  171. return result_dict
  172. def read_data_from_table(table_name):
  173. df = pd.read_sql_table(con=trans.get_engine(), table_name=table_name)
  174. return df
  175. def update_warn_fault_exist_data(changzhan, seq_no, end_time):
  176. updata_tables = [f'{changzhan}_warn', f'{changzhan}_fault']
  177. for table_name in updata_tables:
  178. update_sql = f"""
  179. update {table_name} set end_time = '{end_time}', time_diff = TIMESTAMPDIFF(SECOND, begin_time,'{end_time}')
  180. where seq_no = {seq_no} and end_time is null
  181. """
  182. trans.execute(update_sql)
  183. logger.info(f"更新{changzhan}故障顺序号{seq_no}成功")
  184. def update_expired_data(table_type, exists_date):
  185. update_expired_sql = f"""
  186. update wind_farm_day_count set del_status = 1 where type = '{table_type}' and add_date < '{exists_date}'
  187. """
  188. trans.execute(update_expired_sql)
  189. logger.info(f"删除类型{table_type},截止日期{exists_date}成功")
  190. def exists_windno_seq_fault(changzhan):
  191. types = ['warn', 'fault']
  192. result_dict = dict(list())
  193. for type in types:
  194. result_dict[type] = list()
  195. query_sql = f"select * from {changzhan}_{type} where time_diff is null"
  196. fault_datas = trans.execute(query_sql)
  197. if isinstance(fault_datas, tuple):
  198. result_dict[type] = []
  199. else:
  200. for data in fault_datas:
  201. # t.wind_turbine_name,t.seq_no,t.fault_code
  202. result_dict[type].append(f"{data['wind_turbine_name']}_{data['seq_no']}_{data['fault_code']}")
  203. return result_dict
  204. def update_warn_fault_update_time(date_str, max_time):
  205. sql = f"update wind_farm_day_count set update_time = '{max_time}' where add_date = '{date_str}' and type in ('warn','fault')"
  206. trans.execute(sql)
  207. logger.info(f"更新{date_str}的故障报警到当前时间")
  208. if __name__ == '__main__':
  209. update_wind_farm_day_count('1', '1', '2025-04010', 100)