trans_mysql.py 9.8 KB


  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. import datetime
  5. import numpy as np
  6. import pandas as pd
  7. # 建立数据库连接
  8. import pymysql
  9. from pandas import DataFrame
  10. from sqlalchemy import create_engine, Engine, text
  11. from utils.log.trans_log import trans_print
  12. from utils.trans_methods import read_file_to_df
  13. plt_user = 'admin'
  14. plt_password = 'admin123456'
  15. plt_host = '192.168.50.233'
  16. plt_port = 3306
  17. plt_database = 'energy'
  18. plt_connect = pymysql.connect(host=plt_host,
  19. user=plt_user,
  20. password=plt_password,
  21. db=plt_database,
  22. charset='utf8mb4',
  23. cursorclass=pymysql.cursors.DictCursor)
  24. trans_user = 'admin'
  25. trans_password = 'admin123456'
  26. trans_host = '192.168.50.233'
  27. trans_port = 3306
  28. trans_database = 'energy_data'
  29. trans_connect = pymysql.connect(host=trans_host,
  30. user=trans_user,
  31. password=trans_password,
  32. db=trans_database,
  33. charset='utf8mb4',
  34. cursorclass=pymysql.cursors.DictCursor)
  35. susscess_sql = "update data_transfer set transfer_state = 1 where batch_code = '{batch_no}' and transfer_type = '{trans_type}'";
  36. error_sql = "update data_transfer set transfer_state = 2 ,err_info='{message}' where batch_code = '{batch_no}' and transfer_type = '{trans_type}'"
  37. running_sql = "update data_transfer set transfer_state = 0,engine_count = {wind_count} where batch_code = '{batch_no}' and transfer_type = '{trans_type}'"
  38. def __query(connect: pymysql.Connection, sql):
  39. trans_print('开始执行SQL:', sql)
  40. connect.ping(reconnect=True)
  41. with connect.cursor() as cursor:
  42. cursor.execute(sql)
  43. datas = cursor.fetchall()
  44. df = pd.DataFrame(datas)
  45. connect.close()
  46. return df
  47. def __ddl_sql(connect: pymysql.Connection, sql):
  48. trans_print('开始执行SQL:', sql)
  49. connect.ping(reconnect=True)
  50. with connect.cursor() as cursor:
  51. cursor.execute(sql)
  52. connect.commit()
  53. connect.close()
  54. def update_trans_status(batch_no, trans_type, status, message="", wind_count=0):
  55. if status == 'success':
  56. exec_sql = susscess_sql
  57. elif status == 'running':
  58. exec_sql = running_sql
  59. else:
  60. exec_sql = error_sql
  61. exec_sql = exec_sql.format(batch_no=batch_no, status=status, message=message, trans_type=trans_type,
  62. wind_count=wind_count)
  63. __ddl_sql(plt_connect, exec_sql)
  64. # 获取执行的数据
  65. def get_exec_data():
  66. query_running_sql = "select 1 from data_transfer where transfer_state = 0"
  67. query_next_exdc_sql = """
  68. SELECT
  69. t.*,a.field_name
  70. FROM
  71. data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
  72. WHERE
  73. t.transfer_state = -1
  74. AND t.transfer_file_addr != ''
  75. ORDER BY
  76. t.update_time
  77. LIMIT 1
  78. """
  79. df = __query(plt_connect, query_running_sql)
  80. if df.empty:
  81. df = __query(plt_connect, query_next_exdc_sql)
  82. return df
  83. else:
  84. return None
  85. def get_all_wind(field_code):
  86. query_sql = f"select engine_code,engine_name from wind_engine_group where field_code = '{field_code}' and del_state = 1"
  87. df = __query(plt_connect, query_sql)
  88. dict_datas = dict()
  89. if df.empty:
  90. return dict_datas
  91. else:
  92. for engine_code, engine_name in zip(df['engine_code'], df['engine_name']):
  93. dict_datas[engine_name] = engine_code
  94. return dict_datas
  95. def creat_table_and_add_partition(table_name, count, read_type):
  96. query_table = f"SELECT t.TABLE_NAME FROM information_schema.`TABLES` t where t.TABLE_NAME = '{table_name}'"
  97. df = __query(trans_connect, query_table)
  98. if df.empty:
  99. create_sql = f"""
  100. CREATE TABLE
  101. IF NOT EXISTS `{table_name}` (
  102. `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
  103. `time_stamp` datetime NOT NULL COMMENT '时间戳',
  104. `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
  105. `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
  106. `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
  107. `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
  108. `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
  109. `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
  110. `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
  111. `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
  112. `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
  113. `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
  114. `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
  115. `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
  116. `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
  117. `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
  118. `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
  119. `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
  120. `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
  121. `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
  122. `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
  123. `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
  124. `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
  125. `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
  126. `unusable` BIGINT (20) DEFAULT NULL COMMENT '不可利用',
  127. `power_curve_available` BIGINT (20) DEFAULT NULL COMMENT '功率曲线可用',
  128. `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
  129. `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
  130. `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
  131. `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
  132. `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
  133. `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
  134. `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
  135. `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
  136. `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
  137. `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
  138. `wind_turbine_status` BIGINT (20) DEFAULT NULL COMMENT '风机状态1',
  139. `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
  140. `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
  141. `year` INT (4) DEFAULT NULL COMMENT '年',
  142. `month` INT (2) DEFAULT NULL COMMENT '月',
  143. `day` INT (2) DEFAULT NULL COMMENT '日',
  144. `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
  145. `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
  146. `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
  147. `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
  148. `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
  149. `param6` DOUBLE DEFAULT NULL COMMENT '预留6',
  150. `param7` DOUBLE DEFAULT NULL COMMENT '预留7',
  151. `param8` DOUBLE DEFAULT NULL COMMENT '预留8',
  152. `param9` DOUBLE DEFAULT NULL COMMENT '预留9',
  153. `param10` DOUBLE DEFAULT NULL COMMENT '预留10',
  154. KEY `time_stamp` (`time_stamp`),
  155. KEY `wind_turbine_number` (`wind_turbine_number`)
  156. ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4
  157. """
  158. if read_type == 'second':
  159. create_sql = create_sql + f" PARTITION BY KEY (`wind_turbine_number`) PARTITIONS {count}"
  160. __ddl_sql(trans_connect, create_sql)
  161. def rename_table(table_name, renamed_table_name):
  162. rename_sql = f"RENAME TABLE {table_name} TO {renamed_table_name}"
  163. try:
  164. __ddl_sql(trans_connect, rename_sql)
  165. except Exception as e:
  166. trans_print(e)
  167. def read_excel_and_save_to_db(table_name: str, file_path, batch_count=20000):
  168. df = read_file_to_df(file_path)
  169. trans_print("开始保存文件", file_path, "到数据库")
  170. begin = datetime.datetime.now()
  171. save_df_to_db(trans_connect, table_name, df, batch_count)
  172. trans_print("保存文件", file_path, "到数据库成功,耗时:", str(datetime.datetime.now() - begin))
  173. def save_df_to_db(connection: pymysql.Connection, table_name: str, df: DataFrame, batch_count=20000):
  174. col_str = ",".join(df.columns)
  175. data_s_str = ",".join(["%s"] * len(df.columns))
  176. insert_sql = f"INSERT INTO {table_name} ({col_str}) values ({data_s_str})"
  177. # 转化nan到null
  178. df.replace(np.nan, None, inplace=True)
  179. total_count = df.shape[0]
  180. for i in range(0, total_count + 1, batch_count):
  181. connection.ping(reconnect=True)
  182. with connection.cursor() as cursor:
  183. query_df = df.iloc[i:i + batch_count]
  184. values = [tuple(data) for data in query_df.values]
  185. cursor.executemany(insert_sql, values)
  186. connection.commit()
  187. connection.close()
  188. trans_print("保存条数成功,总条数", str(total_count), "已完成条数:",
  189. str(total_count if (i + batch_count) > total_count else (i + batch_count)))
  190. if __name__ == '__main__':
  191. df = get_exec_data()
  192. print(df)