import contextlib
import logging
import math
import statistics
import threading
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

try:
    import pymysql
    import pymysql.cursors
except ImportError:
    # The driver is only needed once a connection is opened; keeping the import
    # optional lets the pure computation helpers below be used without it.
    pymysql = None

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class ConnectionPool:
    """Small thread-safe MySQL/TiDB connection pool.

    Connections are created lazily, up to ``max_connections``. Every connection
    obtained from :meth:`get_connection` must be handed back through
    :meth:`release_connection`. When all connections are busy,
    :meth:`get_connection` blocks until one is released.
    """

    def __init__(self, host, port, user, password, database, charset='utf8mb4',
                 max_connections=2, mem_quota=4 << 30):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset
        self.max_connections = max_connections
        # Applied as tidb_mem_quota_query on every new session (default 4 << 30).
        self.mem_quota = mem_quota
        self._lock = threading.Lock()
        # Condition sharing _lock: waiters in get_connection() are woken by
        # release_connection()/close_all() instead of sleeping and recursing.
        self._available = threading.Condition(self._lock)
        self._connections = []
        self._in_use = set()

    def _create_connection(self):
        """Open a new connection and set its session memory quota.

        Returns:
            A pymysql connection that uses ``DictCursor``.

        Raises:
            Whatever connecting or the ``SET SESSION`` statement raises
            (``ImportError`` if pymysql is not installed). The error is logged
            first, and a half-initialised connection is closed.
        """
        conn = None
        try:
            if pymysql is None:
                raise ImportError("pymysql is required to open database connections")
            conn = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
                charset=self.charset,
                cursorclass=pymysql.cursors.DictCursor
            )
            # Set the session memory quota; int() guarantees only a number is
            # interpolated into the statement.
            with conn.cursor() as cursor:
                cursor.execute(f"SET SESSION tidb_mem_quota_query = {int(self.mem_quota)}")
            conn.commit()
            logger.debug(f"创建新数据库连接,设置内存配额为 {self.mem_quota}")
            return conn
        except Exception as e:
            logger.error(f"创建数据库连接失败: {e}")
            if conn is not None:
                # Do not leak a connection whose session setup failed.
                with contextlib.suppress(Exception):
                    conn.close()
            raise

    def get_connection(self):
        """Borrow a connection, blocking until one is available.

        An idle pooled connection is reused first. Otherwise a new one is
        created while the pool is below ``max_connections``. When the pool is
        exhausted, the call waits (re-checking at least once per second) until
        another thread releases a connection.
        """
        with self._available:
            while True:
                # Reuse an idle connection if there is one.
                for conn in self._connections:
                    if conn not in self._in_use:
                        self._in_use.add(conn)
                        logger.debug("从连接池获取现有连接")
                        return conn

                # Otherwise grow the pool while below the limit.
                if len(self._connections) < self.max_connections:
                    conn = self._create_connection()
                    self._connections.append(conn)
                    self._in_use.add(conn)
                    logger.debug(f"创建新连接,当前连接数: {len(self._connections)}")
                    return conn

                # Pool exhausted: wait() releases the lock so that
                # release_connection() can run, then re-acquires it before the
                # next scan. Sleeping and recursing instead grows the stack on
                # every retry, and deadlocks if done while holding the lock.
                logger.warning("连接池已满,等待可用连接...")
                self._available.wait(timeout=1.0)

    def release_connection(self, conn):
        """Return a borrowed connection to the pool and wake one waiter.

        Connections that are not currently marked as in use are ignored.
        """
        with self._available:
            if conn in self._in_use:
                self._in_use.remove(conn)
                logger.debug("释放连接回连接池")
                self._available.notify()

    def close_all(self):
        """Close every pooled connection (best effort) and empty the pool."""
        with self._available:
            for conn in self._connections:
                try:
                    conn.close()
                except Exception as e:
                    # Best effort: keep closing the rest, but leave a trace.
                    logger.debug(f"关闭连接时出错: {e}")
            self._connections.clear()
            self._in_use.clear()
            # Wake any waiters so they can open fresh connections.
            self._available.notify_all()
            logger.info("已关闭所有数据库连接")


