"""Import wind-turbine configuration data from a CSV file into MySQL.

Reads a CSV whose headers are Chinese, maps them to English column names,
cleans and type-converts the data with pandas, then bulk-loads it into the
``info_turbine`` table via SQLAlchemy/pymysql.
"""

import logging
import warnings
from typing import Any, Dict, List, Optional
from urllib.parse import quote_plus

import pandas as pd
import pymysql
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Mapping of Chinese CSV headers to English column names.
# NOTE: the Chinese keys must match the CSV headers byte-for-byte;
# do not "normalize" them.
HEADER_MAPPING = {
    '风机id': 'turbine_id',
    '风场id': 'wind_farm_id',
    '风机名称': 'turbine_name',
    '风机英文名称': 'turbine_name_en',
    'USCADA别名': 'uscada_alias',
    '海拔': 'altitude',
    '纬度': 'latitude',
    '经度': 'longitude',
    '主控接口版本': 'main_control_version',
    '所属馈线ID': 'feeder_id',
    '所属期ID': 'phase_id',
    '轮毂高度': 'hub_height',
    '指标统计开始日期': 'stat_start_date',
    '投产运行日期': 'operation_date',
    '合同功率曲线(标准空气密度)': 'contract_power_curve_std',
    '合同功率曲线(当地空气密度)': 'contract_power_curve_local',
    '电量倍率': 'power_multiplier',
    '电量跳变斜率': 'power_jump_slope',
    '项目阶段': 'project_phase',
    '标杆风机标志(0或1)': 'benchmark_flag',
    '遥测辅助状态判断启用': 'telemetry_aux_status_enable',
    'scada单风机UR': 'scada_ur',
    'scada风机图标': 'scada_icon',
    'scada数据冻结时间阈值': 'scada_freeze_threshold',
    'scada应用配置导入模板': 'scada_template',
    '预留字段': 'reserved_field',
    '风机图片文件名': 'turbine_image',
    '时区': 'timezone',
    '描述': 'description',
    '额定容量': 'rated_capacity',
    '机型': 'model',
    '外部厂家风机ID': 'external_turbine_id',
    '风机制造商': 'manufacturer',
    '切入风速': 'cut_in_wind_speed',
    '切出风速': 'cut_out_wind_speed',
    '修改时间': 'update_time',
    '关联进线间隔id': 'incoming_line_id',
    '关联出线间隔id': 'outgoing_line_id',
    '关联主变压器id': 'main_transformer_id',
    '投产状态': 'operation_status',
    '质保状态': 'warranty_status',
    '有功功率积分算电量': 'active_power_integral',
    '开启故障检修判断': 'fault_maintenance_enable',
    '实施阶段': 'implementation_phase',
    '次级分组': 'secondary_group',
    '风机编号': 'turbine_number',
    '风机阶段': 'turbine_phase',
    '首次并网时间': 'first_grid_time',
    '数据湖源': 'data_lake_source',
    'wsfd_local': 'wsfd_local',
    '叶轮直径': 'rotor_diameter'
}

