# -*- coding: utf-8 -*-
# @Time    : 2024/5/15
# @Author  : 魏志亮
"""Database helpers used by the data-transfer job.

Two pymysql connections are opened at import time:

* ``plt_connect``   -- database ``energy``; used for the ``data_transfer``,
  ``wind_field`` and ``wind_engine_group`` tables.
* ``trans_connect`` -- database ``energy_data``; used for ``trans_conf`` and
  the per-field data tables created by ``creat_table_and_add_partition``.

Every helper calls ``ping(reconnect=True)`` before touching a connection and
closes it when done, so each call effectively runs in a fresh session.
"""
import datetime

import numpy as np
import pandas as pd
import pymysql
from pandas import DataFrame
from sqlalchemy import create_engine, Engine, text

from utils.log.trans_log import trans_print
from utils.trans_methods import read_file_to_df

# Establish database connections.
# NOTE(review): credentials are hard-coded; consider moving them to configuration.
plt_user = 'admin'
plt_password = 'admin123456'
plt_host = '192.168.50.233'
plt_port = 3306
plt_database = 'energy'

plt_connect = pymysql.connect(host=plt_host, port=plt_port, user=plt_user, password=plt_password,
                              db=plt_database, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)

trans_user = 'admin'
trans_password = 'admin123456'
trans_host = '192.168.50.233'
trans_port = 3306
trans_database = 'energy_data'

trans_connect = pymysql.connect(host=trans_host, port=trans_port, user=trans_user, password=trans_password,
                                db=trans_database, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)

# str.format templates for update_trans_status. The string values are placed
# inside single quotes, so callers must escape them before formatting.
susscess_sql = "update data_transfer set transfer_state = 1 where batch_code = '{batch_no}' and " \
               "transfer_type = '{trans_type}'"

error_sql = "update data_transfer set transfer_state = 2 ,err_info='{message}' where batch_code = '{batch_no}' and " \
            "transfer_type = '{trans_type}' "

running_sql = "update data_transfer set transfer_state = 0,engine_count = {wind_count} where batch_code = " \
              "'{batch_no}' and transfer_type = '{trans_type}' "


def _close_connection(connect: pymysql.Connection):
    """Close ``connect`` only if it still holds an open socket.

    pymysql's ``close()`` raises on a connection that was already closed, and
    a failed query may already have torn the socket down; guarding here keeps
    cleanup in ``finally`` blocks from masking the original error.
    """
    if connect.open:
        connect.close()


def __query(connect: pymysql.Connection, sql, params=None):
    """Run a query on ``connect`` and return the fetched rows as a DataFrame.

    :param connect: shared pymysql connection; reopened via ``ping`` and
        closed again before returning (also on error).
    :param sql: SQL text. When ``params`` is given, use ``%s`` placeholders.
    :param params: optional sequence of values bound (and escaped) by
        pymysql; ``None`` sends ``sql`` unchanged.
    :return: DataFrame of the rows (empty when the query returns no rows).
    """
    if params is None:
        trans_print('开始执行SQL:', sql)
    else:
        trans_print('开始执行SQL:', sql, params)
    connect.ping(reconnect=True)
    try:
        with connect.cursor() as cursor:
            cursor.execute(sql, params)
            datas = cursor.fetchall()
    finally:
        _close_connection(connect)
    return pd.DataFrame(datas)


def __ddl_sql(connect: pymysql.Connection, sql, params=None):
    """Execute a statement on ``connect`` and commit it.

    :param connect: shared pymysql connection; reopened via ``ping`` and
        closed again afterwards (also on error, in which case nothing is
        committed).
    :param sql: SQL text. When ``params`` is given, use ``%s`` placeholders.
    :param params: optional sequence of values bound (and escaped) by pymysql.
    """
    if params is None:
        trans_print('开始执行SQL:', sql)
    else:
        trans_print('开始执行SQL:', sql, params)
    connect.ping(reconnect=True)
    try:
        with connect.cursor() as cursor:
            cursor.execute(sql, params)
        connect.commit()
    finally:
        _close_connection(connect)


