trans_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/6/7
  3. # @Author : 魏志亮
  4. import os
  5. import traceback
  6. import pandas as pd
  7. from utils.db.ConnectMysql import ConnectMysql
  8. from utils.file.trans_methods import split_array
  9. from utils.log.trans_log import trans_print
  10. trans = ConnectMysql("trans")
  11. def get_min_sec_conf(field_code, trans_type) -> dict:
  12. query_sql = "SELECT * FROM trans_conf where wind_code = %s and type = %s and status = 1"
  13. res = trans.execute(query_sql, (field_code, trans_type))
  14. print(res)
  15. if type(res) == tuple:
  16. return None
  17. return res[0]
  18. def get_min_sec_conf_test(field_code, trans_type) -> dict:
  19. query_sql = "SELECT * FROM trans_conf where wind_name = %s and type = %s and status = 1"
  20. res = trans.execute(query_sql, (field_code, trans_type))
  21. print(res)
  22. if type(res) == tuple:
  23. return None
  24. return res[0]
  25. def get_fault_warn_conf(field_code, trans_type) -> dict:
  26. types = list()
  27. if trans_type == 'fault':
  28. types.append(1)
  29. elif trans_type == 'warn':
  30. types.append(2)
  31. else:
  32. trans_print(f"未找到{trans_type}告警/故障的配置")
  33. raise ValueError(f"未找到{trans_type}告警/故障的配置")
  34. types.append(3)
  35. query_sql = "SELECT * FROM warn_fault_conf where wind_code = %s and type in %s and status = 1"
  36. res = trans.execute(query_sql, (field_code, types))
  37. print(res)
  38. if type(res) == tuple:
  39. return None
  40. return res[0]
  41. def get_wave_conf(field_code) -> dict:
  42. query_sql = "SELECT * FROM wave_conf where wind_code = %s and status = 1"
  43. res = trans.execute(query_sql, (field_code))
  44. print(res)
  45. if type(res) == tuple:
  46. return None
  47. return res[0]
  48. def creat_min_sec_table(table_name, win_names, read_type):
  49. create_sql = f"""
  50. CREATE TABLE
  51. IF NOT EXISTS `{table_name}` (
  52. `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
  53. `wind_turbine_name` VARCHAR(20) DEFAULT NULL COMMENT '风机原始名称',
  54. `time_stamp` datetime NOT NULL COMMENT '时间戳',
  55. `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
  56. `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
  57. `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
  58. `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
  59. `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
  60. `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
  61. `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
  62. `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
  63. `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
  64. `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
  65. `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
  66. `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
  67. `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
  68. `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
  69. `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
  70. `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
  71. `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
  72. `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
  73. `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
  74. `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
  75. `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
  76. `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
  77. `unusable` DOUBLE DEFAULT NULL COMMENT '不可利用',
  78. `power_curve_available` DOUBLE DEFAULT NULL COMMENT '功率曲线可用',
  79. `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
  80. `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
  81. `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
  82. `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
  83. `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
  84. `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
  85. `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
  86. `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
  87. `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
  88. `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
  89. `wind_turbine_status` DOUBLE DEFAULT NULL COMMENT '风机状态1',
  90. `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
  91. `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
  92. `lab` int DEFAULT NULL COMMENT '-1:停机 0:好点 1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电',
  93. `year` INT (4) DEFAULT NULL COMMENT '年',
  94. `month` INT (2) DEFAULT NULL COMMENT '月',
  95. `day` INT (2) DEFAULT NULL COMMENT '日',
  96. `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
  97. `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
  98. `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
  99. `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
  100. `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
  101. `param6` VARCHAR (20) DEFAULT NULL COMMENT '预留6',
  102. `param7` VARCHAR (20) DEFAULT NULL COMMENT '预留7',
  103. `param8` VARCHAR (20) DEFAULT NULL COMMENT '预留8',
  104. `param9` VARCHAR (20) DEFAULT NULL COMMENT '预留9',
  105. `param10` VARCHAR (20) DEFAULT NULL COMMENT '预留10',
  106. KEY `time_stamp` (`time_stamp`),
  107. KEY `wind_turbine_number` (`wind_turbine_number`)
  108. ) ENGINE = myisam DEFAULT CHARSET = utf8mb4
  109. """
  110. if read_type == 'second' and win_names and len(win_names) > 1:
  111. create_sql = create_sql + f" PARTITION BY LIST COLUMNS(`wind_turbine_number`) ("
  112. partition_strs = list()
  113. for wind_name in win_names:
  114. partition_strs.append(f" PARTITION p{wind_name} VALUES IN('{wind_name}')")
  115. create_sql = create_sql + ",".join(partition_strs) + ")"
  116. trans.execute(create_sql)
  117. def rename_table(table_name, renamed_table_name, save_db=True):
  118. if save_db:
  119. rename_sql = f"RENAME TABLE {table_name} TO {renamed_table_name}"
  120. try:
  121. trans.execute(rename_sql)
  122. except:
  123. trans_print(traceback.format_exc())
  124. def drop_table(table_name, save_db=True):
  125. if save_db:
  126. rename_sql = f"drop TABLE `{table_name}`"
  127. try:
  128. trans.execute(rename_sql)
  129. except:
  130. trans_print(traceback.format_exc())
  131. def clear_table(table_name, save_db=True):
  132. if save_db:
  133. rename_sql = f"truncate TABLE `{table_name}`"
  134. try:
  135. trans.execute(rename_sql)
  136. except:
  137. trans_print(traceback.format_exc())
  138. def save_file_to_db(table_name: str, file: str, batch_count=100000):
  139. base_name = os.path.basename(file)
  140. try:
  141. for i, df in enumerate(pd.read_csv(file, chunksize=batch_count)):
  142. # df.to_sql(table_name, engine, if_exists='append', index=False)
  143. trans.execute_df_save(df, table_name)
  144. count = (i + 1) * batch_count
  145. trans_print(base_name, f"Chunk {count} written to MySQL.")
  146. except Exception as e:
  147. trans_print(traceback.format_exc())
  148. message = base_name + str(e)
  149. raise Exception(message)
  150. def save_df_to_db(table_name: str, df: pd.DataFrame(), batch_count=100000):
  151. split_dfs = [df.iloc[i:i + batch_count] for i in range(0, len(df), batch_count)]
  152. try:
  153. for i, split_df in enumerate(split_dfs):
  154. trans.execute_df_save(split_df, table_name)
  155. count = (i + 1) * batch_count
  156. trans_print(f"Chunk {count} written to MySQL.")
  157. except Exception as e:
  158. trans_print(traceback.format_exc())
  159. raise Exception(str(e))
  160. def batch_statistics(table_name):
  161. query_sql = f"select count(1) as total_count ,min(t.time_stamp) as min_date ,max(t.time_stamp) as max_date from `{table_name}` t "
  162. try:
  163. res = trans.execute(query_sql)
  164. return res[0]
  165. except:
  166. trans_print(traceback.format_exc())
  167. return None
  168. def create_warn_fault_table(table_name):
  169. sql = f"""
  170. CREATE TABLE `{table_name}` (
  171. `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
  172. `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号',
  173. `begin_time` datetime DEFAULT NULL COMMENT '开始时间',
  174. `end_time` datetime DEFAULT NULL COMMENT '结束时间',
  175. `time_diff` int DEFAULT NULL COMMENT '处理耗时,单位秒',
  176. `fault_id` varchar(20) DEFAULT NULL COMMENT '报警或者故障ID',
  177. `fault_code` varchar(50) DEFAULT NULL COMMENT '报警或者故障CODE',
  178. `fault_detail` varchar(255) DEFAULT NULL COMMENT '错误描述',
  179. `fault_level` varchar(20) DEFAULT NULL COMMENT '报警等级',
  180. `fault_type` varchar(20) DEFAULT NULL COMMENT '报警类型',
  181. `stop_status` varchar(20) DEFAULT NULL COMMENT '刹车状态',
  182. KEY `wind_turbine_number` (`wind_turbine_number`),
  183. KEY `begin_time` (`begin_time`),
  184. KEY `end_time` (`end_time`)
  185. ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4
  186. """
  187. trans.execute(sql)
  188. def get_or_create_wave_table(table_name):
  189. create_table = False
  190. query_sql = f"select 1 from `{table_name}` limit 1"
  191. try:
  192. trans.execute(query_sql)
  193. except:
  194. create_table = True
  195. if create_table:
  196. sql = f"""
  197. CREATE TABLE `{table_name}` (
  198. `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  199. `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
  200. `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号',
  201. `time_stamp` datetime DEFAULT NULL COMMENT '时间',
  202. `sampling_frequency` varchar(50) DEFAULT NULL COMMENT '分析频率',
  203. `mesure_point_name` varchar(100) DEFAULT NULL COMMENT '测点名称',
  204. `mesure_data` mediumtext COMMENT '测点数据',
  205. PRIMARY KEY (`id`),
  206. KEY `wind_turbine_number` (`wind_turbine_number`),
  207. KEY `time_stamp` (`time_stamp`),
  208. KEY `mesure_point_name` (`mesure_point_name`)
  209. ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4
  210. """
  211. trans.execute(sql)
  212. def get_wave_data(table_name, min_data, max_data):
  213. query_sql = f"""
  214. select id,wind_turbine_number,wind_turbine_name,time_stamp,sampling_frequency,mesure_point_name from `{table_name}` where time_stamp >= '{min_data}' and time_stamp <= '{max_data}'
  215. """
  216. return trans.read_sql_to_df(query_sql)
  217. def delete_exist_wave_data(table_name, ids):
  218. all_arrays = split_array(ids, 1000)
  219. for array in all_arrays:
  220. ids_str = ",".join(['%s'] * len(array))
  221. delete_sql = f"delete from `{table_name}` where id in ({ids_str})"
  222. trans.execute(delete_sql, array)
  223. if __name__ == '__main__':
  224. # path_prix = r"/data/download/collection_data/2完成/招远风电场-山东-大唐/清理数据/WOF063100040-WOB00013/second"
  225. # files = ["WOG00030.csv", "WOG00034.csv"]
  226. # for path in files:
  227. # save_file_to_db("WOF063100040-WOB00013_second", path_prix + os.sep + path, batch_count=100000)
  228. # sql = """
  229. # SELECT wind_turbine_number, time_stamp, wind_velocity, active_power
  230. # FROM `WOF085500002-WOB000001_second`
  231. # WHERE time_stamp >= '2024-02-17 00:00:00' AND time_stamp <= '2024-05-14 00:00:00' AND lab = 0
  232. # """
  233. #
  234. # begin = datetime.datetime.now()
  235. # df = trans.read_sql_to_df(sql)
  236. # end = datetime.datetime.now()
  237. # print(df.shape)
  238. # print(df.info())
  239. # print("Time used:", (end - begin).seconds)
  240. # get_fault_warn_conf("test", "fault")
  241. delete_exist_wave_data('SKF001_wave', [1, 2, 3])