# SQL statement definitions
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS info_turbine (
    id INT AUTO_INCREMENT PRIMARY KEY,
    turbine_id VARCHAR(50),
    wind_farm_id VARCHAR(50),
    turbine_name VARCHAR(200),
    turbine_name_en VARCHAR(200),
    uscada_alias VARCHAR(100),
    altitude DECIMAL(10, 4),
    latitude DECIMAL(12, 8),
    longitude DECIMAL(12, 8),
    main_control_version VARCHAR(100),
    feeder_id VARCHAR(50),
    phase_id VARCHAR(50),
    hub_height DECIMAL(10, 2),
    stat_start_date DATE,
    operation_date DATE,
    contract_power_curve_std VARCHAR(100),
    contract_power_curve_local VARCHAR(100),
    power_multiplier DECIMAL(10, 2),
    power_jump_slope DECIMAL(10, 2),
    project_phase INT,
    benchmark_flag TINYINT,
    telemetry_aux_status_enable VARCHAR(10),
    scada_ur VARCHAR(100),
    scada_icon VARCHAR(100),
    scada_freeze_threshold VARCHAR(100),
    scada_template VARCHAR(100),
    reserved_field VARCHAR(100),
    turbine_image VARCHAR(200),
    timezone VARCHAR(50),
    description TEXT,
    rated_capacity INT,
    model VARCHAR(100),
    external_turbine_id VARCHAR(100),
    manufacturer VARCHAR(100),
    cut_in_wind_speed DECIMAL(5, 2),
    cut_out_wind_speed DECIMAL(5, 2),
    update_time DATETIME,
    incoming_line_id VARCHAR(50),
    outgoing_line_id VARCHAR(50),
    main_transformer_id VARCHAR(50),
    operation_status VARCHAR(50),
    warranty_status VARCHAR(50),
    active_power_integral TINYINT,
    fault_maintenance_enable TINYINT,
    implementation_phase INT,
    secondary_group VARCHAR(100),
    turbine_number VARCHAR(50),
    turbine_phase VARCHAR(50),
    first_grid_time DATE,
    data_lake_source VARCHAR(100),
    wsfd_local VARCHAR(100),
    rotor_diameter INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_turbine_id (turbine_id),
    INDEX idx_wind_farm_id (wind_farm_id),
    INDEX idx_manufacturer (manufacturer),
    INDEX idx_model (model)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
"""

COUNT_TABLE_SQL = "SELECT COUNT(*) FROM info_turbine"
DROP_TABLE_SQL = "DROP TABLE IF EXISTS info_turbine"


class DatabaseConfig:
    """Database connection configuration.

    NOTE(review): credentials are hard-coded defaults; consider loading
    them from the environment or a config file in production.
    """

    def __init__(self, host='192.168.50.234', port=4000, user='root',
                 password='123456', database='wind_data', charset='utf8mb4'):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset

    def get_pymysql_config(self) -> Dict[str, Any]:
        """Return the keyword arguments for ``pymysql.connect``."""
        return {
            'host': self.host,
            'port': self.port,
            'user': self.user,
            'password': self.password,
            'database': self.database,
            'charset': self.charset,
        }

    def get_sqlalchemy_url(self) -> str:
        """Return the SQLAlchemy connection URL.

        User and password are URL-quoted so that credentials containing
        reserved characters (``@``, ``/``, ``:``) do not break the DSN.
        """
        user = quote_plus(self.user)
        password = quote_plus(self.password)
        return (
            f"mysql+pymysql://{user}:{password}"
            f"@{self.host}:{self.port}/{self.database}?charset={self.charset}"
        )


class TurbineDataImporter:
    """Imports wind-turbine configuration data into MySQL."""

    def __init__(self, db_config: DatabaseConfig):
        """
        Initialize the importer.

        Args:
            db_config: database configuration object.
        """
        self.db_config = db_config
        self.engine: Optional[Engine] = None
        self.pymysql_conn: Optional[pymysql.connections.Connection] = None

    def connect_pymysql(self) -> pymysql.connections.Connection:
        """Open (and cache) a pymysql connection."""
        try:
            self.pymysql_conn = pymysql.connect(**self.db_config.get_pymysql_config())
            logger.info("pymysql连接成功")
            return self.pymysql_conn
        except Exception as e:
            logger.error(f"pymysql连接失败: {e}")
            raise

    def connect_sqlalchemy(self) -> Engine:
        """Create (and cache) the SQLAlchemy engine.

        Fixed: the original annotated the return type as ``create_engine``
        (a function object); the correct type is ``Engine``.
        """
        try:
            self.engine = create_engine(self.db_config.get_sqlalchemy_url())
            logger.info("SQLAlchemy引擎创建成功")
            return self.engine
        except Exception as e:
            logger.error(f"SQLAlchemy引擎创建失败: {e}")
            raise

    def close_connections(self):
        """Close all database connections and reset the cached handles."""
        if self.pymysql_conn:
            self.pymysql_conn.close()
            # Reset so a later connect_* call reconnects instead of
            # reusing a closed handle.
            self.pymysql_conn = None
            logger.info("pymysql连接已关闭")
        if self.engine:
            self.engine.dispose()
            self.engine = None
            logger.info("SQLAlchemy引擎已关闭")

    def _execute_statement(self, sql: str) -> None:
        """Execute a single statement and commit; roll back and re-raise on error.

        Uses a context-managed cursor (pymysql cursors support ``with``),
        replacing the fragile ``'cursor' in locals()`` cleanup.  The
        rollback is guarded: if ``connect_pymysql`` itself failed,
        ``self.pymysql_conn`` is still ``None`` and calling ``rollback``
        would raise ``AttributeError`` and mask the original error.
        """
        if not self.pymysql_conn:
            self.connect_pymysql()
        try:
            with self.pymysql_conn.cursor() as cursor:
                cursor.execute(sql)
            self.pymysql_conn.commit()
        except Exception:
            if self.pymysql_conn:
                self.pymysql_conn.rollback()
            raise

    def create_table(self) -> bool:
        """
        Create the turbine info table.

        Returns:
            bool: True if the table was created (or already existed).
        """
        try:
            self._execute_statement(CREATE_TABLE_SQL)
            logger.info("表 info_turbine 创建成功或已存在")
            return True
        except Exception as e:
            logger.error(f"创建表时发生错误: {e}")
            return False

    def drop_table(self) -> bool:
        """
        Drop the turbine info table (so it can be re-created).

        Returns:
            bool: True if the table was dropped successfully.
        """
        try:
            self._execute_statement(DROP_TABLE_SQL)
            logger.info("表 info_turbine 删除成功")
            return True
        except Exception as e:
            logger.error(f"删除表时发生错误: {e}")
            return False

    def load_csv_data(self, file_path: str) -> Optional[pd.DataFrame]:
        """
        Load data from a CSV file.

        Args:
            file_path: path to the CSV file.

        Returns:
            Optional[pd.DataFrame]: the loaded frame, or None on failure.
        """
        try:
            logger.info(f"正在读取文件: {file_path}")
            # Read the CSV with explicit encoding.
            df = pd.read_csv(file_path, encoding='utf-8')
            logger.info(f"成功读取CSV文件,共 {len(df)} 条记录")

            # Rename columns (Chinese headers -> English).
            df = df.rename(columns=HEADER_MAPPING)
            logger.info("表头已转换为英文")

            # Clean nulls / junk values, then coerce dtypes.
            df = self._clean_data(df)
            df = self._convert_data_types(df)
            return df
        except FileNotFoundError:
            logger.error(f"错误:找不到文件 {file_path}")
            return None
        except Exception as e:
            logger.error(f"读取CSV文件时发生错误: {e}")
            return None

    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize empty/placeholder values to None and drop all-empty columns."""
        # Treat empty strings and textual nulls as missing.
        df = df.replace(['', ' ', 'null', 'NULL'], None)
        # Drop columns that contain no data at all.
        df = df.dropna(axis=1, how='all')
        return df

    def _convert_data_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Coerce columns to the dtypes expected by the target table."""
        # Date fields (stored as DATE in MySQL).
        date_columns = ['stat_start_date', 'operation_date', 'first_grid_time']
        for col in date_columns:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce').dt.date

        # update_time is a full DATETIME.
        if 'update_time' in df.columns:
            df['update_time'] = pd.to_datetime(df['update_time'], errors='coerce')

        # Floating-point / decimal fields.
        numeric_columns = [
            'altitude', 'latitude', 'longitude', 'hub_height', 'rated_capacity',
            'cut_in_wind_speed', 'cut_out_wind_speed', 'rotor_diameter',
            'power_multiplier', 'power_jump_slope'
        ]
        for col in numeric_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

        # Integer fields (nullable Int64; unparseable values become 0).
        int_columns = ['project_phase', 'benchmark_flag', 'implementation_phase']
        for col in int_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('Int64')

        # Boolean-ish flag fields (TINYINT in MySQL).
        flag_columns = ['active_power_integral', 'fault_maintenance_enable']
        for col in flag_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('Int64')

        return df

    def import_to_mysql(self, df: pd.DataFrame, if_exists: str = 'append') -> bool:
        """
        Import a DataFrame into MySQL.

        Args:
            df: the data frame to import.
            if_exists: behavior when the table exists
                ('fail', 'replace', 'append').

        Returns:
            bool: True if the import succeeded.
        """
        try:
            if not self.engine:
                self.connect_sqlalchemy()
            logger.info("正在导入数据到MySQL数据库...")
            df.to_sql(
                name='info_turbine',
                con=self.engine,
                if_exists=if_exists,
                index=False,
                chunksize=1000,
                method='multi',  # multi-row INSERTs for throughput
            )
            logger.info(f"成功导入 {len(df)} 条记录到数据库表 info_turbine")
            return True
        except Exception as e:
            logger.error(f"导入数据到MySQL时发生错误: {e}")
            return False

    def get_record_count(self) -> Optional[int]:
        """
        Return the number of rows in the table.

        Returns:
            Optional[int]: the row count, or None on failure.
        """
        try:
            if not self.pymysql_conn:
                self.connect_pymysql()
            with self.pymysql_conn.cursor() as cursor:
                cursor.execute(COUNT_TABLE_SQL)
                count = cursor.fetchone()[0]
            logger.info(f"数据库表 info_turbine 中现有 {count} 条记录")
            return count
        except Exception as e:
            logger.error(f"获取记录数时发生错误: {e}")
            return None

    def get_table_info(self) -> Optional[Dict[str, Any]]:
        """
        Return the table's columns and indexes.

        Returns:
            Optional[Dict[str, Any]]: dict with 'columns' and 'indexes',
            or None on failure.
        """
        try:
            if not self.pymysql_conn:
                self.connect_pymysql()
            with self.pymysql_conn.cursor() as cursor:
                cursor.execute("DESCRIBE info_turbine")
                columns = cursor.fetchall()
                cursor.execute("SHOW INDEX FROM info_turbine")
                indexes = cursor.fetchall()
            return {'columns': columns, 'indexes': indexes}
        except Exception as e:
            logger.error(f"获取表信息时发生错误: {e}")
            return None

    def run_import_pipeline(self, csv_file_path: str,
                            drop_if_exists: bool = False) -> bool:
        """
        Run the full import pipeline.

        Args:
            csv_file_path: path to the CSV file.
            drop_if_exists: whether to drop an existing table first.

        Returns:
            bool: True if the whole pipeline succeeded.
        """
        try:
            logger.info("开始执行风机数据导入程序...")

            # Step 1: open database connections.
            self.connect_pymysql()
            self.connect_sqlalchemy()

            # Step 2: drop the table first if requested.
            if drop_if_exists:
                if not self.drop_table():
                    return False

            # Step 3: create the table.
            if not self.create_table():
                return False

            # Step 4: load the CSV data.
            df = self.load_csv_data(csv_file_path)
            if df is None:
                return False

            # Step 5: import the data.
            if not self.import_to_mysql(df, if_exists='append'):
                return False

            # Step 6: verify by counting rows.
            count = self.get_record_count()
            if count is not None:
                logger.info(f"数据导入验证成功,共导入 {len(df)} 条记录,数据库中现有 {count} 条记录")

            # Step 7: fetch table metadata (informational only).
            table_info = self.get_table_info()
            if table_info:
                logger.info(f"表结构信息已获取,共有 {len(table_info['columns'])} 列")

            logger.info("风机数据导入程序执行完成!")
            return True
        except Exception as e:
            logger.error(f"数据导入流程执行失败: {e}")
            return False
        finally:
            # Always release connections, success or failure.
            self.close_connections()


def main():
    """Entry point: configure and run the import pipeline."""
    # Suppress noisy library warnings.
    warnings.filterwarnings('ignore')

    # Database configuration.
    db_config = DatabaseConfig(
        host='192.168.50.234',
        port=4000,
        user='root',
        password='123456',
        database='wind_data',
        charset='utf8mb4',
    )

    # CSV file path.
    csv_file_path = './data/风机应用配置表_带叶轮直径.csv'

    # Create the importer and run the pipeline.
    # Set drop_if_exists=True to drop and re-create the table when it
    # already contains stale data.
    importer = TurbineDataImporter(db_config)
    success = importer.run_import_pipeline(
        csv_file_path=csv_file_path,
        drop_if_exists=False,
    )

    if success:
        logger.info("数据导入成功完成!")
    else:
        logger.error("数据导入失败!")


if __name__ == "__main__":
    main()