def update_trans_status(batch_no, trans_type, status, message="", wind_count=0):
    """Update ``data_transfer.transfer_state`` for one batch/transfer type.

    :param batch_no: value matched against ``batch_code``.
    :param trans_type: value matched against ``transfer_type``.
    :param status: ``'success'`` -> state 1, ``'running'`` -> state 0 (and
        ``engine_count`` = ``wind_count``), anything else -> state 2 with
        ``err_info`` = ``message``.
    :param message: error text stored for the error status.
    :param wind_count: engine count stored for the running status.
    """
    if status == 'success':
        exec_sql = susscess_sql
    elif status == 'running':
        exec_sql = running_sql
    else:
        exec_sql = error_sql

    # The templates quote these values themselves; escape them so e.g. an
    # exception message containing a single quote cannot break the UPDATE.
    escape = plt_connect.escape_string
    exec_sql = exec_sql.format(batch_no=escape(str(batch_no)), status=status, message=escape(str(message)),
                               trans_type=escape(str(trans_type)), wind_count=wind_count)
    __ddl_sql(plt_connect, exec_sql)


def get_exec_data():
    """Return the next ``data_transfer`` row to execute, or ``None``.

    Returns ``None`` while any row has ``transfer_state = 0``. Otherwise
    returns a DataFrame holding at most one row: the ``data_transfer`` row
    with ``transfer_state = -1`` and a non-empty ``transfer_addr`` that has
    the oldest ``update_time``, joined with its ``wind_field.field_name``.
    """
    query_running_sql = "select 1 from data_transfer where transfer_state = 0"
    query_next_exec_sql = """
    SELECT
        t.*,a.field_name
    FROM
        data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
    WHERE
        t.transfer_state = -1
    AND t.transfer_addr != ''
    ORDER BY
        t.update_time
    LIMIT 1
    """
    df = __query(plt_connect, query_running_sql)
    if not df.empty:
        return None
    return __query(plt_connect, query_next_exec_sql)


def get_all_wind(field_code):
    """Map ``engine_name`` -> ``engine_code`` for non-deleted engines of a field.

    :param field_code: value matched against ``wind_engine_group.field_code``.
    :return: dict (empty when the field has no engines); a later row wins if
        two rows share an ``engine_name``.
    """
    query_sql = "select engine_code,engine_name from wind_engine_group where field_code = %s and del_state = 0"
    df = __query(plt_connect, query_sql, (field_code,))
    if df.empty:
        return dict()
    return dict(zip(df['engine_name'], df['engine_code']))


def get_all_wind_company():
    """Return a list of wind-field names.

    NOTE(review): the database result is discarded and replaced by the
    hard-coded list below, which looks like a debugging override; confirm
    before relying on this reflecting ``wind_field``.
    """
    query_sql = "SELECT t.field_name FROM wind_field t where t.del_state = 0"
    df = __query(plt_connect, query_sql)
    df = pd.DataFrame()
    df['field_name'] = ['吉山风电场', '和风元宝山', '唐龙三期风电场', '密马风电场', '招远风电场',
                        '昌平坳风场', '昌西一风电场', '虹梯官风电场', '长清风电场']
    if df.empty:
        return []
    return list(df['field_name'].values)


def get_trans_conf(wind_name, type):
    """Return the ``trans_conf`` rows for ``wind_name`` and ``type`` as a DataFrame.

    The ``type`` parameter shadows the builtin but is kept for keyword callers.
    """
    query_sql = "SELECT * FROM trans_conf where wind_name = %s and type = %s"
    return __query(trans_connect, query_sql, (wind_name, type))


def save_to_trans_conf(data_dict=None):
    """``REPLACE INTO trans_conf`` one row built from ``data_dict``.

    :param data_dict: column name -> value; keys become the column list and
        values are bound as parameters. ``None`` behaves like an empty dict.
    """
    if data_dict is None:
        data_dict = {}
    keys = list(data_dict.keys())
    col_str = ",".join(f"`{key}`" for key in keys)
    data_s_str = ",".join(["%s"] * len(keys))
    insert_sql = f"replace into trans_conf ({col_str}) values ({data_s_str})"

    trans_connect.ping(reconnect=True)
    try:
        with trans_connect.cursor() as cursor:
            cursor.execute(insert_sql, tuple(data_dict.values()))
        trans_connect.commit()
    finally:
        _close_connection(trans_connect)


