"""Build the ``info_model_turbine`` summary table from ``info_turbine``.

The script groups the rows of ``info_turbine`` by model attributes, counts the
turbines in each group and upserts the result into ``info_model_turbine``.
"""

import logging
from datetime import datetime
from typing import List, Dict, Any, Optional

import pymysql

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# SQL statement definitions
CREATE_MODEL_TURBINE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS info_model_turbine (
    id INT AUTO_INCREMENT PRIMARY KEY,
    no_model VARCHAR(255) NOT NULL COMMENT '机型唯一标识',
    model VARCHAR(100) COMMENT '机型',
    manufacturer VARCHAR(100) COMMENT '制造商',
    rated_capacity INT COMMENT '额定容量(kW)',
    cut_in_wind_speed DECIMAL(5, 2) COMMENT '切入风速(m/s)',
    cut_out_wind_speed DECIMAL(5, 2) COMMENT '切出风速(m/s)',
    rotor_diameter INT COMMENT '叶轮直径(m)',
    hub_height DECIMAL(10, 2) COMMENT '轮毂高度(m)',
    turbine_count INT DEFAULT 0 COMMENT '该机型风机数量',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY idx_no_model (no_model),
    INDEX idx_model (model),
    INDEX idx_manufacturer (manufacturer),
    INDEX idx_rated_capacity (rated_capacity)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风机机型信息表';
"""

# NOTE(review): no_model is built from model / cut-in / cut-out / hub height
# only, while the GROUP BY also splits on manufacturer, rated_capacity and
# rotor_diameter. Two groups can therefore share one no_model, and because
# no_model is the UNIQUE key of info_model_turbine the later row overwrites
# the earlier row's turbine_count. Changing the key format would alter stored
# identifiers, so it is only flagged here (insert_model_data logs a warning).
SELECT_MODEL_DATA_SQL = """
SELECT
    CONCAT(
        IFNULL(model, ''), '-',
        IFNULL(cut_in_wind_speed, ''), '-',
        IFNULL(cut_out_wind_speed, ''), '-',
        IFNULL(hub_height, '')
    ) AS no_model,
    model,
    manufacturer,
    rated_capacity,
    cut_in_wind_speed,
    cut_out_wind_speed,
    rotor_diameter,
    hub_height,
    COUNT(*) AS turbine_count
FROM info_turbine
WHERE model IS NOT NULL
GROUP BY model, manufacturer, rated_capacity, cut_in_wind_speed,
         cut_out_wind_speed, rotor_diameter, hub_height
ORDER BY model, manufacturer, rated_capacity;
"""

DROP_MODEL_TURBINE_TABLE_SQL = "DROP TABLE IF EXISTS info_model_turbine"

# Kept unchanged for backward compatibility: takes the schema name only and
# always checks the hard-coded 'info_model_turbine' table.
CHECK_TABLE_EXISTS_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = 'info_model_turbine'
"""

# Same check with the table name as a bound parameter (schema, table).
_CHECK_NAMED_TABLE_EXISTS_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = %s
"""


class DatabaseConfig:
    """Connection settings for the database.

    NOTE(review): the defaults embed a real-looking host and password. They
    are kept so existing callers of ``DatabaseConfig()`` keep working, but
    credentials should be supplied by the caller rather than relied on here.
    """

    def __init__(self, host='192.168.50.234', port=4000, user='root',
                 password='123456', database='wind_data', charset='utf8mb4'):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset

    def get_connection_config(self) -> Dict[str, Any]:
        """Return the keyword arguments passed to ``pymysql.connect``."""
        return {
            'host': self.host,
            'port': self.port,
            'user': self.user,
            'password': self.password,
            'database': self.database,
            'charset': self.charset,
            # Rows come back as dicts keyed by column name.
            'cursorclass': pymysql.cursors.DictCursor,
        }


class ModelTurbineManager:
    """Manage the turbine-model summary table ``info_model_turbine``."""

    def __init__(self, db_config: DatabaseConfig):
        """Initialise the manager.

        Args:
            db_config: Database configuration object.
        """
        self.db_config = db_config
        self.connection = None

    def connect(self) -> pymysql.connections.Connection:
        """Open a database connection and store it on the manager.

        Returns:
            The newly opened connection.

        Raises:
            Exception: Whatever ``pymysql.connect`` raises; it is logged and
                re-raised.
        """
        try:
            self.connection = pymysql.connect(**self.db_config.get_connection_config())
            logger.info(f"成功连接到数据库: {self.db_config.host}:{self.db_config.port}/{self.db_config.database}")
            return self.connection
        except Exception as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def close(self):
        """Close the database connection, if one is open.

        The stored handle is reset to ``None`` so that a later
        ``execute_query`` / ``execute_update`` reconnects instead of using a
        closed connection, and so that calling ``close`` twice is harmless.
        """
        if self.connection is not None:
            try:
                self.connection.close()
            finally:
                # Reset even if close() raised; the handle is unusable anyway.
                self.connection = None
            logger.info("数据库连接已关闭")

    def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """Run a query and return all rows.

        Connects first if no connection is open.

        Args:
            sql: SQL query text.
            params: Values bound to the placeholders in ``sql``.

        Returns:
            The rows returned by ``cursor.fetchall()``.

        Raises:
            Exception: Any error from connecting or executing; execution
                errors are logged and re-raised.
        """
        if self.connection is None:
            self.connect()

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(sql, params)
                result = cursor.fetchall()
                logger.debug(f"查询执行成功,返回 {len(result)} 条记录")
                return result
        except Exception as e:
            logger.error(f"查询执行失败: {e}")
            raise

    def execute_update(self, sql: str, params: Optional[tuple] = None) -> int:
        """Run a modifying statement (INSERT, UPDATE, DELETE, DDL) and commit.

        Connects first if no connection is open. On failure the transaction
        is rolled back before the error is re-raised.

        Args:
            sql: SQL statement text.
            params: Values bound to the placeholders in ``sql``.

        Returns:
            The affected-row count reported by ``cursor.execute``.

        Raises:
            Exception: Any error from connecting or executing.
        """
        if self.connection is None:
            self.connect()

        try:
            with self.connection.cursor() as cursor:
                affected_rows = cursor.execute(sql, params)
                self.connection.commit()
                logger.debug(f"更新执行成功,影响 {affected_rows} 行")
                return affected_rows
        except Exception as e:
            self.connection.rollback()
            logger.error(f"更新执行失败: {e}")
            raise

    def check_table_exists(self, table_name: str = 'info_model_turbine') -> bool:
        """Check whether a table exists in the configured schema.

        Args:
            table_name: Name of the table to look for.

        Returns:
            True if ``information_schema.tables`` lists the table; False if
            it does not or if the lookup itself failed (the error is logged).
        """
        try:
            result = self.execute_query(
                _CHECK_NAMED_TABLE_EXISTS_SQL,
                (self.db_config.database, table_name),
            )
            return result[0]['table_exists'] > 0
        except Exception as e:
            logger.error(f"检查表存在性失败: {e}")
            return False

    def create_model_turbine_table(self) -> bool:
        """Create ``info_model_turbine`` if it does not exist.

        Returns:
            True on success, False if the statement failed (error is logged).
        """
        try:
            logger.info("开始创建风机机型信息表...")
            self.execute_update(CREATE_MODEL_TURBINE_TABLE_SQL)
            logger.info("风机机型信息表创建成功")
            return True
        except Exception as e:
            logger.error(f"创建风机机型信息表失败: {e}")
            return False

    def drop_model_turbine_table(self) -> bool:
        """Drop ``info_model_turbine`` if it exists.

        Returns:
            True on success, False if the statement failed (error is logged).
        """
        try:
            logger.info("开始删除风机机型信息表...")
            self.execute_update(DROP_MODEL_TURBINE_TABLE_SQL)
            logger.info("风机机型信息表删除成功")
            return True
        except Exception as e:
            logger.error(f"删除风机机型信息表失败: {e}")
            return False

    def get_model_data(self) -> List[Dict[str, Any]]:
        """Fetch the grouped model rows from ``info_turbine``.

        Returns:
            One dict per model group, or an empty list if the query failed
            (the error is logged, not raised).
        """
        try:
            logger.info("开始从info_turbine表查询机型数据...")
            data = self.execute_query(SELECT_MODEL_DATA_SQL)
            logger.info(f"成功查询到 {len(data)} 条机型记录")
            return data
        except Exception as e:
            logger.error(f"查询机型数据失败: {e}")
            return []

    def insert_model_data(self, model_data: List[Dict[str, Any]]) -> int:
        """Upsert model rows into ``info_model_turbine``.

        Rows whose ``no_model`` already exists only get their
        ``turbine_count`` and ``updated_at`` refreshed.

        Args:
            model_data: Rows shaped like the output of ``get_model_data``.

        Returns:
            The affected-row count reported by ``cursor.executemany``; 0 when
            ``model_data`` is empty. With ``ON DUPLICATE KEY UPDATE`` a
            MySQL-compatible server may count an updated row as 2, so this is
            not necessarily the number of input rows.

        Raises:
            Exception: Any error while preparing or executing the insert; the
                transaction is rolled back first.
        """
        if not model_data:
            logger.warning("没有数据需要插入")
            return 0

        insert_sql = """
        INSERT INTO info_model_turbine
        (no_model, model, manufacturer, rated_capacity, cut_in_wind_speed,
         cut_out_wind_speed, rotor_diameter, hub_height, turbine_count)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            turbine_count = VALUES(turbine_count),
            updated_at = CURRENT_TIMESTAMP
        """

        # Connect lazily like execute_query/execute_update do; without this a
        # fresh manager fails on self.connection being None, and the
        # rollback() in the handler below fails the same way.
        if self.connection is None:
            self.connect()

        try:
            logger.info(f"开始向info_model_turbine表插入 {len(model_data)} 条记录...")

            # Build the parameter tuples in column order.
            insert_data = [
                (
                    item['no_model'],
                    item['model'],
                    item['manufacturer'],
                    item['rated_capacity'],
                    item['cut_in_wind_speed'],
                    item['cut_out_wind_speed'],
                    item['rotor_diameter'],
                    item['hub_height'],
                    item.get('turbine_count', 1),  # count from the query
                )
                for item in model_data
            ]

            # Several input rows with one no_model collapse onto a single
            # table row (the last one wins); surface that instead of hiding it.
            distinct_keys = {row[0] for row in insert_data}
            if len(distinct_keys) != len(insert_data):
                logger.warning(
                    f"检测到重复的no_model: {len(insert_data)} 条记录仅对应 "
                    f"{len(distinct_keys)} 个唯一标识,重复记录将被覆盖"
                )

            # Bulk insert
            with self.connection.cursor() as cursor:
                affected_rows = cursor.executemany(insert_sql, insert_data)
                self.connection.commit()

            logger.info(f"成功插入/更新 {affected_rows} 条记录到info_model_turbine表")
            return affected_rows

        except Exception as e:
            self.connection.rollback()
            logger.error(f"插入机型数据失败: {e}")
            raise

    def get_model_turbine_stats(self) -> Dict[str, Any]:
        """Return aggregate statistics over ``info_model_turbine``.

        Returns:
            A dict with ``total_models``, ``manufacturer_count``,
            ``model_count``, ``min_capacity``, ``max_capacity``,
            ``avg_capacity`` and ``total_turbines``; an empty dict if the
            query failed. Aggregates other than the counts can be ``None``
            when there is nothing to aggregate.
        """
        try:
            # Aggregate query
            stats_sql = """
            SELECT
                COUNT(*) as total_models,
                COUNT(DISTINCT manufacturer) as manufacturer_count,
                COUNT(DISTINCT model) as model_count,
                MIN(rated_capacity) as min_capacity,
                MAX(rated_capacity) as max_capacity,
                AVG(rated_capacity) as avg_capacity,
                SUM(turbine_count) as total_turbines
            FROM info_model_turbine
            """

            result = self.execute_query(stats_sql)
            return result[0] if result else {}

        except Exception as e:
            logger.error(f"获取统计信息失败: {e}")
            return {}

    def print_model_summary(self, model_data: List[Dict[str, Any]]):
        """Print a per-manufacturer and per-capacity summary to stdout.

        Args:
            model_data: Rows shaped like the output of ``get_model_data``.
        """
        if not model_data:
            logger.info("没有机型数据")
            return

        print("\n" + "="*80)
        print("风机机型数据摘要")
        print("="*80)
        print(f"总机型数: {len(model_data)}")

        # Count models per manufacturer. Database rows carry the key with a
        # None value, so a .get() default alone would never apply.
        manufacturers = {}
        for item in model_data:
            manufacturer = item.get('manufacturer') or '未知'
            manufacturers[manufacturer] = manufacturers.get(manufacturer, 0) + 1

        print(f"制造商数: {len(manufacturers)}")
        print("\n制造商分布:")
        for manufacturer, count in sorted(manufacturers.items(), key=lambda x: x[1], reverse=True):
            print(f"  {manufacturer}: {count} 种机型")

        # Count models per rated capacity. Bucket on the numeric value itself
        # so sorting never has to parse the number back out of a label, which
        # fails for non-integer values.
        capacities = {}
        for item in model_data:
            capacity = item.get('rated_capacity', 0)
            if capacity:
                capacities[capacity] = capacities.get(capacity, 0) + 1

        print("\n额定容量分布:")
        for capacity in sorted(capacities):
            print(f"  {capacity}kW: {capacities[capacity]} 种机型")

        print("="*80)

    def run_model_extraction_pipeline(self, recreate_table: bool = False) -> bool:
        """Run the full extract-and-load flow.

        Connects, prepares the target table, reads the grouped model data,
        prints a summary, upserts the rows and prints table statistics. The
        connection is always closed at the end.

        Args:
            recreate_table: Drop and recreate the target table first instead
                of upserting into the existing one.

        Returns:
            True if every step succeeded, False otherwise (errors are logged).
        """
        try:
            logger.info("开始执行风机机型数据提取流程...")

            # Step 1: connect to the database
            self.connect()

            # Step 2: make sure the target table is in the requested state.
            # The helpers report failure through their return value, so it
            # has to be checked here or the flow would carry on regardless.
            if recreate_table:
                if not self.drop_model_turbine_table():
                    logger.error("删除风机机型信息表失败,流程终止")
                    return False
                if not self.create_model_turbine_table():
                    logger.error("创建风机机型信息表失败,流程终止")
                    return False
            elif not self.check_table_exists():
                if not self.create_model_turbine_table():
                    logger.error("创建风机机型信息表失败,流程终止")
                    return False
            else:
                logger.info("info_model_turbine表已存在,将追加数据")

            # Step 3: read the grouped model data from info_turbine
            model_data = self.get_model_data()
            if not model_data:
                logger.error("未获取到机型数据,流程终止")
                return False

            # Step 4: print the summary
            self.print_model_summary(model_data)

            # Step 5: upsert into info_model_turbine
            self.insert_model_data(model_data)

            # Step 6: fetch and display table statistics
            stats = self.get_model_turbine_stats()
            if stats:
                # Aggregates are NULL/None when there is nothing to aggregate
                # (e.g. every rated_capacity is NULL); show 0 instead of
                # failing the already-committed run on round(None).
                avg_capacity = stats.get('avg_capacity')
                avg_display = round(avg_capacity, 1) if avg_capacity is not None else 0
                min_capacity = stats.get('min_capacity')
                max_capacity = stats.get('max_capacity')
                total_turbines = stats.get('total_turbines')

                print("\n数据库统计信息:")
                print(f"  总机型数: {stats.get('total_models', 0)}")
                print(f"  制造商数: {stats.get('manufacturer_count', 0)}")
                print(f"  总风机数: {total_turbines if total_turbines is not None else 0}")
                print(f"  额定容量范围: {min_capacity if min_capacity is not None else 0}kW - "
                      f"{max_capacity if max_capacity is not None else 0}kW")
                print(f"  平均额定容量: {avg_display}kW")

            logger.info("风机机型数据提取流程执行完成!")
            return True

        except Exception as e:
            logger.error(f"机型数据提取流程执行失败: {e}")
            return False

        finally:
            # Always release the connection
            self.close()


def main():
    """Entry point: run the extraction pipeline against the configured DB."""
    # Database configuration
    # NOTE(review): credentials are hard-coded here; prefer loading them from
    # the environment or a config file.
    db_config = DatabaseConfig(
        host='192.168.50.234',
        port=4000,
        user='root',
        password='123456',
        database='wind_data',
        charset='utf8mb4'
    )

    # Create the manager
    manager = ModelTurbineManager(db_config)

    # Run the pipeline.
    # recreate_table=True drops and recreates the table;
    # recreate_table=False upserts (ON DUPLICATE KEY UPDATE) into it.
    success = manager.run_model_extraction_pipeline(
        recreate_table=False
    )

    if success:
        logger.info("风机机型数据提取成功完成!")
    else:
        logger.error("风机机型数据提取失败!")


if __name__ == "__main__":
    main()