trans_service.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/6/7
  3. # @Author : 魏志亮
import traceback
from os import *  # NOTE: star import kept as-is; this module uses `path` from it
from typing import Optional

import pandas as pd

from service.common_connect import trans
from service.trans_conf_service import create_wave_table
from utils.file.trans_methods import split_array
from utils.log.trans_log import trans_print
  11. def get_min_sec_conf(field_code, trans_type) -> dict:
  12. query_sql = "SELECT * FROM trans_conf where wind_code = %s and type = %s and status = 1"
  13. res = trans.execute(query_sql, (field_code, trans_type))
  14. if type(res) == tuple or type(res) == str:
  15. return None
  16. return res[0]
  17. def get_min_sec_conf_test(field_code, trans_type) -> dict:
  18. query_sql = "SELECT * FROM trans_conf where wind_name = %s and type = %s and status = 1"
  19. res = trans.execute(query_sql, (field_code, trans_type))
  20. print(res)
  21. if type(res) == tuple or type(res) == str:
  22. return None
  23. return res[0]
  24. def get_fault_warn_conf(field_code, trans_type) -> dict:
  25. types = list()
  26. if trans_type == 'fault':
  27. types.append(1)
  28. elif trans_type == 'warn':
  29. types.append(2)
  30. else:
  31. trans_print(f"未找到{trans_type}告警/故障的配置")
  32. raise ValueError(f"未找到{trans_type}告警/故障的配置")
  33. types.append(3)
  34. query_sql = "SELECT * FROM warn_fault_conf where wind_code = %s and type in %s and status = 1"
  35. res = trans.execute(query_sql, (field_code, types))
  36. print(res)
  37. if type(res) == tuple or type(res) == str:
  38. return None
  39. return res[0]
  40. def get_wave_conf(field_code) -> dict:
  41. query_sql = "SELECT * FROM wave_conf where wind_code = %s and status = 1"
  42. res = trans.execute(query_sql, (field_code))
  43. print(res)
  44. if type(res) == tuple or type(res) == str:
  45. return None
  46. return res[0]
  47. def creat_min_sec_table(table_name, trans_type):
  48. exists_table_sql = f"""
  49. select count(1) as count from information_schema.tables where table_schema = '{trans.database}' and table_name = '{table_name}'
  50. """
  51. count = trans.execute(exists_table_sql)[0]['count']
  52. if count > 0:
  53. trans_print(f"{table_name}已存在")
  54. if trans_type == 'second':
  55. add_key = 'KEY `year_month` (`year_month`)'
  56. key = '`year_month`'
  57. else:
  58. add_key = 'KEY `year` (`year`)'
  59. key = '`year`'
  60. if count == 0:
  61. create_sql = f"""
  62. CREATE TABLE
  63. IF NOT EXISTS `{table_name}` (
  64. `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
  65. `wind_turbine_name` VARCHAR(20) DEFAULT NULL COMMENT '风机原始名称',
  66. `time_stamp` datetime NOT NULL COMMENT '时间戳',
  67. `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
  68. `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
  69. `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
  70. `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
  71. `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
  72. `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
  73. `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
  74. `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
  75. `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
  76. `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
  77. `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
  78. `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
  79. `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
  80. `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
  81. `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
  82. `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
  83. `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
  84. `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
  85. `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
  86. `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
  87. `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
  88. `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
  89. `unusable` DOUBLE DEFAULT NULL COMMENT '不可利用',
  90. `power_curve_available` DOUBLE DEFAULT NULL COMMENT '功率曲线可用',
  91. `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
  92. `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
  93. `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
  94. `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
  95. `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
  96. `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
  97. `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
  98. `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
  99. `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
  100. `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
  101. `wind_turbine_status` DOUBLE DEFAULT NULL COMMENT '风机状态1',
  102. `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
  103. `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
  104. `lab` int DEFAULT NULL COMMENT '-1:停机 0:好点 1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电',
  105. `year` INT (4) DEFAULT NULL COMMENT '年',
  106. `month` INT (2) DEFAULT NULL COMMENT '月',
  107. `day` INT (2) DEFAULT NULL COMMENT '日',
  108. `year_month` int(6) DEFAULT NULL COMMENT '年-月',
  109. `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
  110. `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
  111. `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
  112. `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
  113. `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
  114. `param6` VARCHAR (20) DEFAULT NULL COMMENT '预留6',
  115. `param7` VARCHAR (20) DEFAULT NULL COMMENT '预留7',
  116. `param8` VARCHAR (20) DEFAULT NULL COMMENT '预留8',
  117. `param9` VARCHAR (20) DEFAULT NULL COMMENT '预留9',
  118. `param10` VARCHAR (20) DEFAULT NULL COMMENT '预留10',
  119. KEY `time_stamp` (`time_stamp`),
  120. KEY `wind_turbine_number` (`wind_turbine_number`),
  121. {add_key}
  122. )
  123. PARTITION BY LIST COLUMNS ({key}, `wind_turbine_number`) (
  124. PARTITION pDefault VALUES IN ((000000, 'wind_turbine_number'))
  125. )
  126. """
  127. trans.execute(create_sql)
  128. def add_partation(table_name: str, date_str: str, wind_turbine_number):
  129. p_name = f'p{date_str}_{wind_turbine_number}'
  130. add_sql = f"""
  131. alter table {table_name} add partition (
  132. partition {p_name} VALUES IN (({date_str}, '{wind_turbine_number}'))
  133. )
  134. """
  135. trans.execute(add_sql)
  136. def remove_partation(table_name: str, date_str: str, wind_turbine_number):
  137. p_name = f'p{date_str}_{wind_turbine_number}'
  138. remove_sql = f"""
  139. alter table {table_name} DROP PARTITION {p_name}
  140. """
  141. trans.execute(remove_sql)
  142. def add_or_remove_partation(table_name: str, date_str: str, wind_turbine_number):
  143. p_name = f'p{date_str}_{wind_turbine_number}'
  144. query_partation = f"""
  145. SELECT count(1) as count from information_schema.`PARTITIONS` t
  146. where t.TABLE_SCHEMA = '{trans.database}'
  147. and t.TABLE_NAME = '{table_name}'
  148. and t.PARTITION_NAME = '{p_name}'
  149. """
  150. count = trans.execute(query_partation)[0]['count']
  151. if count == 0:
  152. add_partation(table_name, date_str, wind_turbine_number)
  153. else:
  154. remove_partation(table_name, date_str, wind_turbine_number)
  155. add_partation(table_name, date_str, wind_turbine_number)
  156. def save_partation_file_to_db(table_name: str, file: str, wind_turbine_number, date_str, batch_count=100000):
  157. base_name = path.basename(file)
  158. # wind_turbine_number = path.basename(file).split(".")[0]
  159. # date_str = path.basename(path.dirname(file))
  160. add_or_remove_partation(table_name, date_str, wind_turbine_number)
  161. try:
  162. for i, df in enumerate(pd.read_csv(file, chunksize=batch_count)):
  163. trans.execute_df_save(df, table_name)
  164. count = (i + 1) * batch_count
  165. trans_print(base_name, f"Chunk {count} written to MySQL.")
  166. except Exception as e:
  167. trans_print(traceback.format_exc())
  168. message = base_name + str(e)
  169. raise Exception(message)
  170. def save_file_to_db(table_name: str, file: str, batch_count=100000):
  171. base_name = path.basename(file)
  172. try:
  173. for i, df in enumerate(pd.read_csv(file, chunksize=batch_count)):
  174. # df.to_sql(table_name, engine, if_exists='append', index=False)
  175. trans.execute_df_save(df, table_name)
  176. count = (i + 1) * batch_count
  177. trans_print(base_name, f"Chunk {count} written to MySQL.")
  178. except Exception as e:
  179. trans_print(traceback.format_exc())
  180. message = base_name + str(e)
  181. raise Exception(message)
  182. def save_df_to_db(table_name: str, df: pd.DataFrame(), batch_count=100000):
  183. split_dfs = [df.iloc[i:i + batch_count] for i in range(0, len(df), batch_count)]
  184. try:
  185. for i, split_df in enumerate(split_dfs):
  186. trans.execute_df_save(split_df, table_name)
  187. count = (i + 1) * batch_count
  188. trans_print(f"Chunk {count} written to MySQL.")
  189. except Exception as e:
  190. trans_print(traceback.format_exc())
  191. raise Exception(str(e))
  192. def batch_statistics(table_name):
  193. query_sql = f"select count(1) as total_count ,min(t.time_stamp) as min_date ,max(t.time_stamp) as max_date from `{table_name}` t "
  194. try:
  195. res = trans.execute(query_sql)
  196. return res[0]
  197. except:
  198. trans_print(traceback.format_exc())
  199. return None
  200. def create_warn_fault_table(table_name):
  201. sql = f"""
  202. CREATE TABLE `{table_name}` (
  203. `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
  204. `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号',
  205. `begin_time` datetime DEFAULT NULL COMMENT '开始时间',
  206. `end_time` datetime DEFAULT NULL COMMENT '结束时间',
  207. `time_diff` int DEFAULT NULL COMMENT '处理耗时,单位秒',
  208. `fault_id` varchar(20) DEFAULT NULL COMMENT '报警或者故障ID',
  209. `fault_code` varchar(50) DEFAULT NULL COMMENT '报警或者故障CODE',
  210. `fault_detail` varchar(255) DEFAULT NULL COMMENT '错误描述',
  211. `fault_level` varchar(20) DEFAULT NULL COMMENT '报警等级',
  212. `fault_type` varchar(20) DEFAULT NULL COMMENT '报警类型',
  213. `stop_status` varchar(20) DEFAULT NULL COMMENT '刹车状态',
  214. KEY `wind_turbine_number` (`wind_turbine_number`),
  215. KEY `begin_time` (`begin_time`),
  216. KEY `end_time` (`end_time`)
  217. ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4
  218. """
  219. trans.execute(sql)
  220. def drop_table(table_name):
  221. drop_sql = f"DROP TABLE `{table_name}`"
  222. try:
  223. trans.execute(drop_sql)
  224. except:
  225. pass
  226. def get_or_create_wave_table(table_name):
  227. create_table = False
  228. query_sql = f"select 1 from `{table_name}` limit 1"
  229. try:
  230. trans.execute(query_sql)
  231. except:
  232. create_table = True
  233. if create_table:
  234. create_wave_table(table_name)
  235. def get_wave_data(table_name, min_data, max_data):
  236. query_sql = f"""
  237. select id,wind_turbine_number,wind_turbine_name,time_stamp,sampling_frequency,mesure_point_name from `{table_name}` where time_stamp >= '{min_data}' and time_stamp <= '{max_data}'
  238. """
  239. return trans.read_sql_to_df(query_sql)
  240. def delete_exist_wave_data(table_name, ids):
  241. all_arrays = split_array(ids, 1000)
  242. for array in all_arrays:
  243. ids_str = ",".join(['%s'] * len(array))
  244. delete_sql = f"delete from `{table_name}` where id in ({ids_str})"
  245. trans.execute(delete_sql, array)
  246. def get_trans_exec_code(id, query_type):
  247. query_sql = f"SELECT * from batch_exec_code t where t.id = '{id}' and type='{query_type}' and t.`status` = 1 limit 1"
  248. res = trans.execute(query_sql)
  249. if type(res) == tuple or type(res) == str:
  250. return None
  251. exec_code = res[0]['exec_code']
  252. trans_print("任务ID", id, '类型', type, '获取到执行代码:', exec_code)
  253. return exec_code
  254. if __name__ == '__main__':
  255. delete_exist_wave_data('SKF001_wave', [1, 2, 3])