import pymysql
import threading
from typing import List, Dict, Any, Optional, Tuple
import logging
from datetime import datetime
from collections import Counter
import statistics
import math

# Configure logging for the whole script.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class ConnectionPool:
    """Bounded pool of pymysql connections.

    Connections are opened lazily, up to ``max_connections``. A caller that
    finds every connection busy blocks until another caller releases one.
    Each new connection sets the session variable ``tidb_mem_quota_query``
    to ``mem_quota`` bytes.
    """

    def __init__(self, host, port, user, password, database, charset='utf8mb4',
                 max_connections=2, mem_quota=4 << 30):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset
        self.max_connections = max_connections
        self.mem_quota = mem_quota
        self._lock = threading.Lock()
        # Shares the pool lock. Signalled whenever a connection is returned,
        # so waiters never have to sleep/recurse while holding the lock.
        self._available = threading.Condition(self._lock)
        self._connections = []
        self._in_use = set()

    def _create_connection(self):
        """Open a new connection and apply the per-session memory quota.

        Returns:
            A pymysql connection using ``DictCursor``.

        Raises:
            Exception: Whatever pymysql raised. A half-initialised connection
            is closed before re-raising so it does not leak.
        """
        conn = None
        try:
            conn = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
                charset=self.charset,
                cursorclass=pymysql.cursors.DictCursor
            )
            # Set the session memory quota. int() guarantees only a number
            # is ever interpolated into the statement.
            with conn.cursor() as cursor:
                cursor.execute(f"SET SESSION tidb_mem_quota_query = {int(self.mem_quota)}")
            conn.commit()
            logger.debug(f"创建新数据库连接,设置内存配额为 {self.mem_quota}")
            return conn
        except Exception as e:
            logger.error(f"创建数据库连接失败: {e}")
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass  # best-effort cleanup; the original error is re-raised below
            raise

    def get_connection(self):
        """Check out a connection, blocking while the pool is exhausted.

        Order of preference: an idle open connection, then a newly created
        one (if under ``max_connections``), otherwise wait for a release.
        """
        with self._available:
            while True:
                # Reuse an idle connection, discarding any that pymysql
                # already knows to be closed.
                for conn in list(self._connections):
                    if conn in self._in_use:
                        continue
                    if not conn.open:
                        self._connections.remove(conn)
                        continue
                    self._in_use.add(conn)
                    logger.debug("从连接池获取现有连接")
                    return conn

                # No idle connection, but there is room for a new one.
                if len(self._connections) < self.max_connections:
                    conn = self._create_connection()
                    self._connections.append(conn)
                    self._in_use.add(conn)
                    logger.debug(f"创建新连接,当前连接数: {len(self._connections)}")
                    return conn

                # Pool is full. wait() releases the lock, so other threads can
                # return connections. The timeout makes the loop re-check
                # periodically even if a notify is missed.
                logger.warning("连接池已满,等待可用连接...")
                self._available.wait(timeout=1)

    def release_connection(self, conn):
        """Return a checked-out connection to the pool and wake one waiter."""
        with self._available:
            if conn in self._in_use:
                self._in_use.remove(conn)
                logger.debug("释放连接回连接池")
                self._available.notify()

    def close_all(self):
        """Close every pooled connection (idle or in use) and empty the pool."""
        with self._available:
            for conn in self._connections:
                try:
                    conn.close()
                except Exception as e:
                    # Best effort: a connection that is already closed/broken
                    # must not stop the rest from being closed.
                    logger.debug(f"关闭连接时出错: {e}")
            self._connections.clear()
            self._in_use.clear()
            # Waiters may now open fresh connections.
            self._available.notify_all()
        logger.info("已关闭所有数据库连接")


class DatabaseConfig:
    """Plain holder for database connection and pool settings."""

    # NOTE(review): credentials are hard-coded defaults; consider loading them
    # from the environment or a config file instead of source control.
    def __init__(self, host='192.168.50.234', port=4000, user='root', password='123456',
                 database='wind_data', charset='utf8mb4', max_connections=2, mem_quota=4 << 30):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset
        self.max_connections = max_connections
        self.mem_quota = mem_quota


