trans_mysql.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. import datetime
  5. import numpy as np
  6. import pandas as pd
  7. # 建立数据库连接
  8. import pymysql
  9. from pandas import DataFrame
  10. from sqlalchemy import create_engine, Engine, text
  11. from utils.log.trans_log import trans_print
  12. from utils.trans_methods import read_file_to_df
  13. plt_user = 'admin'
  14. plt_password = 'admin123456'
  15. plt_host = '192.168.50.233'
  16. plt_port = 3306
  17. plt_database = 'energy'
  18. plt_connect = pymysql.connect(host=plt_host,
  19. user=plt_user,
  20. password=plt_password,
  21. db=plt_database,
  22. charset='utf8mb4',
  23. cursorclass=pymysql.cursors.DictCursor)
  24. trans_user = 'admin'
  25. trans_password = 'admin123456'
  26. trans_host = '192.168.50.233'
  27. trans_port = 3306
  28. trans_database = 'energy_data'
  29. trans_connect = pymysql.connect(host=trans_host,
  30. user=trans_user,
  31. password=trans_password,
  32. db=trans_database,
  33. charset='utf8mb4',
  34. cursorclass=pymysql.cursors.DictCursor)
  35. susscess_sql = "update data_transfer set transfer_state = 1 where batch_code = '{batch_no}' and transfer_type = '{" \
  36. "trans_type}'";
  37. error_sql = "update data_transfer set transfer_state = 2 ,err_info='{message}' where batch_code = '{batch_no}' and " \
  38. "transfer_type = '{trans_type}' "
  39. running_sql = "update data_transfer set transfer_state = 0,engine_count = {wind_count} where batch_code = '{" \
  40. "batch_no}' and transfer_type = '{trans_type}' "
  41. def __query(connect: pymysql.Connection, sql):
  42. trans_print('开始执行SQL:', sql)
  43. connect.ping(reconnect=True)
  44. with connect.cursor() as cursor:
  45. cursor.execute(sql)
  46. datas = cursor.fetchall()
  47. df = pd.DataFrame(datas)
  48. connect.close()
  49. return df
  50. def __ddl_sql(connect: pymysql.Connection, sql):
  51. trans_print('开始执行SQL:', sql)
  52. connect.ping(reconnect=True)
  53. with connect.cursor() as cursor:
  54. cursor.execute(sql)
  55. connect.commit()
  56. connect.close()
  57. def update_trans_status(batch_no, trans_type, status, message="", wind_count=0):
  58. if status == 'success':
  59. exec_sql = susscess_sql
  60. elif status == 'running':
  61. exec_sql = running_sql
  62. else:
  63. exec_sql = error_sql
  64. exec_sql = exec_sql.format(batch_no=batch_no, status=status, message=message, trans_type=trans_type,
  65. wind_count=wind_count)
  66. __ddl_sql(plt_connect, exec_sql)
  67. # 获取执行的数据
  68. def get_exec_data():
  69. query_running_sql = "select 1 from data_transfer where transfer_state = 0"
  70. query_next_exdc_sql = """
  71. SELECT
  72. t.*,a.field_name
  73. FROM
  74. data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
  75. WHERE
  76. t.transfer_state = -1
  77. AND t.transfer_addr != ''
  78. ORDER BY
  79. t.update_time
  80. LIMIT 1
  81. """
  82. df = __query(plt_connect, query_running_sql)
  83. if df.empty:
  84. df = __query(plt_connect, query_next_exdc_sql)
  85. return df
  86. else:
  87. return None
  88. def get_all_wind(field_code):
  89. query_sql = f"select engine_code,engine_name from wind_engine_group where field_code = '{field_code}' and del_state = 0"
  90. df = __query(plt_connect, query_sql)
  91. dict_datas = dict()
  92. if df.empty:
  93. return dict_datas
  94. else:
  95. for engine_code, engine_name in zip(df['engine_code'], df['engine_name']):
  96. dict_datas[engine_name] = engine_code
  97. return dict_datas
  98. def get_all_wind_company():
  99. query_sql = f"SELECT t.field_name FROM wind_field t where t.del_state = 0"
  100. df = __query(plt_connect, query_sql)
  101. df = pd.DataFrame()
  102. df['field_name'] = ['吉山风电场', '和风元宝山', '唐龙三期风电场', '密马风电场', '招远风电场', '昌平坳风场', '昌西一风电场', '虹梯官风电场', '长清风电场']
  103. if df.empty:
  104. return []
  105. else:
  106. return list(df['field_name'].values)
  107. def get_trans_conf(wind_name, type):
  108. query_sql = f"SELECT * FROM trans_conf where wind_name = '{wind_name}' and type = '{type}'"
  109. return __query(trans_connect, query_sql)
  110. def save_to_trans_conf(data_dict=dict()):
  111. keys = data_dict.keys()
  112. col_str = ",".join(keys)
  113. data_s_str = ",".join(["%s"] * len(keys))
  114. insert_sql = f"replace into trans_conf ({col_str}) values ({data_s_str})"
  115. trans_connect.ping(reconnect=True)
  116. with trans_connect.cursor() as cursor:
  117. cursor.execute(insert_sql, tuple(data_dict.values()))
  118. trans_connect.commit()
  119. trans_connect.close()
  120. def creat_table_and_add_partition(table_name, count, read_type):
  121. query_table = f"SELECT t.TABLE_NAME FROM information_schema.`TABLES` t where t.TABLE_NAME = '{table_name}'"
  122. df = __query(trans_connect, query_table)
  123. if df.empty:
  124. create_sql = f"""
  125. CREATE TABLE
  126. IF NOT EXISTS `{table_name}` (
  127. `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
  128. `time_stamp` datetime NOT NULL COMMENT '时间戳',
  129. `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
  130. `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
  131. `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
  132. `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
  133. `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
  134. `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
  135. `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
  136. `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
  137. `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
  138. `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
  139. `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
  140. `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
  141. `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
  142. `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
  143. `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
  144. `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
  145. `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
  146. `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
  147. `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
  148. `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
  149. `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
  150. `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
  151. `unusable` BIGINT (20) DEFAULT NULL COMMENT '不可利用',
  152. `power_curve_available` BIGINT (20) DEFAULT NULL COMMENT '功率曲线可用',
  153. `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
  154. `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
  155. `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
  156. `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
  157. `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
  158. `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
  159. `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
  160. `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
  161. `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
  162. `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
  163. `wind_turbine_status` BIGINT (20) DEFAULT NULL COMMENT '风机状态1',
  164. `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
  165. `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
  166. `year` INT (4) DEFAULT NULL COMMENT '年',
  167. `month` INT (2) DEFAULT NULL COMMENT '月',
  168. `day` INT (2) DEFAULT NULL COMMENT '日',
  169. `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
  170. `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
  171. `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
  172. `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
  173. `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
  174. `param6` DOUBLE DEFAULT NULL COMMENT '预留6',
  175. `param7` DOUBLE DEFAULT NULL COMMENT '预留7',
  176. `param8` DOUBLE DEFAULT NULL COMMENT '预留8',
  177. `param9` DOUBLE DEFAULT NULL COMMENT '预留9',
  178. `param10` DOUBLE DEFAULT NULL COMMENT '预留10',
  179. KEY `time_stamp` (`time_stamp`),
  180. KEY `wind_turbine_number` (`wind_turbine_number`)
  181. ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4
  182. """
  183. if read_type == 'second':
  184. create_sql = create_sql + f" PARTITION BY KEY (`wind_turbine_number`) PARTITIONS {count}"
  185. __ddl_sql(trans_connect, create_sql)
  186. def rename_table(table_name, renamed_table_name):
  187. rename_sql = f"RENAME TABLE {table_name} TO {renamed_table_name}"
  188. try:
  189. __ddl_sql(trans_connect, rename_sql)
  190. except Exception as e:
  191. trans_print(e)
  192. def read_excel_and_save_to_db(table_name: str, file_path, batch_count=20000):
  193. df = read_file_to_df(file_path)
  194. trans_print("开始保存文件", file_path, "到数据库")
  195. begin = datetime.datetime.now()
  196. save_df_to_db(trans_connect, table_name, df, batch_count)
  197. trans_print("保存文件", file_path, "到数据库成功,耗时:", str(datetime.datetime.now() - begin))
  198. def save_df_to_db(connection: pymysql.Connection, table_name: str, df: DataFrame, batch_count=20000):
  199. col_str = ",".join(df.columns)
  200. data_s_str = ",".join(["%s"] * len(df.columns))
  201. insert_sql = f"INSERT INTO {table_name} ({col_str}) values ({data_s_str})"
  202. # 转化nan到null
  203. df.replace(np.nan, None, inplace=True)
  204. total_count = df.shape[0]
  205. for i in range(0, total_count + 1, batch_count):
  206. connection.ping(reconnect=True)
  207. with connection.cursor() as cursor:
  208. query_df = df.iloc[i:i + batch_count]
  209. values = [tuple(data) for data in query_df.values]
  210. cursor.executemany(insert_sql, values)
  211. connection.commit()
  212. connection.close()
  213. trans_print("保存条数成功,总条数", str(total_count), "已完成条数:",
  214. str(total_count if (i + batch_count) > total_count else (i + batch_count)))
  215. if __name__ == '__main__':
  216. df = get_exec_data()
  217. print(df)