|
- # -*- coding: utf-8 -*-
- # @Time : 2024/5/15
- # @Author : 魏志亮
- import datetime
- import numpy as np
- import pandas as pd
- # 建立数据库连接
- import pymysql
- from pandas import DataFrame
- from sqlalchemy import create_engine, Engine, text
- from utils.log.trans_log import trans_print
- from utils.trans_methods import read_file_to_df
# ---------------------------------------------------------------------------
# Platform database ("energy"): queried below for the data_transfer,
# wind_field and wind_engine_group tables.
# NOTE(review): credentials are hard-coded in source; consider moving them to
# configuration or environment variables.
# ---------------------------------------------------------------------------
plt_user = 'admin'
plt_password = 'admin123456'
plt_host = '192.168.50.233'
plt_port = 3306
plt_database = 'energy'
# The port is passed explicitly so that changing plt_port actually takes effect
# (previously the constant was defined but never used).
plt_connect = pymysql.connect(host=plt_host,
                              port=plt_port,
                              user=plt_user,
                              password=plt_password,
                              db=plt_database,
                              charset='utf8mb4',
                              cursorclass=pymysql.cursors.DictCursor)

# ---------------------------------------------------------------------------
# Transfer database ("energy_data"): used below for trans_conf and for the
# generated data tables.
# ---------------------------------------------------------------------------
trans_user = 'admin'
trans_password = 'admin123456'
trans_host = '192.168.50.233'
trans_port = 3306
trans_database = 'energy_data'
trans_connect = pymysql.connect(host=trans_host,
                                port=trans_port,
                                user=trans_user,
                                password=trans_password,
                                db=trans_database,
                                charset='utf8mb4',
                                cursorclass=pymysql.cursors.DictCursor)

# SQL templates (filled with str.format) that update one data_transfer row,
# identified by batch_code + transfer_type.
# transfer_state written here: 1 = success, 2 = error, 0 = running.
susscess_sql = ("update data_transfer set transfer_state = 1 "
                "where batch_code = '{batch_no}' and transfer_type = '{trans_type}'")
error_sql = ("update data_transfer set transfer_state = 2 ,err_info='{message}' "
             "where batch_code = '{batch_no}' and transfer_type = '{trans_type}' ")
running_sql = ("update data_transfer set transfer_state = 0,engine_count = {wind_count} "
               "where batch_code = '{batch_no}' and transfer_type = '{trans_type}' ")
def __query(connect: pymysql.Connection, sql):
    """Execute a SELECT-style statement and return every row as a DataFrame.

    The connection is pinged (re-opened if necessary) before the statement is
    run and is always closed afterwards, even when execution fails, so a
    failed query no longer leaves the connection open.

    :param connect: pymysql connection to run the statement on.
    :param sql: complete SQL text to execute.
    :return: ``pd.DataFrame`` built from ``cursor.fetchall()``; empty when the
        query yields no rows.
    """
    trans_print('开始执行SQL:', sql)
    connect.ping(reconnect=True)
    try:
        with connect.cursor() as cursor:
            cursor.execute(sql)
            rows = cursor.fetchall()
    finally:
        # Release the connection on both the success and the failure path.
        connect.close()
    return pd.DataFrame(rows)
def __ddl_sql(connect: pymysql.Connection, sql):
    """Execute a data-changing or DDL statement and commit it.

    The connection is pinged (re-opened if necessary) first and is always
    closed afterwards, even when the statement or the commit raises.

    :param connect: pymysql connection to run the statement on.
    :param sql: complete SQL text to execute.
    """
    trans_print('开始执行SQL:', sql)
    connect.ping(reconnect=True)
    try:
        with connect.cursor() as cursor:
            cursor.execute(sql)
        connect.commit()
    finally:
        # Release the connection on both the success and the failure path.
        connect.close()
def update_trans_status(batch_no, trans_type, status, message="", wind_count=0):
    """Write the state of one transfer task to the data_transfer table.

    :param batch_no: value matched against ``batch_code``.
    :param trans_type: value matched against ``transfer_type``.
    :param status: ``'success'`` or ``'running'``; any other value is recorded
        as an error.
    :param message: error text stored in ``err_info`` (error status only).
    :param wind_count: value stored in ``engine_count`` (running status only).
    """
    if status == 'success':
        template = susscess_sql
    elif status == 'running':
        template = running_sql
    else:
        template = error_sql

    # The templates splice these values into quoted SQL literals. Escape them
    # so that text containing quotes or backslashes (typical for exception
    # messages) neither breaks the statement nor injects SQL.
    escape = pymysql.converters.escape_string
    exec_sql = template.format(batch_no=escape(str(batch_no)),
                               trans_type=escape(str(trans_type)),
                               message=escape(str(message)),
                               wind_count=wind_count)
    __ddl_sql(plt_connect, exec_sql)