class SCADADataProcessor:
    """Derive rated rotor speed, rated generator speed and gear ratio from SCADA rows."""

    @staticmethod
    def calculate_mode(values: List[float], decimal_places: int = 1) -> float:
        """Return the most frequent value after rounding.

        Readings are rounded to ``decimal_places`` first, so that sensor
        noise does not spread the counts over many distinct floats. When
        several values tie, the one encountered first in ``values`` wins.

        Args:
            values: Numeric readings.
            decimal_places: Rounding precision used for bucketing.

        Returns:
            The modal (rounded) value, or 0.0 for empty input.
        """
        if not values:
            return 0.0
        counter = Counter(round(v, decimal_places) for v in values)
        mode_value, count = counter.most_common(1)[0]
        logger.debug(f"众数统计: 值={mode_value}, 频次={count}, 总数据点={len(values)}")
        return mode_value

    @staticmethod
    def calculate_median(values: List[float]) -> float:
        """Return the median of ``values`` as a float, or 0.0 for empty input."""
        if not values:
            return 0.0
        return float(statistics.median(values))

    @staticmethod
    def _extract_speed_pairs(data: List[Dict[str, Any]]) -> Tuple[List[float], List[float]]:
        """Collect aligned (rotor_spd, gen_spd) floats from rows where both parse.

        Both values are converted before either is stored, so the two
        returned lists always have the same length and stay index-aligned.
        """
        rotor_speeds: List[float] = []
        gen_speeds: List[float] = []
        for record in data:
            rotor_spd = record.get('rotor_spd')
            gen_spd = record.get('gen_spd')
            if rotor_spd is None or gen_spd is None:
                continue
            try:
                rotor_value = float(rotor_spd)
                gen_value = float(gen_spd)
            except (ValueError, TypeError):
                continue
            rotor_speeds.append(rotor_value)
            gen_speeds.append(gen_value)
        return rotor_speeds, gen_speeds

    @staticmethod
    def calculate_rated_speeds_and_ratio(data: List[Dict[str, Any]]) -> Tuple[float, float, float]:
        """Estimate rated rotor speed, rated generator speed and gear ratio.

        Both speeds are estimated with the rounded mode. The median is used
        instead when the mode is non-positive, or (for the rotor only) when
        the mode exceeds 30 rpm. A ratio above 200 is treated as invalid and
        reported as 0.0.

        Args:
            data: Row dicts carrying ``rotor_spd`` and ``gen_spd``.

        Returns:
            (rated_rotor_spd, rated_gen_spd, transmission_ratio); all 0.0 if
            no usable rows exist.
        """
        if not data:
            return 0.0, 0.0, 0.0

        rotor_speeds, gen_speeds = SCADADataProcessor._extract_speed_pairs(data)
        if not rotor_speeds or not gen_speeds:
            logger.warning(f"数据不足: 转子转速点={len(rotor_speeds)}, 发电机转速点={len(gen_speeds)}")
            return 0.0, 0.0, 0.0

        # Primary estimate: the mode, i.e. the speed the turbine sits at most
        # often in the rated-power window.
        rated_rotor_spd = SCADADataProcessor.calculate_mode(rotor_speeds, decimal_places=1)
        rated_gen_spd = SCADADataProcessor.calculate_mode(gen_speeds, decimal_places=1)

        # Fall back to the median when the mode is unusable.
        if rated_rotor_spd <= 0 or rated_gen_spd <= 0:
            rated_rotor_spd = SCADADataProcessor.calculate_median(rotor_speeds)
            rated_gen_spd = SCADADataProcessor.calculate_median(gen_speeds)

        transmission_ratio = rated_gen_spd / rated_rotor_spd if rated_rotor_spd > 0 else 0.0

        # Plausibility checks: rotor speed is expected to be below 30 rpm.
        if rated_rotor_spd > 30:
            logger.warning(f"叶轮转速异常高: {rated_rotor_spd} rpm,使用中位数重新计算")
            rated_rotor_spd = SCADADataProcessor.calculate_median(rotor_speeds)
            if rated_rotor_spd > 0:
                transmission_ratio = rated_gen_spd / rated_rotor_spd

        # Gear ratio is expected to be below 200.
        if transmission_ratio > 200:
            logger.warning(f"传动比异常高: {transmission_ratio}")
            transmission_ratio = 0.0

        logger.debug(f"计算结果: 转子转速={rated_rotor_spd:.2f} rpm, "
                     f"发电机转速={rated_gen_spd:.2f} rpm, "
                     f"传动比={transmission_ratio:.2f}")
        return rated_rotor_spd, rated_gen_spd, transmission_ratio

    @staticmethod
    def detect_turbine_type(transmission_ratio: float, rated_gen_spd: float) -> str:
        """Classify the drivetrain from the gear ratio.

        Ratios up to 1.2 map to "直驱", up to 30 to "半直驱" and up to 120 to
        "双馈". Larger (or non-comparable) ratios are classified by generator
        speed instead; "未知" is returned when no rule matches.
        """
        if transmission_ratio <= 1.2:
            return "直驱"
        if transmission_ratio <= 30:
            return "半直驱"
        if transmission_ratio <= 120:
            return "双馈"
        # Ratio out of the expected ranges: decide from generator speed.
        if rated_gen_spd < 50:
            return "直驱"
        if 1000 <= rated_gen_spd <= 2000:
            return "双馈"
        if rated_gen_spd > 2000:
            return "半直驱"
        return "未知"


