trans_service.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/6/7
  3. # @Author : 魏志亮
  4. import traceback
  5. from os import *
  6. import pandas as pd
  7. from service.common_connect import trans
  8. from service.trans_conf_service import create_wave_table
  9. from utils.file.trans_methods import split_array
  10. from utils.log.trans_log import info, error
  11. def get_config(table_name, field_code, trans_type=None, field_name='wind_code', status=1) -> dict:
  12. """
  13. 通用配置获取函数
  14. Args:
  15. table_name: 表名
  16. field_code: 字段值
  17. trans_type: 类型参数
  18. field_name: 字段名,默认为wind_code
  19. status: 状态值,默认为1
  20. Returns:
  21. 配置字典
  22. """
  23. if table_name == 'warn_fault_conf':
  24. types = list()
  25. if trans_type == 'fault':
  26. types.append(1)
  27. elif trans_type == 'warn':
  28. types.append(2)
  29. else:
  30. error(f"未找到{trans_type}告警/故障的配置")
  31. raise ValueError(f"未找到{trans_type}告警/故障的配置")
  32. types.append(3)
  33. query_sql = f"SELECT * FROM {table_name} where {field_name} = %s and type in %s and status = %s"
  34. params = (field_code, types, status)
  35. elif table_name == 'trans_conf' and field_name == 'wind_name':
  36. query_sql = f"SELECT * FROM {table_name} where {field_name} = %s and type = %s and status = %s"
  37. params = (field_code, trans_type, status)
  38. elif table_name == 'trans_conf':
  39. query_sql = f"SELECT * FROM {table_name} where {field_name} = %s and type = %s and status = %s"
  40. params = (field_code, trans_type, status)
  41. else:
  42. query_sql = f"SELECT * FROM {table_name} where {field_name} = %s and status = %s"
  43. params = (field_code, status)
  44. res = trans.execute(query_sql, params)
  45. if type(res) == tuple or type(res) == str:
  46. return None
  47. return res[0]
  48. def get_min_sec_conf(field_code, trans_type) -> dict:
  49. return get_config('trans_conf', field_code, trans_type)
  50. def get_min_sec_conf_test(field_code, trans_type) -> dict:
  51. return get_config('trans_conf', field_code, trans_type, field_name='wind_name')
  52. def get_fault_warn_conf(field_code, trans_type) -> dict:
  53. return get_config('warn_fault_conf', field_code, trans_type)
  54. def get_wave_conf(field_code) -> dict:
  55. return get_config('wave_conf', field_code)
  56. def creat_min_sec_table(table_name, trans_type, wind_farm_name='', use_tidb=False):
  57. exists_table_sql = f"""
  58. select count(1) as count from information_schema.tables where table_schema = '{trans.database}' and table_name = '{table_name}'
  59. """
  60. count = trans.execute(exists_table_sql)[0]['count']
  61. if count > 0:
  62. info(f"{table_name}已存在")
  63. if trans_type == 'second':
  64. add_key = 'KEY `year_month` (`year_month`)'
  65. key = '`year_month`'
  66. else:
  67. add_key = 'KEY `year` (`year`)'
  68. key = '`year`'
  69. if count == 0:
  70. create_sql = f"""
  71. CREATE TABLE
  72. IF NOT EXISTS `{table_name}` (
  73. `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
  74. `wind_turbine_name` VARCHAR(20) DEFAULT NULL COMMENT '风机原始名称',
  75. `time_stamp` datetime NOT NULL COMMENT '时间戳',
  76. `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
  77. `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
  78. `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
  79. `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
  80. `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
  81. `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
  82. `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
  83. `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
  84. `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
  85. `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
  86. `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
  87. `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
  88. `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
  89. `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
  90. `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
  91. `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
  92. `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
  93. `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
  94. `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
  95. `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
  96. `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
  97. `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
  98. `unusable` DOUBLE DEFAULT NULL COMMENT '不可利用',
  99. `power_curve_available` DOUBLE DEFAULT NULL COMMENT '功率曲线可用',
  100. `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
  101. `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
  102. `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
  103. `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
  104. `main_bearing_temperature_2` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度2',
  105. `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
  106. `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
  107. `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
  108. `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
  109. `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
  110. `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
  111. `wind_turbine_status` DOUBLE DEFAULT NULL COMMENT '风机状态1',
  112. `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
  113. `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
  114. `grid_a_phase_current` DOUBLE DEFAULT NULL COMMENT '电网A相电流',
  115. `grid_b_phase_current` DOUBLE DEFAULT NULL COMMENT '电网B相电流',
  116. `grid_c_phase_current` DOUBLE DEFAULT NULL COMMENT '电网C相电流',
  117. `reactive_power` DOUBLE DEFAULT NULL COMMENT '无功功率',
  118. `lab` int DEFAULT NULL COMMENT '-1:停机 0:好点 1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电',
  119. `year` INT (4) DEFAULT NULL COMMENT '年',
  120. `month` INT (2) DEFAULT NULL COMMENT '月',
  121. `day` INT (2) DEFAULT NULL COMMENT '日',
  122. `year_month` int(6) DEFAULT NULL COMMENT '年-月',
  123. `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
  124. `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
  125. `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
  126. `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
  127. `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
  128. `param6` VARCHAR (20) DEFAULT NULL COMMENT '预留6',
  129. `param7` VARCHAR (20) DEFAULT NULL COMMENT '预留7',
  130. `param8` VARCHAR (20) DEFAULT NULL COMMENT '预留8',
  131. `param9` VARCHAR (20) DEFAULT NULL COMMENT '预留9',
  132. `param10` VARCHAR (20) DEFAULT NULL COMMENT '预留10',
  133. KEY `time_stamp` (`time_stamp`),
  134. KEY `wind_turbine_number` (`wind_turbine_number`),
  135. {add_key}
  136. ) COMMENT='{wind_farm_name}'
  137. """
  138. # if not use_tidb:
  139. create_sql = create_sql + f"""
  140. PARTITION BY LIST COLUMNS ({key}, `wind_turbine_number`) (
  141. PARTITION pDefault VALUES IN ((000000, 'wind_turbine_number'))
  142. )
  143. """
  144. trans.execute(create_sql)
  145. def add_partation(table_name: str, date_str: str, wind_turbine_number):
  146. p_name = f'p{date_str}_{wind_turbine_number}'
  147. add_sql = f"""
  148. alter table {table_name} add partition (
  149. partition {p_name} VALUES IN (({date_str}, '{wind_turbine_number}'))
  150. )
  151. """
  152. trans.execute(add_sql)
  153. def remove_partation(table_name: str, date_str: str, wind_turbine_number):
  154. p_name = f'p{date_str}_{wind_turbine_number}'
  155. remove_sql = f"""
  156. alter table {table_name} DROP PARTITION {p_name}
  157. """
  158. trans.execute(remove_sql)
  159. def add_or_remove_partation(table_name: str, date_str: str, wind_turbine_number):
  160. p_name = f'p{date_str}_{wind_turbine_number}'
  161. query_partation = f"""
  162. SELECT count(1) as count from information_schema.`PARTITIONS` t
  163. where t.TABLE_SCHEMA = '{trans.database}'
  164. and t.TABLE_NAME = '{table_name}'
  165. and t.PARTITION_NAME = '{p_name}'
  166. """
  167. count = trans.execute(query_partation)[0]['count']
  168. if count == 0:
  169. add_partation(table_name, date_str, wind_turbine_number)
  170. else:
  171. remove_partation(table_name, date_str, wind_turbine_number)
  172. add_partation(table_name, date_str, wind_turbine_number)
  173. def drop_exists_data(table_name, wind_turbine_number, min_date, max_date):
  174. # sql = f"# delete from {table_name} where wind_turbine_number = '{wind_turbine_number}' and time_stamp between '{min_date}' and '{max_date}'"
  175. sql = f"""
  176. BATCH ON `time_stamp`, `wind_turbine_number` LIMIT 1000
  177. DELETE FROM `{table_name}`
  178. WHERE `rated_at` >= "{min_date}"
  179. AND `rated_at` <= "{max_date}"
  180. AND `wind_turbine_number` = "{wind_turbine_number}";
  181. """
  182. count = trans.execute(sql)
  183. info(f"删除数据{count}条,{table_name},{wind_turbine_number},{min_date},{max_date}")
  184. def save_data_to_db(table_name: str, data, batch_count=100000, wind_turbine_number=None, date_str=None, file_name=None):
  185. """
  186. 通用数据保存函数
  187. Args:
  188. table_name: 表名
  189. data: 数据,可以是DataFrame或文件路径
  190. batch_count: 批处理大小
  191. wind_turbine_number: 风机编号
  192. date_str: 日期字符串
  193. file_name: 文件名
  194. Returns:
  195. None
  196. """
  197. try:
  198. # 处理数据
  199. if isinstance(data, str):
  200. # 从文件读取数据
  201. df = pd.read_csv(data)
  202. file_name = file_name or path.basename(data)
  203. else:
  204. # 直接使用DataFrame
  205. df = data
  206. # 处理分区
  207. if wind_turbine_number and date_str:
  208. add_or_remove_partation(table_name, date_str, wind_turbine_number)
  209. # 保存数据
  210. if wind_turbine_number:
  211. trans.execute_df_save(df, table_name, batch_count)
  212. info(f"保存到{table_name},{file_name},{wind_turbine_number} 成功,总条数:{df.shape[0]}")
  213. else:
  214. trans.execute_df_save(df, table_name, batch_count)
  215. info(f"保存到{table_name}成功,总条数:{df.shape[0]}")
  216. except Exception as e:
  217. if file_name:
  218. message = file_name + str(e)
  219. else:
  220. message = str(e)
  221. raise Exception(message)
  222. def save_scada_file_to_db(table_name, file: str, wind_turbine_number, date_str, batch_count=100000, use_tidb=False):
  223. save_data_to_db(table_name, file, batch_count, wind_turbine_number, date_str)
  224. def save_file_to_db(table_name: str, file: str, batch_count=100000):
  225. save_data_to_db(table_name, file, batch_count)
  226. def save_df_to_db(table_name: str, df: pd.DataFrame, batch_count=100000):
  227. save_data_to_db(table_name, df, batch_count)
  228. def batch_statistics(table_name):
  229. query_sql = f"select count(1) as total_count ,min(t.time_stamp) as min_date ,max(t.time_stamp) as max_date from `{table_name}` t "
  230. try:
  231. res = trans.execute(query_sql)
  232. return res[0]
  233. except:
  234. error(traceback.format_exc())
  235. return None
  236. def create_warn_fault_table(table_name, wind_farm_name=''):
  237. sql = f"""
  238. CREATE TABLE `{table_name}` (
  239. `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
  240. `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号',
  241. `begin_time` datetime DEFAULT NULL COMMENT '开始时间',
  242. `end_time` datetime DEFAULT NULL COMMENT '结束时间',
  243. `time_diff` int DEFAULT NULL COMMENT '处理耗时,单位秒',
  244. `fault_id` varchar(20) DEFAULT NULL COMMENT '报警或者故障ID',
  245. `fault_code` varchar(50) DEFAULT NULL COMMENT '报警或者故障CODE',
  246. `fault_detail` varchar(255) DEFAULT NULL COMMENT '错误描述',
  247. `fault_level` varchar(20) DEFAULT NULL COMMENT '报警等级',
  248. `fault_type` varchar(20) DEFAULT NULL COMMENT '报警类型',
  249. `stop_status` varchar(20) DEFAULT NULL COMMENT '刹车状态',
  250. KEY `wind_turbine_number` (`wind_turbine_number`),
  251. KEY `begin_time` (`begin_time`),
  252. KEY `end_time` (`end_time`)
  253. ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4 COMMENT='{wind_farm_name}'
  254. """
  255. trans.execute(sql)
  256. def drop_table(table_name):
  257. drop_sql = f"DROP TABLE `{table_name}`"
  258. try:
  259. trans.execute(drop_sql)
  260. except:
  261. pass
  262. def get_or_create_wave_table(table_name):
  263. create_table = False
  264. query_sql = f"select 1 from `{table_name}` limit 1"
  265. try:
  266. trans.execute(query_sql)
  267. except:
  268. create_table = True
  269. if create_table:
  270. create_wave_table(table_name)
  271. def get_wave_data(table_name, min_data, max_data):
  272. query_sql = f"""
  273. select id,wind_turbine_number,wind_turbine_name,time_stamp,sampling_frequency,mesure_point_name from `{table_name}` where time_stamp >= '{min_data}' and time_stamp <= '{max_data}'
  274. """
  275. return trans.read_sql_to_df(query_sql)
  276. def delete_exist_wave_data(table_name, ids):
  277. all_arrays = split_array(ids, 1000)
  278. for array in all_arrays:
  279. ids_str = ",".join(['%s'] * len(array))
  280. delete_sql = f"delete from `{table_name}` where id in ({ids_str})"
  281. trans.execute(delete_sql, array)
  282. def get_trans_exec_code(id, query_type):
  283. query_sql = f"SELECT * from batch_exec_code t where t.id = '{id}' and type='{query_type}' and t.`status` = 1 limit 1"
  284. res = trans.execute(query_sql)
  285. if type(res) == tuple or type(res) == str:
  286. return None
  287. exec_code = res[0]['exec_code']
  288. info("任务ID", id, '类型', type, '获取到执行代码:', exec_code)
  289. return exec_code
  290. if __name__ == '__main__':
  291. delete_exist_wave_data('SKF001_wave', [1, 2, 3])