class DatabaseConfig:
    """Plain container for database connection settings.

    NOTE(review): the defaults embed a host and credentials in source;
    consider loading them from the environment or a config file.
    """

    def __init__(self, host='192.168.50.234', port=4000, user='root', password='123456',
                 database='wind_data', charset='utf8mb4', max_connections=2, mem_quota=4 << 30):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset
        self.max_connections = max_connections
        self.mem_quota = mem_quota


class ManufacturerInfo:
    """Reference ranges used to sanity-check derived turbine parameters."""

    # Drivetrain type and plausible transmission-ratio range per manufacturer.
    # NOTE(review): these are heuristics. Several vendors ship both geared and
    # direct-drive platforms, so confirm per model where it matters.
    TRANSMISSION_RATIOS = {
        # Direct drive: ratio close to 1
        "金风科技": {"type": "直驱", "ratio_range": (0.95, 1.05)},
        "远景能源": {"type": "直驱", "ratio_range": (0.95, 1.05)},
        "明阳智能": {"type": "半直驱", "ratio_range": (30.0, 50.0)},
        "运达股份": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "上海电气": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "东方电气": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "华锐风电": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "联合动力": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "海装风电": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "Vestas": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "GE": {"type": "直驱", "ratio_range": (0.95, 1.05)},
        "Siemens": {"type": "直驱", "ratio_range": (0.95, 1.05)},
    }

    # Plausible rotor speed range (rpm) keyed by rated capacity class (kW).
    ROTOR_SPEED_RANGES = {
        1500: (15.0, 20.0),  # 1.5MW
        2000: (14.0, 18.0),  # 2.0MW
        2500: (13.0, 17.0),  # 2.5MW
        3000: (12.0, 16.0),  # 3.0MW
        3500: (11.0, 15.0),  # 3.5MW
        4000: (10.0, 14.0),  # 4.0MW
        5000: (9.0, 13.0),   # 5.0MW
        6000: (8.0, 12.0),   # 6.0MW
    }

    # Plausible generator speed range (rpm) per drivetrain type.
    GENERATOR_SPEED_RANGES = {
        "直驱": (10.0, 30.0),       # direct drive: low-speed generator
        "双馈": (1000.0, 1800.0),   # doubly-fed: near synchronous speed
        "半直驱": (300.0, 800.0),   # semi-direct: medium-speed generator
    }

    @classmethod
    def get_manufacturer_info(cls, manufacturer: str) -> Dict:
        """Return the drivetrain profile for ``manufacturer``.

        The first ``TRANSMISSION_RATIOS`` key that appears (case-insensitively)
        as a substring of the name wins. Unmatched or empty names get a
        permissive "未知" profile. The returned dict is the shared class-level
        entry, so callers should not mutate it.

        NOTE(review): substring matching means the short key "GE" matches any
        name containing "ge"; consider token matching for ASCII keys.
        """
        manufacturer_lower = str(manufacturer).strip().lower() if manufacturer else ""
        for key, value in cls.TRANSMISSION_RATIOS.items():
            if key.lower() in manufacturer_lower:
                return value
        return {"type": "未知", "ratio_range": (0.5, 200.0)}

    @classmethod
    def get_rotor_speed_range(cls, rated_capacity: int) -> Tuple[float, float]:
        """Return the rotor speed range of the capacity class nearest to ``rated_capacity``.

        A falsy capacity (None/0) yields the wide default ``(8.0, 25.0)``.
        """
        if not rated_capacity:
            return (8.0, 25.0)
        closest_capacity = min(cls.ROTOR_SPEED_RANGES, key=lambda c: abs(c - rated_capacity))
        return cls.ROTOR_SPEED_RANGES[closest_capacity]

    @classmethod
    def get_generator_speed_range(cls, turbine_type: str) -> Tuple[float, float]:
        """Return the generator speed range for ``turbine_type``, or ``(0.0, 2000.0)`` if unknown."""
        return cls.GENERATOR_SPEED_RANGES.get(turbine_type, (0.0, 2000.0))