# Fetch the next transfer task to execute
def get_exec_data():
    """Return the next pending transfer task, or ``None`` while one is running.

    A task counts as running when data_transfer has a row with
    ``transfer_state = 0``. Otherwise the row with ``transfer_state = -1`` and
    a non-empty ``transfer_addr`` that has the smallest ``update_time`` is
    fetched, joined with its ``field_name`` from wind_field.

    :return: ``None`` if any task is running; else a DataFrame with at most
        one row (empty when nothing is pending).
    """
    running_check_sql = "select 1 from data_transfer where transfer_state = 0"
    next_task_sql = """
    SELECT
        t.*,a.field_name
    FROM
        data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
    WHERE
        t.transfer_state = -1
    AND t.transfer_addr != ''
    ORDER BY
        t.update_time
    LIMIT 1
    """
    running_tasks = __query(plt_connect, running_check_sql)
    if not running_tasks.empty:
        # Something is already in progress: do not hand out another task.
        return None
    return __query(plt_connect, next_task_sql)
def get_all_wind(field_code):
    """Map engine names to engine codes for one wind field.

    Reads the non-deleted (``del_state = 0``) rows of wind_engine_group for
    the given ``field_code``.

    :param field_code: wind field code to filter on.
    :return: dict ``{engine_name: engine_code}``; empty when no row matches.
    """
    engine_sql = f"select engine_code,engine_name from wind_engine_group where field_code = '{field_code}' and del_state = 0"
    engines = __query(plt_connect, engine_sql)
    if engines.empty:
        return dict()
    # On duplicate engine names the last row wins, as with item assignment.
    return dict(zip(engines['engine_name'], engines['engine_code']))
def get_all_wind_company():
    """Return the list of wind field names.

    NOTE(review): the wind_field query below is executed, but its result is
    discarded and a fixed list of names is returned instead. This looks like
    a leftover override - confirm whether the database result should be used.

    :return: list of field-name strings.
    """
    field_sql = "SELECT t.field_name FROM wind_field t where t.del_state = 0"
    # Still executed (it logs the SQL and cycles the connection); result unused.
    __query(plt_connect, field_sql)

    fields = pd.DataFrame(
        {'field_name': ['吉山风电场', '和风元宝山', '唐龙三期风电场', '密马风电场', '招远风电场', '昌平坳风场', '昌西一风电场', '虹梯官风电场', '长清风电场']}
    )
    if fields.empty:
        return []
    return list(fields['field_name'].values)
def get_trans_conf(wind_name, type):
    """Load the trans_conf rows for one wind field and transfer type.

    :param wind_name: value matched against the ``wind_name`` column.
    :param type: value matched against the ``type`` column.
    :return: DataFrame of matching rows (empty when there are none).
    """
    conf_sql = f"SELECT * FROM trans_conf where wind_name = '{wind_name}' and type = '{type}'"
    conf_df = __query(trans_connect, conf_sql)
    return conf_df
def save_to_trans_conf(data_dict=None):
    """Insert or replace one row of trans_conf.

    :param data_dict: mapping of column name -> value. ``None`` or an empty
        mapping is a no-op (there is nothing to write).
    """
    if not data_dict:
        # Avoids issuing "replace into trans_conf () values ()".
        return

    columns = list(data_dict)
    col_str = ",".join(columns)
    data_s_str = ",".join(["%s"] * len(columns))
    insert_sql = f"replace into trans_conf ({col_str}) values ({data_s_str})"

    trans_connect.ping(reconnect=True)
    try:
        with trans_connect.cursor() as cursor:
            # Values are bound as parameters; only column names are inlined.
            cursor.execute(insert_sql, tuple(data_dict.values()))
        trans_connect.commit()
    finally:
        # Always release the connection, even if the statement fails.
        trans_connect.close()