# --------------------------------------------------------------------------
# SQL statements
# --------------------------------------------------------------------------
CREATE_MODEL_TURBINE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS info_model_turbine (
    id INT AUTO_INCREMENT PRIMARY KEY,
    no_model VARCHAR(255) NOT NULL COMMENT '机型唯一标识',
    model VARCHAR(100) COMMENT '机型',
    manufacturer VARCHAR(100) COMMENT '制造商',
    rated_capacity INT COMMENT '额定容量(kW)',
    cut_in_wind_speed DECIMAL(5, 2) COMMENT '切入风速(m/s)',
    cut_out_wind_speed DECIMAL(5, 2) COMMENT '切出风速(m/s)',
    rotor_diameter INT COMMENT '叶轮直径(m)',
    hub_height DECIMAL(10, 2) COMMENT '轮毂高度(m)',
    turbine_count INT DEFAULT 0 COMMENT '该机型风机数量',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY idx_no_model (no_model),
    INDEX idx_model (model),
    INDEX idx_manufacturer (manufacturer),
    INDEX idx_rated_capacity (rated_capacity)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风机机型信息表';
"""

ALTER_MODEL_TURBINE_TABLE_SQL = """
ALTER TABLE info_model_turbine
    ADD COLUMN IF NOT EXISTS rated_rotor_spd DECIMAL(10, 3) COMMENT '额定叶轮转速(rpm)',
    ADD COLUMN IF NOT EXISTS rated_gen_spd DECIMAL(10, 3) COMMENT '额定发电机转速(rpm)',
    ADD COLUMN IF NOT EXISTS transmission_ratio DECIMAL(10, 4) COMMENT '传动比',
    ADD COLUMN IF NOT EXISTS turbine_type VARCHAR(20) COMMENT '风机类型(直驱/双馈/半直驱)',
    ADD COLUMN IF NOT EXISTS calculation_time TIMESTAMP COMMENT '参数计算时间',
    ADD COLUMN IF NOT EXISTS data_points INT DEFAULT 0 COMMENT '用于计算的数据点数';
"""

# Same columns as ALTER_MODEL_TURBINE_TABLE_SQL, as (name, definition) pairs.
# Used to add only the columns that are actually missing, one statement each.
MODEL_PARAMETER_COLUMNS = (
    ("rated_rotor_spd", "DECIMAL(10, 3) COMMENT '额定叶轮转速(rpm)'"),
    ("rated_gen_spd", "DECIMAL(10, 3) COMMENT '额定发电机转速(rpm)'"),
    ("transmission_ratio", "DECIMAL(10, 4) COMMENT '传动比'"),
    ("turbine_type", "VARCHAR(20) COMMENT '风机类型(直驱/双馈/半直驱)'"),
    ("calculation_time", "TIMESTAMP COMMENT '参数计算时间'"),
    ("data_points", "INT DEFAULT 0 COMMENT '用于计算的数据点数'"),
)

# NOTE(review): no_model is built from model/cut-in/cut-out/hub height only,
# while the GROUP BY also includes manufacturer, rated_capacity and
# rotor_diameter. Two groups differing only in those columns share one
# no_model, and ON DUPLICATE KEY UPDATE lets the later row overwrite the
# earlier turbine_count. Confirm this is intended.
SELECT_MODEL_DATA_SQL = """
SELECT
    CONCAT(
        IFNULL(model, ''), '-',
        IFNULL(cut_in_wind_speed, ''), '-',
        IFNULL(cut_out_wind_speed, ''), '-',
        IFNULL(hub_height, '')
    ) AS no_model,
    model,
    manufacturer,
    rated_capacity,
    cut_in_wind_speed,
    cut_out_wind_speed,
    rotor_diameter,
    hub_height,
    COUNT(*) AS turbine_count
FROM info_turbine
WHERE model IS NOT NULL
GROUP BY model, manufacturer, rated_capacity, cut_in_wind_speed,
         cut_out_wind_speed, rotor_diameter, hub_height
ORDER BY model, manufacturer, rated_capacity;
"""

SELECT_NO_MODEL_LIST_SQL = """
SELECT DISTINCT no_model
FROM info_model_turbine
ORDER BY no_model;
"""

# Rows for one no_model near rated operation: wind speed at or above the
# rated wind speed and active power within +/-5% of rated capacity.
# The turbine attributes are matched with null-safe `<=>`, because
# info_model_turbine rows are created from GROUP BY groups that may contain
# NULLs, and plain `=` would never match those.
SELECT_SCADA_FOR_NO_MODEL_SQL = """
SELECT
    it.wind_farm_id,
    it.turbine_id,
    it.model,
    icpt.rated_wind_speed,
    imt.no_model,
    imt.rated_capacity,
    imt.cut_in_wind_speed,
    imt.cut_out_wind_speed,
    imt.rotor_diameter,
    imt.hub_height,
    dst.data_time,
    dst.wind_spd,
    dst.rotor_spd,
    dst.gen_spd,
    dst.p_active
FROM info_turbine it
JOIN info_model_turbine imt
    ON it.model = imt.model
   AND it.cut_in_wind_speed <=> imt.cut_in_wind_speed
   AND it.cut_out_wind_speed <=> imt.cut_out_wind_speed
   AND it.rotor_diameter <=> imt.rotor_diameter
   AND it.hub_height <=> imt.hub_height
JOIN info_curve_power_turbine icpt
    ON it.wind_farm_id = icpt.wind_farm_id
   AND it.model = icpt.standard_model
JOIN data_scada_turbine dst
    ON it.wind_farm_id = dst.id_farm
   AND it.turbine_id = dst.id_turbine
   AND it.model = dst.no_model_turbine
WHERE imt.no_model = %s
  AND dst.wind_spd >= icpt.rated_wind_speed
  AND dst.p_active >= imt.rated_capacity * 0.95
  AND dst.p_active <= imt.rated_capacity * 1.05
  AND dst.rotor_spd IS NOT NULL
  AND dst.gen_spd IS NOT NULL
  AND dst.rotor_spd > 0
  AND dst.gen_spd > 0
ORDER BY dst.data_time;
"""

DROP_MODEL_TURBINE_TABLE_SQL = "DROP TABLE IF EXISTS info_model_turbine"

# Kept for backward compatibility; hard-codes the table name.
CHECK_TABLE_EXISTS_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = 'info_model_turbine'
"""

# Parameterised variant: (schema, table_name).
CHECK_TABLE_EXISTS_BY_NAME_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = %s
"""

CHECK_TABLE_COLUMNS_SQL = """
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'info_model_turbine'
"""

UPDATE_MODEL_PARAMETERS_SQL = """
UPDATE info_model_turbine
SET rated_rotor_spd = %s,
    rated_gen_spd = %s,
    transmission_ratio = %s,
    turbine_type = %s,
    calculation_time = %s,
    data_points = %s,
    updated_at = CURRENT_TIMESTAMP
WHERE no_model = %s
"""


class ModelTurbineManager:
    """Builds and maintains the ``info_model_turbine`` table.

    Groups turbines from ``info_turbine`` into models, then estimates each
    model's rated speeds and gear ratio from SCADA data.
    """

    def __init__(self, db_config: DatabaseConfig):
        """
        Args:
            db_config: Connection and pool settings.
        """
        self.db_config = db_config
        self.connection_pool = None
        self._initialize_connection_pool()

    def _initialize_connection_pool(self):
        """Create the connection pool from ``self.db_config`` (connections open lazily)."""
        self.connection_pool = ConnectionPool(
            host=self.db_config.host,
            port=self.db_config.port,
            user=self.db_config.user,
            password=self.db_config.password,
            database=self.db_config.database,
            charset=self.db_config.charset,
            max_connections=self.db_config.max_connections,
            mem_quota=self.db_config.mem_quota
        )
        logger.info(f"数据库连接池初始化完成,最大连接数: {self.db_config.max_connections}")

    def get_connection(self):
        """Check out a connection from the pool (blocks while it is exhausted)."""
        return self.connection_pool.get_connection()

    def release_connection(self, conn):
        """Return a connection to the pool."""
        self.connection_pool.release_connection(conn)

    def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """Run a read statement and return all rows.

        Args:
            sql: SQL text using pymysql ``%s`` placeholders.
            params: Values bound to the placeholders, or None.

        Returns:
            All fetched rows, one dict per row.

        Raises:
            Exception: Re-raises the driver error after logging it.
        """
        conn = self.get_connection()
        if not conn:
            raise Exception("无法从连接池获取数据库连接")
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                result = cursor.fetchall()
            # With autocommit off, the read opened a transaction/snapshot on
            # this pooled connection. End it, so the next borrower does not
            # see stale data.
            conn.commit()
            logger.debug(f"查询执行成功,返回 {len(result)} 条记录")
            return result
        except Exception as e:
            logger.error(f"查询执行失败: {e}")
            logger.error(f"SQL: {sql}")
            if params:
                logger.error(f"参数: {params}")
            raise
        finally:
            self.release_connection(conn)

    def execute_update(self, sql: str, params: Optional[tuple] = None) -> int:
        """Run one INSERT/UPDATE/DELETE/DDL statement and commit.

        Args:
            sql: SQL text using pymysql ``%s`` placeholders.
            params: Values bound to the placeholders, or None.

        Returns:
            The affected-row count reported by the driver.

        Raises:
            Exception: Re-raises the driver error after rolling back and logging.
        """
        conn = self.get_connection()
        if not conn:
            raise Exception("无法从连接池获取数据库连接")
        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.execute(sql, params)
            conn.commit()
            logger.debug(f"更新执行成功,影响 {affected_rows} 行")
            return affected_rows
        except Exception as e:
            conn.rollback()
            logger.error(f"更新执行失败: {e}")
            logger.error(f"SQL: {sql}")
            if params:
                logger.error(f"参数: {params}")
            raise
        finally:
            self.release_connection(conn)

    def execute_batch_update(self, sql: str, params_list: List[tuple]) -> int:
        """Run ``sql`` once per parameter tuple via ``executemany`` and commit.

        Returns:
            The affected-row count reported by the driver.

        Raises:
            Exception: Re-raises the driver error after rolling back and logging.
        """
        conn = self.get_connection()
        if not conn:
            raise Exception("无法从连接池获取数据库连接")
        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.executemany(sql, params_list)
            conn.commit()
            logger.debug(f"批量更新执行成功,影响 {affected_rows} 行")
            return affected_rows
        except Exception as e:
            conn.rollback()
            logger.error(f"批量更新执行失败: {e}")
            logger.error(f"SQL: {sql}")
            raise
        finally:
            self.release_connection(conn)

    def check_table_exists(self, table_name: str = 'info_model_turbine') -> bool:
        """Return True if ``table_name`` exists in the configured schema.

        Returns False (after logging) if the lookup itself fails.
        """
        try:
            result = self.execute_query(
                CHECK_TABLE_EXISTS_BY_NAME_SQL, (self.db_config.database, table_name)
            )
            return result[0]['table_exists'] > 0
        except Exception as e:
            logger.error(f"检查表存在性失败: {e}")
            return False

    def check_table_columns(self) -> List[str]:
        """Return the column names of ``info_model_turbine`` ([] on error)."""
        try:
            result = self.execute_query(CHECK_TABLE_COLUMNS_SQL, (self.db_config.database,))
            return [row['COLUMN_NAME'] for row in result]
        except Exception as e:
            logger.error(f"检查表列失败: {e}")
            return []

    def add_missing_columns(self):
        """Add any calculated-parameter columns the table does not yet have.

        Only missing columns are added, one ALTER each. This avoids DDL when
        nothing is missing, and avoids relying on a multi-column
        ``ADD COLUMN IF NOT EXISTS``, which MySQL does not accept. If the
        column list cannot be read, falls back to ALTER_MODEL_TURBINE_TABLE_SQL.
        Errors are logged, not raised.
        """
        try:
            logger.info("检查并添加缺失的列...")
            existing = {name.lower() for name in self.check_table_columns()}
            if not existing:
                self.execute_update(ALTER_MODEL_TURBINE_TABLE_SQL)
            else:
                for column_name, definition in MODEL_PARAMETER_COLUMNS:
                    if column_name.lower() not in existing:
                        # Only module constants are interpolated here.
                        self.execute_update(
                            f"ALTER TABLE info_model_turbine ADD COLUMN {column_name} {definition}"
                        )
            logger.info("表结构更新完成")
        except Exception as e:
            logger.error(f"更新表结构失败: {e}")

    def create_model_turbine_table(self) -> bool:
        """Create ``info_model_turbine`` if absent; return True on success."""
        try:
            logger.info("开始创建风机机型信息表...")
            self.execute_update(CREATE_MODEL_TURBINE_TABLE_SQL)
            logger.info("风机机型信息表创建成功")
            return True
        except Exception as e:
            logger.error(f"创建风机机型信息表失败: {e}")
            return False

    def drop_model_turbine_table(self) -> bool:
        """Drop ``info_model_turbine`` if present; return True on success."""
        try:
            logger.info("开始删除风机机型信息表...")
            self.execute_update(DROP_MODEL_TURBINE_TABLE_SQL)
            logger.info("风机机型信息表删除成功")
            return True
        except Exception as e:
            logger.error(f"删除风机机型信息表失败: {e}")
            return False

    def get_model_data(self) -> List[Dict[str, Any]]:
        """Return grouped model rows from ``info_turbine`` ([] on error)."""
        try:
            logger.info("开始从info_turbine表查询机型数据...")
            data = self.execute_query(SELECT_MODEL_DATA_SQL)
            logger.info(f"成功查询到 {len(data)} 条机型记录")
            return data
        except Exception as e:
            logger.error(f"查询机型数据失败: {e}")
            return []

    def get_no_model_list(self) -> List[str]:
        """Return every distinct ``no_model`` in ``info_model_turbine`` ([] on error)."""
        try:
            logger.info("开始查询所有机型标识...")
            result = self.execute_query(SELECT_NO_MODEL_LIST_SQL)
            no_models = [row['no_model'] for row in result]
            logger.info(f"成功获取 {len(no_models)} 个机型标识")
            return no_models
        except Exception as e:
            logger.error(f"查询机型标识列表失败: {e}")
            return []

    def get_scada_data_for_no_model(self, no_model: str) -> List[Dict[str, Any]]:
        """Return near-rated SCADA rows for one model ([] on error)."""
        try:
            logger.debug(f"开始查询机型 {no_model} 的SCADA数据...")
            data = self.execute_query(SELECT_SCADA_FOR_NO_MODEL_SQL, (no_model,))
            logger.debug(f"机型 {no_model} 查询到 {len(data)} 条SCADA记录")
            return data
        except Exception as e:
            logger.error(f"查询机型 {no_model} 的SCADA数据失败: {e}")
            return []

    def calculate_and_update_parameters_for_model(self, no_model: str) -> Dict[str, Any]:
        """Compute rated parameters for one model and write them back.

        Returns:
            Result dict that always has "no_model", "success" and
            "data_points". On failure it also has "reason"; once computed,
            the parameter values are included too.
        """
        try:
            scada_data = self.get_scada_data_for_no_model(no_model)
            if not scada_data:
                logger.warning(f"机型 {no_model}: 没有可用于计算的SCADA数据")
                return {
                    "no_model": no_model,
                    "success": False,
                    "reason": "无SCADA数据",
                    "data_points": 0
                }

            data_points = len(scada_data)
            rated_rotor_spd, rated_gen_spd, transmission_ratio = \
                SCADADataProcessor.calculate_rated_speeds_and_ratio(scada_data)

            if rated_rotor_spd <= 0 or transmission_ratio <= 0:
                logger.warning(f"机型 {no_model}: 计算出的参数无效")
                return {
                    "no_model": no_model,
                    "success": False,
                    "reason": "参数无效",
                    "rated_rotor_spd": rated_rotor_spd,
                    "rated_gen_spd": rated_gen_spd,
                    "transmission_ratio": transmission_ratio,
                    "data_points": data_points
                }

            turbine_type = SCADADataProcessor.detect_turbine_type(transmission_ratio, rated_gen_spd)

            update_result = self.execute_update(
                UPDATE_MODEL_PARAMETERS_SQL,
                (
                    rated_rotor_spd,
                    rated_gen_spd,
                    transmission_ratio,
                    turbine_type,
                    datetime.now(),
                    data_points,
                    no_model
                )
            )

            result = {
                "no_model": no_model,
                "rated_rotor_spd": rated_rotor_spd,
                "rated_gen_spd": rated_gen_spd,
                "transmission_ratio": transmission_ratio,
                "turbine_type": turbine_type,
                "data_points": data_points
            }
            if update_result > 0:
                logger.info(f"机型 {no_model}: 成功更新参数 - "
                            f"叶轮转速={rated_rotor_spd:.2f} rpm, "
                            f"发电机转速={rated_gen_spd:.2f} rpm, "
                            f"传动比={transmission_ratio:.2f}, "
                            f"类型={turbine_type}, "
                            f"数据点={data_points}")
                result["success"] = True
            else:
                logger.warning(f"机型 {no_model}: 数据库更新失败")
                result["success"] = False
                result["reason"] = "数据库更新失败"
            return result

        except Exception as e:
            logger.error(f"计算机型 {no_model} 参数时出错: {e}")
            return {
                "no_model": no_model,
                "success": False,
                "reason": str(e),
                "data_points": 0
            }

    def calculate_and_update_all_parameters(self) -> Dict[str, Any]:
        """Compute and store rated parameters for every model.

        Returns:
            Summary dict with total_models, success_count, failed_count,
            success_rate (percent) and the per-model results.
        """
        try:
            logger.info("开始计算并更新所有机型的额定参数...")
            no_model_list = self.get_no_model_list()
            if not no_model_list:
                logger.warning("没有找到任何机型标识")
                return {
                    "total_models": 0,
                    "success_count": 0,
                    "failed_count": 0,
                    "success_rate": 0,
                    "results": []
                }

            total = len(no_model_list)
            logger.info(f"共发现 {total} 个机型需要计算")

            results = []
            for i, no_model in enumerate(no_model_list, 1):
                logger.info(f"处理机型 {i}/{total}: {no_model}")
                results.append(self.calculate_and_update_parameters_for_model(no_model))

            success_count = sum(1 for r in results if r.get("success"))
            failed_count = total - success_count
            stats = {
                "total_models": total,
                "success_count": success_count,
                "failed_count": failed_count,
                "success_rate": success_count / total * 100,
                "results": results
            }
            logger.info(f"参数计算完成: 成功={success_count}, 失败={failed_count}, "
                        f"成功率={stats['success_rate']:.1f}%")
            return stats

        except Exception as e:
            logger.error(f"计算并更新所有参数失败: {e}")
            raise

    def insert_model_data(self, model_data: List[Dict[str, Any]]) -> int:
        """Upsert model rows into ``info_model_turbine``.

        Existing ``no_model`` rows get their turbine_count refreshed.

        Returns:
            The affected-row count reported by the driver. Under MySQL
            semantics an upsert that updates a row counts it as 2, so this
            is not a plain row count.

        Raises:
            Exception: Re-raises the database error after logging.
        """
        if not model_data:
            logger.warning("没有数据需要插入")
            return 0

        insert_sql = """
        INSERT INTO info_model_turbine
        (no_model, model, manufacturer, rated_capacity, cut_in_wind_speed,
         cut_out_wind_speed, rotor_diameter, hub_height, turbine_count)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            turbine_count = VALUES(turbine_count),
            updated_at = CURRENT_TIMESTAMP
        """

        try:
            logger.info(f"开始向info_model_turbine表插入 {len(model_data)} 条记录...")
            insert_data = [
                (
                    item['no_model'],
                    item['model'],
                    item['manufacturer'],
                    item['rated_capacity'],
                    item['cut_in_wind_speed'],
                    item['cut_out_wind_speed'],
                    item['rotor_diameter'],
                    item['hub_height'],
                    item.get('turbine_count', 1)  # COUNT(*) from the grouping query
                )
                for item in model_data
            ]
            affected_rows = self.execute_batch_update(insert_sql, insert_data)
            logger.info(f"成功插入/更新 {affected_rows} 条记录到info_model_turbine表")
            return affected_rows
        except Exception as e:
            logger.error(f"插入机型数据失败: {e}")
            raise

    def get_model_turbine_stats(self) -> Dict[str, Any]:
        """Return aggregate statistics over ``info_model_turbine`` ({} on error)."""
        try:
            stats_sql = """
            SELECT
                COUNT(*) as total_models,
                COUNT(DISTINCT manufacturer) as manufacturer_count,
                COUNT(DISTINCT model) as model_count,
                MIN(rated_capacity) as min_capacity,
                MAX(rated_capacity) as max_capacity,
                AVG(rated_capacity) as avg_capacity,
                SUM(turbine_count) as total_turbines,
                COUNT(CASE WHEN rated_rotor_spd IS NOT NULL AND rated_rotor_spd > 0 THEN 1 END) as calculated_models,
                COUNT(DISTINCT turbine_type) as turbine_type_count,
                SUM(data_points) as total_data_points
            FROM info_model_turbine
            """
            result = self.execute_query(stats_sql)
            return result[0] if result else {}
        except Exception as e:
            logger.error(f"获取统计信息失败: {e}")
            return {}

    def get_turbine_type_distribution(self) -> List[Dict[str, Any]]:
        """Return per-turbine-type counts and averages ([] on error)."""
        try:
            type_sql = """
            SELECT
                IFNULL(turbine_type, '未知') as turbine_type,
                COUNT(*) as model_count,
                SUM(turbine_count) as turbine_count,
                AVG(transmission_ratio) as avg_ratio,
                AVG(rated_rotor_spd) as avg_rotor_spd,
                AVG(rated_gen_spd) as avg_gen_spd
            FROM info_model_turbine
            WHERE turbine_type IS NOT NULL
            GROUP BY turbine_type
            ORDER BY model_count DESC
            """
            return self.execute_query(type_sql)
        except Exception as e:
            logger.error(f"获取风机类型分布失败: {e}")
            return []

    def print_model_summary(self, model_data: List[Dict[str, Any]]):
        """Print model counts per manufacturer and per rated capacity."""
        if not model_data:
            logger.info("没有机型数据")
            return

        print("\n" + "=" * 80)
        print("风机机型数据摘要")
        print("=" * 80)
        print(f"总机型数: {len(model_data)}")

        # Count per manufacturer. A NULL manufacturer is reported as '未知'
        # instead of printing 'None'.
        manufacturers = Counter((item.get('manufacturer') or '未知') for item in model_data)
        print(f"制造商数: {len(manufacturers)}")
        print("\n制造商分布:")
        for manufacturer, count in manufacturers.most_common():
            print(f"  {manufacturer}: {count} 种机型")

        # Count per rated capacity. Sort on the numeric value, so that
        # DECIMAL/float capacities such as 2500.00 do not break the sort.
        capacities = Counter(item['rated_capacity'] for item in model_data
                             if item.get('rated_capacity'))
        print("\n额定容量分布:")
        for capacity, count in sorted(capacities.items(), key=lambda kv: float(kv[0])):
            print(f"  {capacity}kW: {count} 种机型")

        print("=" * 80)

    def print_calculation_summary(self, stats: Dict[str, Any]):
        """Print the result of ``calculate_and_update_all_parameters``.

        Shows totals plus up to five successful and five failed models.
        """
        print("\n" + "=" * 80)
        print("参数计算统计")
        print("=" * 80)
        print(f"总机型数: {stats.get('total_models', 0)}")
        print(f"成功计算的机型数: {stats.get('success_count', 0)}")
        print(f"失败的机型数: {stats.get('failed_count', 0)}")
        print(f"成功率: {stats.get('success_rate', 0):.1f}%")

        results = stats.get('results', [])
        success_results = [r for r in results if r.get('success')]
        if success_results:
            print("\n成功计算的机型示例 (前5个):")
            for i, result in enumerate(success_results[:5], 1):
                print(f"  {i}. {result['no_model']}:")
                print(f"     叶轮转速: {result['rated_rotor_spd']:.2f} rpm")
                print(f"     发电机转速: {result['rated_gen_spd']:.2f} rpm")
                print(f"     传动比: {result['transmission_ratio']:.2f}")
                print(f"     类型: {result['turbine_type']}")
                print(f"     数据点数: {result['data_points']}")

        failed_results = [r for r in results if not r.get('success')]
        if failed_results:
            print("\n失败的机型示例 (前5个):")
            for i, result in enumerate(failed_results[:5], 1):
                print(f"  {i}. {result['no_model']}: {result.get('reason', '未知原因')}")

        print("=" * 80)

    def run_model_extraction_pipeline(self, recreate_table: bool = False,
                                      calculate_params: bool = True) -> bool:
        """Run the full extraction and parameter-calculation pipeline.

        Steps: ensure the table and its columns exist, load model groups,
        print a summary, upsert the groups, optionally compute parameters,
        then print database statistics. The connection pool is always
        closed at the end.

        Args:
            recreate_table: Drop and recreate ``info_model_turbine`` first.
            calculate_params: Also compute rated parameters from SCADA data.

        Returns:
            True if the pipeline completed, False on any error.
        """
        try:
            logger.info("开始执行风机机型数据提取和参数计算流程...")

            # Step 1: make sure the table exists.
            if recreate_table:
                self.drop_model_turbine_table()
                self.create_model_turbine_table()
            elif not self.check_table_exists():
                self.create_model_turbine_table()
            else:
                logger.info("info_model_turbine表已存在,将追加数据")

            self.add_missing_columns()

            # Step 2: load grouped model data.
            model_data = self.get_model_data()
            if not model_data:
                logger.error("未获取到机型数据,流程终止")
                return False

            # Step 3: summary.
            self.print_model_summary(model_data)

            # Step 4: upsert into info_model_turbine.
            self.insert_model_data(model_data)

            # Step 5: rated parameters.
            if calculate_params:
                calculation_stats = self.calculate_and_update_all_parameters()
                self.print_calculation_summary(calculation_stats)

            # Step 6: final statistics.
            stats = self.get_model_turbine_stats()
            if stats:
                # AVG() yields NULL (None) when no capacities are present.
                avg_capacity = stats.get('avg_capacity') or 0
                print("\n数据库统计信息:")
                print(f"  总机型数: {stats.get('total_models', 0)}")
                print(f"  制造商数: {stats.get('manufacturer_count', 0)}")
                print(f"  总风机数: {stats.get('total_turbines', 0)}")
                print(f"  额定容量范围: {stats.get('min_capacity', 0)}kW - {stats.get('max_capacity', 0)}kW")
                print(f"  平均额定容量: {round(float(avg_capacity), 1)}kW")
                print(f"  已计算参数的机型数: {stats.get('calculated_models', 0)}")
                print(f"  使用的总数据点数: {stats.get('total_data_points', 0)}")

                type_dist = self.get_turbine_type_distribution()
                if type_dist:
                    print("\n风机类型分布:")
                    for item in type_dist:
                        print(f"  {item['turbine_type']}: {item['model_count']} 种机型, "
                              f"{item['turbine_count']} 台风机")
                        if item['avg_ratio']:
                            print(f"    平均传动比: {item['avg_ratio']:.2f}, "
                                  f"平均叶轮转速: {item['avg_rotor_spd']:.2f} rpm, "
                                  f"平均发电机转速: {item['avg_gen_spd']:.2f} rpm")

            logger.info("风机机型数据提取和参数计算流程执行完成!")
            return True

        except Exception as e:
            # logger.exception records the traceback through the configured handler.
            logger.exception(f"流程执行失败: {e}")
            return False
        finally:
            if self.connection_pool:
                self.connection_pool.close_all()


def main():
    """Script entry point: run the pipeline against the configured database."""
    # NOTE(review): credentials are hard-coded; consider reading them from the environment.
    db_config = DatabaseConfig(
        host="106.120.102.238",
        port=44000,
        user='root',
        password='123456',
        database='wind_data',
        charset='utf8mb4',
        max_connections=2,
        mem_quota=4 << 30  # 4 GiB
    )

    manager = ModelTurbineManager(db_config)

    # recreate_table=True drops and recreates the table.
    # calculate_params=True computes rated parameters from SCADA data.
    success = manager.run_model_extraction_pipeline(
        recreate_table=False,
        calculate_params=True
    )

    if success:
        logger.info("风机机型数据提取和参数计算成功完成!")
    else:
        logger.error("风机机型数据提取和参数计算失败!")


if __name__ == "__main__":
    main()