def creat_table_and_add_partition(table_name, count, read_type):
    """Create the data table ``table_name`` in the transfer database if missing.

    :param table_name: table to create (interpolated into the DDL as-is).
    :param count: number of ``KEY(wind_turbine_number)`` partitions, used
        only when ``read_type == 'second'``.
    :param read_type: ``'second'`` adds partitioning; any other value creates
        an unpartitioned table.
    """
    # Restrict the lookup to the connection's own schema; without it a table
    # of the same name in another database would suppress the CREATE.
    query_table = ("SELECT t.TABLE_NAME FROM information_schema.`TABLES` t "
                   "where t.TABLE_SCHEMA = DATABASE() and t.TABLE_NAME = %s")
    df = __query(trans_connect, query_table, (table_name,))
    if not df.empty:
        return

    create_sql = f"""
    CREATE TABLE
    IF NOT EXISTS `{table_name}` (
        `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
        `time_stamp` datetime NOT NULL COMMENT '时间戳',
        `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
        `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
        `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
        `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
        `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
        `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
        `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
        `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
        `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
        `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
        `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
        `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
        `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
        `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
        `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
        `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
        `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
        `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
        `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
        `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
        `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
        `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
        `unusable` BIGINT (20) DEFAULT NULL COMMENT '不可利用',
        `power_curve_available` BIGINT (20) DEFAULT NULL COMMENT '功率曲线可用',
        `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
        `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
        `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
        `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
        `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
        `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
        `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
        `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
        `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
        `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
        `wind_turbine_status` BIGINT (20) DEFAULT NULL COMMENT '风机状态1',
        `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
        `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
        `year` INT (4) DEFAULT NULL COMMENT '年',
        `month` INT (2) DEFAULT NULL COMMENT '月',
        `day` INT (2) DEFAULT NULL COMMENT '日',
        `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
        `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
        `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
        `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
        `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
        `param6` DOUBLE DEFAULT NULL COMMENT '预留6',
        `param7` DOUBLE DEFAULT NULL COMMENT '预留7',
        `param8` DOUBLE DEFAULT NULL COMMENT '预留8',
        `param9` DOUBLE DEFAULT NULL COMMENT '预留9',
        `param10` DOUBLE DEFAULT NULL COMMENT '预留10',
        KEY `time_stamp` (`time_stamp`),
        KEY `wind_turbine_number` (`wind_turbine_number`)
    ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4
    """

    if read_type == 'second':
        create_sql = create_sql + f" PARTITION BY KEY (`wind_turbine_number`) PARTITIONS {count}"

    __ddl_sql(trans_connect, create_sql)


def rename_table(table_name, renamed_table_name):
    """Rename ``table_name`` to ``renamed_table_name`` (best effort).

    Any exception is logged through ``trans_print`` and swallowed
    deliberately, so a failed rename does not abort the caller.
    """
    rename_sql = f"RENAME TABLE {table_name} TO {renamed_table_name}"
    try:
        __ddl_sql(trans_connect, rename_sql)
    except Exception as e:
        trans_print(e)


def read_excel_and_save_to_db(table_name: str, file_path, batch_count=20000):
    """Load ``file_path`` with ``read_file_to_df`` and insert it into ``table_name``.

    :param table_name: target table in the transfer database.
    :param file_path: file handed to ``read_file_to_df``.
    :param batch_count: rows per ``executemany`` batch.
    """
    df = read_file_to_df(file_path)
    trans_print("开始保存文件", file_path, "到数据库")
    begin = datetime.datetime.now()
    save_df_to_db(trans_connect, table_name, df, batch_count)
    trans_print("保存文件", file_path, "到数据库成功,耗时:", str(datetime.datetime.now() - begin))


def save_df_to_db(connection: pymysql.Connection, table_name: str, df: DataFrame, batch_count=20000):
    """Insert every row of ``df`` into ``table_name`` in batches.

    :param connection: pymysql connection; reopened via ``ping`` before each
        batch and closed when done (also on error).
    :param table_name: target table (interpolated into the SQL as-is).
    :param df: rows to insert; column names are used as table column names.
        The caller's DataFrame is not modified.
    :param batch_count: rows per ``executemany`` call; each batch is
        committed separately.
    """
    col_str = ",".join(f"`{col}`" for col in df.columns)
    data_s_str = ",".join(["%s"] * len(df.columns))
    insert_sql = f"INSERT INTO {table_name} ({col_str}) values ({data_s_str})"

    # Convert NaN to None so pymysql writes SQL NULL; non-inplace so the
    # caller's DataFrame is left untouched.
    df = df.replace(np.nan, None)

    total_count = df.shape[0]
    try:
        # Stop at total_count (not total_count + 1) so no trailing empty batch runs.
        for i in range(0, total_count, batch_count):
            # Reopen the connection if it was closed or timed out between batches.
            connection.ping(reconnect=True)
            batch_df = df.iloc[i:i + batch_count]
            values = [tuple(row) for row in batch_df.values]
            with connection.cursor() as cursor:
                cursor.executemany(insert_sql, values)
            connection.commit()
            trans_print("保存条数成功,总条数", str(total_count), "已完成条数:",
                        str(min(i + batch_count, total_count)))
    finally:
        _close_connection(connection)


if __name__ == '__main__':
    df = get_exec_data()
    print(df)