def creat_table_and_add_partition(table_name, count, read_type):
    """Create the wind-turbine data table if it does not exist yet.

    :param table_name: name of the table to create.
    :param count: number of partitions; only used when ``read_type`` is
        ``'second'``.
    :param read_type: when equal to ``'second'`` the table is partitioned by
        KEY on ``wind_turbine_number``; otherwise it is not partitioned.
    """
    # Restrict the existence check to the connection's current database;
    # without the schema filter a same-named table in any other database
    # would make this function skip the creation.
    query_table = (
        "SELECT t.TABLE_NAME FROM information_schema.`TABLES` t "
        f"where t.TABLE_NAME = '{table_name}' AND t.TABLE_SCHEMA = DATABASE()"
    )
    df = __query(trans_connect, query_table)
    if not df.empty:
        return

    create_sql = f"""
    CREATE TABLE
    IF NOT EXISTS `{table_name}` (
        `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
        `time_stamp` datetime NOT NULL COMMENT '时间戳',
        `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
        `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
        `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
        `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
        `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
        `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
        `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
        `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
        `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
        `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
        `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
        `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
        `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
        `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
        `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
        `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
        `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
        `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
        `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
        `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
        `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
        `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
        `unusable` BIGINT (20) DEFAULT NULL COMMENT '不可利用',
        `power_curve_available` BIGINT (20) DEFAULT NULL COMMENT '功率曲线可用',
        `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
        `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
        `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
        `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
        `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
        `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
        `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
        `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
        `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
        `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
        `wind_turbine_status` BIGINT (20) DEFAULT NULL COMMENT '风机状态1',
        `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
        `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
        `year` INT (4) DEFAULT NULL COMMENT '年',
        `month` INT (2) DEFAULT NULL COMMENT '月',
        `day` INT (2) DEFAULT NULL COMMENT '日',
        `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
        `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
        `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
        `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
        `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
        `param6` DOUBLE DEFAULT NULL COMMENT '预留6',
        `param7` DOUBLE DEFAULT NULL COMMENT '预留7',
        `param8` DOUBLE DEFAULT NULL COMMENT '预留8',
        `param9` DOUBLE DEFAULT NULL COMMENT '预留9',
        `param10` DOUBLE DEFAULT NULL COMMENT '预留10',
        KEY `time_stamp` (`time_stamp`),
        KEY `wind_turbine_number` (`wind_turbine_number`)
    ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4
    """
    if read_type == 'second':
        # 'second' tables additionally get KEY partitioning by turbine number.
        create_sql = create_sql + f" PARTITION BY KEY (`wind_turbine_number`) PARTITIONS {count}"
    __ddl_sql(trans_connect, create_sql)
def rename_table(table_name, renamed_table_name):
    """Rename a table in the transfer database, best effort.

    Any exception raised while executing the statement is logged through
    ``trans_print`` and not propagated.

    :param table_name: current table name.
    :param renamed_table_name: new table name.
    """
    statement = f"RENAME TABLE {table_name} TO {renamed_table_name}"
    try:
        __ddl_sql(trans_connect, statement)
    except Exception as err:
        # Deliberately swallowed: a failed rename is only reported.
        trans_print(err)
def read_excel_and_save_to_db(table_name: str, file_path, batch_count=20000):
    """Load a file into a DataFrame and insert it into ``table_name``.

    The file is read with ``read_file_to_df`` and written through
    ``save_df_to_db`` on the transfer connection; start and elapsed time are
    logged.

    :param table_name: destination table.
    :param file_path: path of the file to load.
    :param batch_count: number of rows per insert batch.
    """
    frame = read_file_to_df(file_path)
    trans_print("开始保存文件", file_path, "到数据库")
    started_at = datetime.datetime.now()
    save_df_to_db(trans_connect, table_name, frame, batch_count)
    elapsed = datetime.datetime.now() - started_at
    trans_print("保存文件", file_path, "到数据库成功,耗时:", str(elapsed))
def save_df_to_db(connection: pymysql.Connection, table_name: str, df: DataFrame, batch_count=20000):
    """Insert every row of ``df`` into ``table_name`` in batches.

    Each batch is sent with ``executemany`` and committed on its own; the
    connection is re-opened before and closed after every batch. Missing
    values (NaN/NaT) are sent as SQL NULL. The caller's DataFrame is not
    modified.

    :param connection: pymysql connection to write through.
    :param table_name: destination table; its columns must match ``df.columns``.
    :param df: data to insert; column names are used as the SQL column list.
    :param batch_count: maximum number of rows per batch.
    """
    col_str = ",".join(df.columns)
    data_s_str = ",".join(["%s"] * len(df.columns))
    insert_sql = f"INSERT INTO {table_name} ({col_str}) values ({data_s_str})"

    total_count = df.shape[0]
    # Stop at total_count (not total_count + 1): otherwise an extra, empty
    # batch is issued whenever the row count is a multiple of batch_count.
    for start in range(0, total_count, batch_count):
        end = min(start + batch_count, total_count)
        chunk = df.iloc[start:end]
        # Convert NaN/NaT to None on an object-typed copy of the chunk, so the
        # driver sends NULL and the caller's frame stays untouched.
        chunk = chunk.astype(object).where(chunk.notna(), None)
        values = list(chunk.itertuples(index=False, name=None))

        connection.ping(reconnect=True)
        try:
            with connection.cursor() as cursor:
                cursor.executemany(insert_sql, values)
            connection.commit()
        finally:
            # Always release the connection, even if the batch fails.
            connection.close()
        trans_print("保存条数成功,总条数", str(total_count), "已完成条数:", str(end))
if __name__ == '__main__':
    # Manual check: print the next pending transfer task (None while one runs).
    print(get_exec_data())
|