class SCADADataProcessor:
    """Derives rated speeds and the transmission ratio from SCADA samples."""

    @staticmethod
    def _linear_percentile(sorted_values: List[float], pct: float) -> float:
        """Return the ``pct``-th percentile of already-sorted, non-empty data.

        Uses linear interpolation between closest ranks, which is the default
        definition of ``numpy.percentile``, so no numpy dependency is needed.
        """
        pos = (len(sorted_values) - 1) * pct / 100.0
        lower = int(math.floor(pos))
        upper = min(lower + 1, len(sorted_values) - 1)
        return sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * (pos - lower)

    @staticmethod
    def calculate_robust_mode(values: List[float], decimal_places: int = 1) -> float:
        """Return an outlier-resistant mode of ``values``.

        1. With at least 10 samples, drop points outside
           ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``. The data is kept unfiltered if
           this would remove everything.
        2. Round the remaining samples to ``decimal_places``.
        3. Return the most frequent rounded value. When several values tie,
           return their (unrounded) mean.

        Args:
            values: Numeric samples.
            decimal_places: Rounding precision used to bin the samples.

        Returns:
            The mode, or 0.0 for empty input.
        """
        if not values:
            return 0.0

        # 1. Outlier removal with the IQR rule.
        if len(values) >= 10:
            ordered = sorted(values)
            q1 = SCADADataProcessor._linear_percentile(ordered, 25)
            q3 = SCADADataProcessor._linear_percentile(ordered, 75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            filtered_values = [v for v in values if lower_bound <= v <= upper_bound]
            if filtered_values:
                values = filtered_values

        # 2-3. Bin by rounding and count frequencies.
        counter = Counter(round(v, decimal_places) for v in values)
        if not counter:
            return 0.0

        max_count = max(counter.values())
        modes = [value for value, count in counter.items() if count == max_count]

        # Several equally frequent values: use their mean.
        if len(modes) > 1:
            mode_value = sum(modes) / len(modes)
            logger.debug(f"多个众数: {modes}, 使用平均值: {mode_value}")
        else:
            mode_value = modes[0]

        logger.debug(f"众数统计: 值={mode_value}, 频次={max_count}, 总数据点={len(values)}")
        return mode_value

    @staticmethod
    def calculate_rated_parameters_with_validation(
        rotor_speeds: List[float],
        gen_speeds: List[float],
        manufacturer: str,
        rated_capacity: int,
        adjustment_log: Optional[List[str]] = None
    ) -> Tuple[float, float, float, str]:
        """Derive the rated speeds and transmission ratio, then sanity-check them.

        Stage 1 takes the robust mode of each speed series.

        Stage 2 checks the results against the ``ManufacturerInfo`` ranges:
        - An out-of-range rotor or generator speed is replaced by the series
          median, clamped into range.
        - For a known drivetrain type, an out-of-range ratio is replaced by
          that type's nominal ratio (1, 90 or 40), and the generator speed is
          recomputed from it.

        Stage 3 classifies the type from the final ratio. When the
        manufacturer's expected type is known, it overrides the computed one.

        Args:
            rotor_speeds: Rotor speed samples (rpm).
            gen_speeds: Generator speed samples (rpm).
            manufacturer: Manufacturer name used to look up expectations.
            rated_capacity: Rated capacity (kW) used to pick the rotor range.
            adjustment_log: Optional list. When given, one human-readable note
                per correction made above is appended to it.

        Returns:
            ``(rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type)``,
            where the rotor speed is rounded to 0.1, the generator speed is an
            int, and the ratio is rounded to 0.001. Returns zeros and "未知"
            when either series is empty.
        """
        if not rotor_speeds or not gen_speeds:
            logger.warning(f"数据不足: 转子转速点={len(rotor_speeds)}, 发电机转速点={len(gen_speeds)}")
            return 0.0, 0.0, 0.0, "未知"

        # Expectations for this manufacturer / capacity.
        manu_info = ManufacturerInfo.get_manufacturer_info(manufacturer)
        expected_type = manu_info["type"]
        expected_ratio_range = manu_info["ratio_range"]
        rotor_range = ManufacturerInfo.get_rotor_speed_range(rated_capacity)
        gen_range = ManufacturerInfo.get_generator_speed_range(expected_type)

        logger.debug(f"厂商: {manufacturer}, 预期类型: {expected_type}, "
                     f"传动比范围: {expected_ratio_range}, "
                     f"叶轮转速范围: {rotor_range}, 发电机转速范围: {gen_range}")

        # Stage 1: raw estimates (rotor to 0.1 rpm, generator to whole rpm).
        rated_rotor_spd = round(SCADADataProcessor.calculate_robust_mode(rotor_speeds, 1), 1)
        gen_mode = SCADADataProcessor.calculate_robust_mode(gen_speeds, 1)
        rated_gen_spd = int(round(gen_mode))

        # Stage 2: plausibility checks and corrections.
        adjustments = []

        if not (rotor_range[0] <= rated_rotor_spd <= rotor_range[1]):
            adjustments.append(f"叶轮转速超出范围: {rated_rotor_spd:.1f} rpm, 合理范围: {rotor_range}")
            rated_rotor_spd = round(float(statistics.median(rotor_speeds)), 1)
            rated_rotor_spd = max(rotor_range[0], min(rotor_range[1], rated_rotor_spd))

        if not (gen_range[0] <= rated_gen_spd <= gen_range[1]):
            adjustments.append(f"发电机转速超出范围: {rated_gen_spd} rpm, 合理范围: {gen_range}")
            gen_median = float(statistics.median(gen_speeds))
            rated_gen_spd = int(round(gen_median))
            rated_gen_spd = int(max(gen_range[0], min(gen_range[1], rated_gen_spd)))

        # The ratio is computed from the (possibly corrected) speeds.
        if rated_rotor_spd > 0:
            transmission_ratio = round(rated_gen_spd / rated_rotor_spd, 3)
        else:
            transmission_ratio = 0.0

        if not (expected_ratio_range[0] <= transmission_ratio <= expected_ratio_range[1]):
            adjustments.append(f"传动比超出范围: {transmission_ratio:.3f}, 合理范围: {expected_ratio_range}")
            # Fall back to the nominal ratio of the expected drivetrain type.
            if expected_type == "直驱":
                transmission_ratio = 1.000
                rated_gen_spd = int(round(rated_rotor_spd))
            elif expected_type == "双馈":
                transmission_ratio = 90.000
                rated_gen_spd = int(round(rated_rotor_spd * transmission_ratio))
            elif expected_type == "半直驱":
                transmission_ratio = 40.000
                rated_gen_spd = int(round(rated_rotor_spd * transmission_ratio))

        # Stage 3: classify by the final ratio.
        # NOTE(review): these cut-offs put ratios in (30, 50] into 双馈.
        # That overlaps the 半直驱 range in TRANSMISSION_RATIOS, including
        # the 40:1 fallback above. The override below hides this for known
        # manufacturers only. Confirm the intended thresholds.
        if transmission_ratio <= 1.2:
            turbine_type = "直驱"
        elif transmission_ratio <= 30:
            turbine_type = "半直驱"
        elif transmission_ratio <= 120:
            turbine_type = "双馈"
        else:
            turbine_type = "未知"

        if expected_type != "未知" and turbine_type != expected_type:
            adjustments.append(f"类型不符: 计算={turbine_type}, 预期={expected_type}")
            turbine_type = expected_type

        if adjustments:
            logger.info(f"参数调整: {manufacturer} {rated_capacity}kW - " + "; ".join(adjustments))
            if adjustment_log is not None:
                adjustment_log.extend(adjustments)

        # Final formatting.
        rated_rotor_spd = round(rated_rotor_spd, 1)
        rated_gen_spd = int(rated_gen_spd)
        transmission_ratio = round(transmission_ratio, 3)

        logger.debug(f"最终结果: 转子转速={rated_rotor_spd:.1f} rpm, "
                     f"发电机转速={rated_gen_spd} rpm, "
                     f"传动比={transmission_ratio:.3f}, "
                     f"类型={turbine_type}")

        return rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type

    @staticmethod
    def calculate_rated_speeds_and_ratio(
        data: List[Dict[str, Any]],
        manufacturer: str = "",
        rated_capacity: int = 0,
        adjustment_log: Optional[List[str]] = None
    ) -> Tuple[float, float, float, str]:
        """Extract valid speed pairs from SCADA records and derive rated parameters.

        A record is kept only when both ``rotor_spd`` and ``gen_spd`` are
        present, both convert to float, ``0 < rotor < 50`` and
        ``0 < gen < 2500``.

        Args:
            data: SCADA records (dicts with ``rotor_spd`` / ``gen_spd``).
            manufacturer: Manufacturer used for the plausibility checks.
            rated_capacity: Rated capacity (kW) used for the plausibility checks.
            adjustment_log: Optional list that receives one note per correction
                made by :meth:`calculate_rated_parameters_with_validation`.

        Returns:
            ``(rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type)``;
            zeros and "未知" when no valid pair exists.
        """
        if not data:
            return 0.0, 0.0, 0.0, "未知"

        rotor_speeds = []
        gen_speeds = []
        for record in data:
            rotor_spd = record.get('rotor_spd')
            gen_spd = record.get('gen_spd')
            if rotor_spd is None or gen_spd is None:
                continue
            try:
                rotor_val = float(rotor_spd)
                gen_val = float(gen_spd)
            except (ValueError, TypeError):
                continue
            # Basic cleaning: keep physically plausible, strictly positive speeds.
            if 0 < rotor_val < 50 and 0 < gen_val < 2500:
                rotor_speeds.append(rotor_val)
                gen_speeds.append(gen_val)

        if not rotor_speeds or not gen_speeds:
            logger.warning(f"有效数据不足: 转子转速点={len(rotor_speeds)}, 发电机转速点={len(gen_speeds)}")
            return 0.0, 0.0, 0.0, "未知"

        return SCADADataProcessor.calculate_rated_parameters_with_validation(
            rotor_speeds, gen_speeds, manufacturer, rated_capacity,
            adjustment_log=adjustment_log
        )


# SQL statements (unchanged)
CREATE_MODEL_TURBINE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS info_model_turbine (
    id INT AUTO_INCREMENT PRIMARY KEY,
    no_model VARCHAR(255) NOT NULL COMMENT '机型唯一标识',
    model VARCHAR(100) COMMENT '机型',
    manufacturer VARCHAR(100) COMMENT '制造商',
    rated_capacity INT COMMENT '额定容量(kW)',
    cut_in_wind_speed DECIMAL(5, 2) COMMENT '切入风速(m/s)',
    cut_out_wind_speed DECIMAL(5, 2) COMMENT '切出风速(m/s)',
    rotor_diameter INT COMMENT '叶轮直径(m)',
    hub_height DECIMAL(10, 2) COMMENT '轮毂高度(m)',
    turbine_count INT DEFAULT 0 COMMENT '该机型风机数量',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY idx_no_model (no_model),
    INDEX idx_model (model),
    INDEX idx_manufacturer (manufacturer),
    INDEX idx_rated_capacity (rated_capacity)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风机机型信息表';
"""

ALTER_MODEL_TURBINE_TABLE_SQL = """
ALTER TABLE info_model_turbine
ADD COLUMN IF NOT EXISTS rated_rotor_spd DECIMAL(10, 1) COMMENT '额定叶轮转速(rpm)',
ADD COLUMN IF NOT EXISTS rated_gen_spd INT COMMENT '额定发电机转速(rpm)',
ADD COLUMN IF NOT EXISTS transmission_ratio DECIMAL(10, 3) COMMENT '传动比',
ADD COLUMN IF NOT EXISTS turbine_type VARCHAR(20) COMMENT '风机类型(直驱/双馈/半直驱)',
ADD COLUMN IF NOT EXISTS calculation_time TIMESTAMP COMMENT '参数计算时间',
ADD COLUMN IF NOT EXISTS data_points INT DEFAULT 0 COMMENT '用于计算的数据点数',
ADD COLUMN IF NOT EXISTS calculation_status VARCHAR(20) DEFAULT 'pending' COMMENT '计算状态(pending/success/error/adjusted)',
ADD COLUMN IF NOT EXISTS validation_info TEXT COMMENT '合理性校验信息';
"""

UPDATE_MODEL_PARAMETERS_SQL = """
UPDATE info_model_turbine
SET rated_rotor_spd = %s,
    rated_gen_spd = %s,
    transmission_ratio = %s,
    turbine_type = %s,
    calculation_time = %s,
    data_points = %s,
    calculation_status = %s,
    validation_info = %s,
    updated_at = CURRENT_TIMESTAMP
WHERE no_model = %s
"""


class ModelTurbineManager:
    """Manages turbine model records in ``info_model_turbine``."""

    def __init__(self, db_config: DatabaseConfig):
        self.db_config = db_config
        self.connection_pool = None
        self._initialize_connection_pool()

    def _initialize_connection_pool(self):
        """Create the connection pool from ``self.db_config`` (no connection is opened yet)."""
        self.connection_pool = ConnectionPool(
            host=self.db_config.host,
            port=self.db_config.port,
            user=self.db_config.user,
            password=self.db_config.password,
            database=self.db_config.database,
            charset=self.db_config.charset,
            max_connections=self.db_config.max_connections,
            mem_quota=self.db_config.mem_quota
        )
        logger.info(f"数据库连接池初始化完成,最大连接数: {self.db_config.max_connections}")

    # ... other methods unchanged ...

    def calculate_and_update_parameters_for_model(self, no_model: str) -> Dict[str, Any]:
        """Compute the rated parameters of one model and persist them.

        Reads manufacturer and rated capacity from ``info_model_turbine`` and
        fetches the model's SCADA samples. From these it derives the rated
        rotor speed, rated generator speed, transmission ratio and turbine
        type, then writes them with ``UPDATE_MODEL_PARAMETERS_SQL``.

        Args:
            no_model: Model identifier (``info_model_turbine.no_model``).

        Returns:
            A result dict. Every variant has ``no_model``, ``success`` and
            ``data_points``. The others add ``status``, ``reason`` and the
            computed values as applicable. Exceptions are caught, logged with
            a traceback, and reported as ``success=False``.
        """
        try:
            model_info_sql = """
                SELECT manufacturer, rated_capacity
                FROM info_model_turbine
                WHERE no_model = %s
            """
            model_info = self.execute_query(model_info_sql, (no_model,))
            if not model_info:
                logger.warning(f"机型 {no_model}: 未找到基本信息")
                return {
                    "no_model": no_model,
                    "success": False,
                    "reason": "未找到机型基本信息",
                    "data_points": 0
                }

            # NULL columns come back as None; normalise them.
            manufacturer = model_info[0].get('manufacturer') or ''
            rated_capacity = model_info[0].get('rated_capacity') or 0

            scada_data = self.get_scada_data_for_no_model(no_model)
            if not scada_data:
                logger.warning(f"机型 {no_model}: 没有可用于计算的SCADA数据")
                return {
                    "no_model": no_model,
                    "success": False,
                    "reason": "无SCADA数据",
                    "data_points": 0
                }

            # Collect the corrections the validator actually made. Re-checking
            # the already-clamped outputs alone would report "success" for
            # results that were in fact adjusted.
            adjustment_notes: List[str] = []
            rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type = \
                SCADADataProcessor.calculate_rated_speeds_and_ratio(
                    scada_data, manufacturer, rated_capacity,
                    adjustment_log=adjustment_notes
                )

            if rated_rotor_spd <= 0 or transmission_ratio <= 0:
                calculation_status = "error"
                validation_info = "参数无效"
                success = False
            else:
                manu_info = ManufacturerInfo.get_manufacturer_info(manufacturer)
                expected_ratio_range = manu_info["ratio_range"]
                rotor_range = ManufacturerInfo.get_rotor_speed_range(rated_capacity)
                gen_range = ManufacturerInfo.get_generator_speed_range(turbine_type)

                # Start from the validator's notes, then flag any final value
                # that is still outside its expected range.
                adjustments = list(adjustment_notes)
                if not (rotor_range[0] <= rated_rotor_spd <= rotor_range[1]):
                    adjustments.append(f"叶轮转速调整: {rated_rotor_spd:.1f} rpm")
                if not (gen_range[0] <= rated_gen_spd <= gen_range[1]):
                    adjustments.append(f"发电机转速调整: {rated_gen_spd} rpm")
                if not (expected_ratio_range[0] <= transmission_ratio <= expected_ratio_range[1]):
                    adjustments.append(f"传动比调整: {transmission_ratio:.3f}")

                if adjustments:
                    calculation_status = "adjusted"
                    validation_info = "; ".join(adjustments)
                else:
                    calculation_status = "success"
                    validation_info = "参数合理"
                success = True

            update_result = self.execute_update(
                UPDATE_MODEL_PARAMETERS_SQL,
                (
                    rated_rotor_spd,
                    rated_gen_spd,
                    transmission_ratio,
                    turbine_type,
                    datetime.now(),
                    len(scada_data),
                    calculation_status,
                    validation_info,
                    no_model
                )
            )

            if update_result > 0:
                log_msg = (f"机型 {no_model}: {calculation_status} - "
                           f"叶轮转速={rated_rotor_spd:.1f} rpm, "
                           f"发电机转速={rated_gen_spd} rpm, "
                           f"传动比={transmission_ratio:.3f}, "
                           f"类型={turbine_type}, "
                           f"数据点={len(scada_data)}")
                if calculation_status == "adjusted":
                    logger.info(log_msg + f", 调整: {validation_info}")
                else:
                    logger.info(log_msg)

                return {
                    "no_model": no_model,
                    "success": success,
                    "status": calculation_status,
                    "rated_rotor_spd": rated_rotor_spd,
                    "rated_gen_spd": rated_gen_spd,
                    "transmission_ratio": transmission_ratio,
                    "turbine_type": turbine_type,
                    "data_points": len(scada_data),
                    "validation_info": validation_info
                }

            logger.warning(f"机型 {no_model}: 数据库更新失败")
            return {
                "no_model": no_model,
                "success": False,
                "reason": "数据库更新失败",
                "status": "error",
                "rated_rotor_spd": rated_rotor_spd,
                "rated_gen_spd": rated_gen_spd,
                "transmission_ratio": transmission_ratio,
                "turbine_type": turbine_type,
                "data_points": len(scada_data)
            }

        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception(f"计算机型 {no_model} 参数时出错: {e}")
            return {
                "no_model": no_model,
                "success": False,
                "reason": str(e),
                "status": "error",
                "data_points": 0
            }

    # ... other methods unchanged ...
def main():
    """Run the model extraction and rated-parameter pipeline once.

    The pool's connections are closed afterwards, even if the pipeline raises.

    Returns:
        The result of ``run_model_extraction_pipeline`` (truthy on success).
    """
    # Database configuration
    db_config = DatabaseConfig(
        # Alternate endpoint: host="106.120.102.238", port=44000
        host="192.168.50.234",
        port=4000,
        user='root',
        password='123456',
        database='wind_data',
        charset='utf8mb4',
        max_connections=2,
        mem_quota=4 << 30  # 4GB
    )

    manager = ModelTurbineManager(db_config)
    try:
        success = manager.run_model_extraction_pipeline(
            recreate_table=False,  # whether to recreate the table
            calculate_params=True  # whether to compute rated parameters
        )
    finally:
        # Release pooled connections even when the pipeline raises.
        manager.connection_pool.close_all()

    if success:
        logger.info("风机机型数据提取和参数计算成功完成!")
    else:
        logger.error("风机机型数据提取和参数计算失败!")
    return success


def _fallback_percentile(data, percentiles):
    """Pure-Python stand-in for ``numpy.percentile`` (linear interpolation).

    Returns one value per requested percentile, using numpy's default
    definition so results agree with or without numpy installed. Empty
    ``data`` yields zeros.
    """
    if not data:
        return [0] * len(percentiles)
    ordered = sorted(data)
    last = len(ordered) - 1
    result = []
    for pct in percentiles:
        pos = last * pct / 100
        lower = int(pos)
        upper = min(lower + 1, last)
        result.append(ordered[lower] + (ordered[upper] - ordered[lower]) * (pos - lower))
    return result


# Other code in this module may reference ``np`` as a global (e.g. for
# percentile-based outlier filtering), so bind it at import time rather than
# only when the file is run as a script.
try:
    import numpy as np
except ImportError:
    logger.warning("未找到numpy库,使用简单统计方法")

    class _NumpyStub:
        """Exposes the single numpy function (``percentile``) used here."""

        @staticmethod
        def percentile(data, percentiles):
            return _fallback_percentile(data, percentiles)

    np = _NumpyStub()


if __name__ == "__main__":
    # Non-zero exit status when the pipeline reports failure.
    raise SystemExit(0 if main() else 1)