"""Extract wind-turbine model records and derive rated speeds / gear ratios.

The module groups ``info_turbine`` rows into ``info_model_turbine``, then uses
SCADA samples taken near rated power to estimate, per model, the rated rotor
speed, rated generator speed, transmission ratio and drivetrain type.
"""

import logging
import statistics
import threading
import time
import traceback
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

try:
    import pymysql
except ImportError:  # Only the database layer needs the driver.
    pymysql = None

try:
    import numpy as np
except ImportError:  # A minimal stand-in is installed below.
    np = None

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

if np is None:
    logger.warning("未找到numpy库,使用简单统计方法")

    def percentile(data, percentiles):
        """Return nearest-rank (floor) percentiles of ``data``; no interpolation."""
        if not data:
            return [0, 0]
        sorted_data = sorted(data)
        n = len(sorted_data)
        return [sorted_data[int((n - 1) * p / 100)] for p in percentiles]

    class NumpyStub:
        """Stand-in exposing only the ``percentile`` function this module uses."""

        @staticmethod
        def percentile(data, percentiles):
            return percentile(data, percentiles)

    np = NumpyStub()


class ConnectionPool:
    """A small bounded pool of pymysql connections.

    Connections are created lazily up to ``max_connections``. Every new
    connection gets ``tidb_mem_quota_query`` set to ``mem_quota`` for its
    session.
    """

    def __init__(self, host, port, user, password, database, charset='utf8mb4',
                 max_connections=2, mem_quota=4 << 30):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset
        self.max_connections = max_connections
        self.mem_quota = mem_quota
        self._lock = threading.Lock()
        # Condition bound to the same lock so that waiters release the lock
        # while blocked and are woken by release_connection().
        self._available = threading.Condition(self._lock)
        self._connections = []
        self._in_use = set()

    def _create_connection(self):
        """Open a new connection and apply the session memory quota.

        Raises:
            ImportError: if pymysql is not installed.
            Exception: whatever pymysql raises; it is logged and re-raised.
        """
        try:
            if pymysql is None:
                raise ImportError("pymysql is required to open database connections")
            conn = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
                charset=self.charset,
                cursorclass=pymysql.cursors.DictCursor
            )
            try:
                # int() guarantees that only a number is interpolated into SQL.
                with conn.cursor() as cursor:
                    cursor.execute(
                        f"SET SESSION tidb_mem_quota_query = {int(self.mem_quota)}"
                    )
                conn.commit()
            except Exception:
                # Do not leak a half-initialised connection.
                conn.close()
                raise
            logger.debug(f"创建新数据库连接,设置内存配额为 {self.mem_quota}")
            return conn
        except Exception as e:
            logger.error(f"创建数据库连接失败: {e}")
            raise

    def get_connection(self, timeout: Optional[float] = None):
        """Borrow a connection, blocking while the pool is exhausted.

        Args:
            timeout: Maximum number of seconds to wait for a free connection.
                ``None`` (the default) waits indefinitely.

        Returns:
            An idle pooled connection, or a newly created one.

        Raises:
            TimeoutError: if no connection became free within ``timeout``.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._available:
            while True:
                # Reuse an idle connection when there is one.
                for conn in self._connections:
                    if conn not in self._in_use:
                        self._in_use.add(conn)
                        logger.debug("从连接池获取现有连接")
                        return conn

                # Otherwise grow the pool if the limit allows it.
                if len(self._connections) < self.max_connections:
                    conn = self._create_connection()
                    self._connections.append(conn)
                    self._in_use.add(conn)
                    logger.debug(f"创建新连接,当前连接数: {len(self._connections)}")
                    return conn

                # Pool exhausted: wait() releases the lock while blocked, so
                # release_connection() can run and wake this thread up.
                logger.warning("连接池已满,等待可用连接...")
                if deadline is None:
                    self._available.wait()
                    continue
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError("timed out waiting for a pooled connection")
                self._available.wait(remaining)

    def release_connection(self, conn):
        """Return a borrowed connection to the pool and wake one waiter."""
        with self._available:
            if conn in self._in_use:
                self._in_use.remove(conn)
                logger.debug("释放连接回连接池")
                self._available.notify()

    def close_all(self):
        """Close every pooled connection (best effort) and reset the pool."""
        with self._available:
            for conn in self._connections:
                try:
                    conn.close()
                except Exception as e:
                    # Closing is best-effort; one broken connection must not
                    # prevent the others from being closed.
                    logger.debug(f"close failed: {e}")
            self._connections.clear()
            self._in_use.clear()
            # Waiters may now create fresh connections.
            self._available.notify_all()
        logger.info("已关闭所有数据库连接")


class DatabaseConfig:
    """Plain holder for database connection settings."""

    # NOTE(review): the defaults embed a host and credentials; prefer passing
    # them in explicitly from deployment configuration.
    def __init__(self, host='192.168.50.234', port=4000, user='root', password='123456',
                 database='wind_data', charset='utf8mb4', max_connections=2,
                 mem_quota=4 << 30):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset
        self.max_connections = max_connections
        self.mem_quota = mem_quota


class ManufacturerInfo:
    """Reference ranges per manufacturer / capacity, used for plausibility checks."""

    # Typical transmission-ratio range per manufacturer.
    TRANSMISSION_RATIOS = {
        # Direct drive: ratio close to 1
        "金风科技": {"type": "直驱", "ratio_range": (0.95, 1.05)},
        "远景能源": {"type": "直驱", "ratio_range": (0.95, 1.05)},
        "明阳智能": {"type": "半直驱", "ratio_range": (30.0, 50.0)},
        "运达股份": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "上海电气": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "东方电气": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "华锐风电": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "联合动力": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "海装风电": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "Vestas": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "GE": {"type": "直驱", "ratio_range": (0.95, 1.05)},
        "Siemens": {"type": "直驱", "ratio_range": (0.95, 1.05)},
    }

    # Typical rotor-speed range (rpm) keyed by rated capacity in kW.
    ROTOR_SPEED_RANGES = {
        1500: (15.0, 20.0),  # 1.5MW
        2000: (14.0, 18.0),  # 2.0MW
        2500: (13.0, 17.0),  # 2.5MW
        3000: (12.0, 16.0),  # 3.0MW
        3500: (11.0, 15.0),  # 3.5MW
        4000: (10.0, 14.0),  # 4.0MW
        5000: (9.0, 13.0),   # 5.0MW
        6000: (8.0, 12.0),   # 6.0MW
    }

    # Typical generator-speed range (rpm) per drivetrain type.
    GENERATOR_SPEED_RANGES = {
        "直驱": (10.0, 30.0),       # direct drive: low-speed generator
        "双馈": (1000.0, 1800.0),   # doubly-fed: near synchronous speed
        "半直驱": (300.0, 800.0),   # hybrid drive: medium-speed generator
    }

    @classmethod
    def get_manufacturer_info(cls, manufacturer: str) -> Dict:
        """Return the ``type`` / ``ratio_range`` entry for ``manufacturer``.

        Matching is a case-insensitive substring test of each known key
        against the given name; the first key that matches wins. Unknown or
        empty names yield type ``"未知"`` with a wide ratio range.
        """
        manufacturer_lower = str(manufacturer).strip().lower() if manufacturer else ""
        for key, value in cls.TRANSMISSION_RATIOS.items():
            if key.lower() in manufacturer_lower:
                return value
        return {"type": "未知", "ratio_range": (0.5, 200.0)}

    @classmethod
    def get_rotor_speed_range(cls, rated_capacity: int) -> Tuple[float, float]:
        """Return the rotor-speed range of the capacity class nearest ``rated_capacity``.

        A falsy capacity (``None`` or 0) yields the generic range (8.0, 25.0).
        """
        if not rated_capacity:
            return (8.0, 25.0)
        # Pick the closest known capacity class.
        capacities = sorted(cls.ROTOR_SPEED_RANGES.keys())
        closest_capacity = min(capacities, key=lambda x: abs(x - rated_capacity))
        return cls.ROTOR_SPEED_RANGES.get(closest_capacity, (8.0, 25.0))

    @classmethod
    def get_generator_speed_range(cls, turbine_type: str) -> Tuple[float, float]:
        """Return the generator-speed range for ``turbine_type`` (wide default if unknown)."""
        return cls.GENERATOR_SPEED_RANGES.get(turbine_type, (0.0, 2000.0))


class SCADADataProcessor:
    """Derives rated speeds and the transmission ratio from SCADA samples."""

    @staticmethod
    def calculate_robust_mode(values: List[float], decimal_places: int = 1) -> float:
        """Return the mode of ``values`` after outlier removal and rounding.

        With at least 10 samples, values outside 1.5 * IQR of the quartiles are
        dropped first. The remaining values are rounded to ``decimal_places``
        and the most frequent one is returned; ties are averaged.

        Args:
            values: Samples to analyse.
            decimal_places: Rounding precision used to bucket the samples.

        Returns:
            The mode, or 0.0 for empty input.
        """
        if not values:
            return 0.0

        # 1. Outlier removal (IQR rule), only with enough samples.
        if len(values) >= 10:
            q1, q3 = np.percentile(values, [25, 75])
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            filtered_values = [v for v in values if lower_bound <= v <= upper_bound]
            if filtered_values:
                values = filtered_values

        # 2. Bucket by rounding, then count frequencies.
        counter = Counter(round(v, decimal_places) for v in values)
        if not counter:
            return 0.0

        # 3. Collect every value that reaches the highest frequency.
        max_count = max(counter.values())
        modes = [value for value, count in counter.items() if count == max_count]

        # 4. Several modes: use their average.
        if len(modes) > 1:
            mode_value = sum(modes) / len(modes)
            logger.debug(f"多个众数: {modes}, 使用平均值: {mode_value}")
        else:
            mode_value = modes[0]

        logger.debug(f"众数统计: 值={mode_value}, 频次={max_count}, 总数据点={len(values)}")
        return mode_value

    @staticmethod
    def calculate_rated_parameters_with_validation(
        rotor_speeds: List[float],
        gen_speeds: List[float],
        manufacturer: str,
        rated_capacity: int
    ) -> Tuple[float, int, float, str]:
        """Estimate rated parameters and force them into plausible ranges.

        Args:
            rotor_speeds: Rotor speed samples (rpm).
            gen_speeds: Generator speed samples (rpm).
            manufacturer: Manufacturer name, used to look up expected ranges.
            rated_capacity: Rated capacity in kW.

        Returns:
            ``(rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type)``;
            all-zero values and ``"未知"`` when either sample list is empty.
        """
        if not rotor_speeds or not gen_speeds:
            logger.warning(f"数据不足: 转子转速点={len(rotor_speeds)}, 发电机转速点={len(gen_speeds)}")
            return 0.0, 0.0, 0.0, "未知"

        # Expected ranges for this manufacturer / capacity.
        manu_info = ManufacturerInfo.get_manufacturer_info(manufacturer)
        expected_type = manu_info["type"]
        expected_ratio_range = manu_info["ratio_range"]
        rotor_range = ManufacturerInfo.get_rotor_speed_range(rated_capacity)
        gen_range = ManufacturerInfo.get_generator_speed_range(expected_type)

        logger.debug(f"厂商: {manufacturer}, 预期类型: {expected_type}, "
                     f"传动比范围: {expected_ratio_range}, "
                     f"叶轮转速范围: {rotor_range}, 发电机转速范围: {gen_range}")

        # Stage 1: initial estimates from the robust mode.
        # Rotor speed keeps one decimal; generator speed is an integer.
        rated_rotor_spd = round(SCADADataProcessor.calculate_robust_mode(rotor_speeds, 1), 1)
        gen_mode = SCADADataProcessor.calculate_robust_mode(gen_speeds, 1)
        rated_gen_spd = int(round(gen_mode))

        # Stage 2: plausibility checks and adjustments.
        adjustments = []

        # 2.1 Rotor speed: fall back to the median, clamped into range.
        if not (rotor_range[0] <= rated_rotor_spd <= rotor_range[1]):
            adjustments.append(f"叶轮转速超出范围: {rated_rotor_spd:.1f} rpm, 合理范围: {rotor_range}")
            rated_rotor_spd = round(float(statistics.median(rotor_speeds)), 1)
            rated_rotor_spd = max(rotor_range[0], min(rotor_range[1], rated_rotor_spd))

        # 2.2 Generator speed: same strategy.
        if not (gen_range[0] <= rated_gen_spd <= gen_range[1]):
            adjustments.append(f"发电机转速超出范围: {rated_gen_spd} rpm, 合理范围: {gen_range}")
            gen_median = float(statistics.median(gen_speeds))
            rated_gen_spd = int(round(gen_median))
            rated_gen_spd = int(max(gen_range[0], min(gen_range[1], rated_gen_spd)))

        # 2.3 Transmission ratio from the (possibly adjusted) speeds, 3 decimals.
        if rated_rotor_spd > 0:
            transmission_ratio = round(rated_gen_spd / rated_rotor_spd, 3)
        else:
            transmission_ratio = 0.0

        # 2.4 Ratio outside the expected range: substitute the typical ratio of
        # the expected type and derive the generator speed from it.
        if not (expected_ratio_range[0] <= transmission_ratio <= expected_ratio_range[1]):
            adjustments.append(f"传动比超出范围: {transmission_ratio:.3f}, 合理范围: {expected_ratio_range}")
            if expected_type == "直驱":
                transmission_ratio = 1.000
                rated_gen_spd = int(round(rated_rotor_spd))
            elif expected_type == "双馈":
                transmission_ratio = 90.000
                rated_gen_spd = int(round(rated_rotor_spd * transmission_ratio))
            elif expected_type == "半直驱":
                transmission_ratio = 40.000
                rated_gen_spd = int(round(rated_rotor_spd * transmission_ratio))

        # Stage 3: classify by the final ratio.
        # NOTE(review): these thresholds label ratios in (30, 50] as "双馈",
        # while TRANSMISSION_RATIOS lists (30.0, 50.0) for "半直驱" - confirm
        # which is intended. Known manufacturers are unaffected because the
        # expected type overrides the computed one below.
        if transmission_ratio <= 1.2:
            turbine_type = "直驱"
        elif transmission_ratio <= 30:
            turbine_type = "半直驱"
        elif transmission_ratio <= 120:
            turbine_type = "双馈"
        else:
            turbine_type = "未知"

        # A known expected type wins over the computed one.
        if expected_type != "未知" and turbine_type != expected_type:
            adjustments.append(f"类型不符: 计算={turbine_type}, 预期={expected_type}")
            turbine_type = expected_type

        # Stage 4: direct-drive machines always get ratio 1.000 and a generator
        # speed equal to the (rounded) rotor speed.
        if turbine_type == "直驱":
            transmission_ratio = 1.000
            rated_gen_spd = int(round(rated_rotor_spd))
            adjustments.append("直驱机型传动比强制设置为1.000")

        if adjustments:
            logger.info(f"参数调整: {manufacturer} {rated_capacity}kW - " + "; ".join(adjustments))

        # Final formatting: 1 decimal / integer / 3 decimals.
        rated_rotor_spd = round(rated_rotor_spd, 1)
        rated_gen_spd = int(rated_gen_spd)
        transmission_ratio = round(transmission_ratio, 3)

        logger.debug(f"最终结果: 转子转速={rated_rotor_spd:.1f} rpm, "
                     f"发电机转速={rated_gen_spd} rpm, "
                     f"传动比={transmission_ratio:.3f}, "
                     f"类型={turbine_type}")

        return rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type

    @staticmethod
    def calculate_rated_speeds_and_ratio(
        data: List[Dict[str, Any]],
        manufacturer: str = "",
        rated_capacity: int = 0
    ) -> Tuple[float, int, float, str]:
        """Compute rated rotor speed, generator speed, ratio and type from SCADA rows.

        Rows are used only when both ``rotor_spd`` and ``gen_spd`` are present,
        numeric, and satisfy ``0 < rotor_spd < 50`` and ``0 < gen_spd < 2500``.

        Args:
            data: SCADA rows (dicts with ``rotor_spd`` and ``gen_spd``).
            manufacturer: Manufacturer name for the plausibility checks.
            rated_capacity: Rated capacity in kW for the plausibility checks.

        Returns:
            ``(rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type)``;
            all-zero values and ``"未知"`` when no usable rows remain.
        """
        if not data:
            return 0.0, 0.0, 0.0, "未知"

        rotor_speeds = []
        gen_speeds = []
        for record in data:
            rotor_spd = record.get('rotor_spd')
            gen_spd = record.get('gen_spd')
            if rotor_spd is None or gen_spd is None:
                continue
            try:
                rotor_val = float(rotor_spd)
                gen_val = float(gen_spd)
            except (ValueError, TypeError):
                continue
            # Basic sanity filter; both lists stay aligned row by row.
            if 0 < rotor_val < 50 and 0 < gen_val < 2500:
                rotor_speeds.append(rotor_val)
                gen_speeds.append(gen_val)

        if not rotor_speeds or not gen_speeds:
            logger.warning(f"有效数据不足: 转子转速点={len(rotor_speeds)}, 发电机转速点={len(gen_speeds)}")
            return 0.0, 0.0, 0.0, "未知"

        return SCADADataProcessor.calculate_rated_parameters_with_validation(
            rotor_speeds, gen_speeds, manufacturer, rated_capacity
        )


# SQL statement definitions
CREATE_MODEL_TURBINE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS info_model_turbine (
    id INT AUTO_INCREMENT PRIMARY KEY,
    no_model VARCHAR(255) NOT NULL COMMENT '机型唯一标识',
    model VARCHAR(100) COMMENT '机型',
    manufacturer VARCHAR(100) COMMENT '制造商',
    rated_capacity INT COMMENT '额定容量(kW)',
    cut_in_wind_speed DECIMAL(5, 2) COMMENT '切入风速(m/s)',
    cut_out_wind_speed DECIMAL(5, 2) COMMENT '切出风速(m/s)',
    rotor_diameter INT COMMENT '叶轮直径(m)',
    hub_height DECIMAL(10, 2) COMMENT '轮毂高度(m)',
    turbine_count INT DEFAULT 0 COMMENT '该机型风机数量',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY idx_no_model (no_model),
    INDEX idx_model (model),
    INDEX idx_manufacturer (manufacturer),
    INDEX idx_rated_capacity (rated_capacity)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风机机型信息表';
"""

ALTER_MODEL_TURBINE_TABLE_SQL = """
ALTER TABLE info_model_turbine
ADD COLUMN IF NOT EXISTS rated_rotor_spd DECIMAL(10, 1) COMMENT '额定叶轮转速(rpm)',
ADD COLUMN IF NOT EXISTS rated_gen_spd INT COMMENT '额定发电机转速(rpm)',
ADD COLUMN IF NOT EXISTS transmission_ratio DECIMAL(10, 3) COMMENT '传动比',
ADD COLUMN IF NOT EXISTS turbine_type VARCHAR(20) COMMENT '风机类型(直驱/双馈/半直驱)',
ADD COLUMN IF NOT EXISTS calculation_time TIMESTAMP COMMENT '参数计算时间',
ADD COLUMN IF NOT EXISTS data_points INT DEFAULT 0 COMMENT '用于计算的数据点数',
ADD COLUMN IF NOT EXISTS calculation_status VARCHAR(20) DEFAULT 'pending' COMMENT '计算状态(pending/success/error/adjusted)',
ADD COLUMN IF NOT EXISTS validation_info TEXT COMMENT '合理性校验信息';
"""

UPDATE_MODEL_PARAMETERS_SQL = """
UPDATE info_model_turbine
SET rated_rotor_spd = %s,
    rated_gen_spd = %s,
    transmission_ratio = %s,
    turbine_type = %s,
    calculation_time = %s,
    data_points = %s,
    calculation_status = %s,
    validation_info = %s,
    updated_at = CURRENT_TIMESTAMP
WHERE no_model = %s
"""

# Other SQL statements
SELECT_MODEL_DATA_SQL = """
SELECT
    CONCAT(
        IFNULL(model, ''), '-',
        IFNULL(cut_in_wind_speed, ''), '-',
        IFNULL(cut_out_wind_speed, ''), '-',
        IFNULL(hub_height, '')
    ) AS no_model,
    model,
    manufacturer,
    rated_capacity,
    cut_in_wind_speed,
    cut_out_wind_speed,
    rotor_diameter,
    hub_height,
    COUNT(*) AS turbine_count
FROM info_turbine
WHERE model IS NOT NULL
GROUP BY model, manufacturer, rated_capacity, cut_in_wind_speed, cut_out_wind_speed, rotor_diameter, hub_height
ORDER BY model, manufacturer, rated_capacity;
"""

SELECT_NO_MODEL_LIST_SQL = """
SELECT DISTINCT no_model
FROM info_model_turbine
ORDER BY no_model;
"""

SELECT_SCADA_FOR_NO_MODEL_SQL = """
SELECT
    it.wind_farm_id,
    it.turbine_id,
    it.model,
    icpt.rated_wind_speed,
    imt.no_model,
    imt.rated_capacity,
    imt.cut_in_wind_speed,
    imt.cut_out_wind_speed,
    imt.rotor_diameter,
    imt.hub_height,
    dst.data_time,
    dst.wind_spd,
    dst.rotor_spd,
    dst.gen_spd,
    dst.p_active
FROM info_turbine it,
     info_curve_power_turbine icpt,
     info_model_turbine imt,
     data_scada_turbine dst
WHERE 1=1
  AND it.wind_farm_id = dst.id_farm
  AND it.turbine_id = dst.id_turbine
  AND it.model = dst.no_model_turbine
  AND it.model = imt.model
  AND it.cut_in_wind_speed = imt.cut_in_wind_speed
  AND it.cut_out_wind_speed = imt.cut_out_wind_speed
  AND it.rotor_diameter = imt.rotor_diameter
  AND it.hub_height = imt.hub_height
  AND it.wind_farm_id = icpt.wind_farm_id
  AND it.model = icpt.standard_model
  AND imt.no_model = %s
  AND dst.wind_spd >= icpt.rated_wind_speed
  AND dst.p_active >= imt.rated_capacity * 0.95
  AND dst.p_active <= imt.rated_capacity * 1.05
  AND dst.rotor_spd IS NOT NULL
  AND dst.gen_spd IS NOT NULL
  AND dst.rotor_spd > 0
  AND dst.gen_spd > 0
ORDER BY dst.data_time;
"""

DROP_MODEL_TURBINE_TABLE_SQL = "DROP TABLE IF EXISTS info_model_turbine"

CHECK_TABLE_EXISTS_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = 'info_model_turbine'
"""

# Parameterised variant used by ModelTurbineManager.check_table_exists so that
# its ``table_name`` argument is honoured; the constant above is kept for any
# external users.
_CHECK_NAMED_TABLE_EXISTS_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = %s
"""

CHECK_TABLE_COLUMNS_SQL = """
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'info_model_turbine'
"""


class ModelTurbineManager:
    """Manages the ``info_model_turbine`` table and its calculated parameters."""

    def __init__(self, db_config: DatabaseConfig):
        self.db_config = db_config
        self.connection_pool = None
        self._initialize_connection_pool()

    def _initialize_connection_pool(self):
        """Build the connection pool from ``self.db_config`` (no connection is opened yet)."""
        self.connection_pool = ConnectionPool(
            host=self.db_config.host,
            port=self.db_config.port,
            user=self.db_config.user,
            password=self.db_config.password,
            database=self.db_config.database,
            charset=self.db_config.charset,
            max_connections=self.db_config.max_connections,
            mem_quota=self.db_config.mem_quota
        )
        logger.info(f"数据库连接池初始化完成,最大连接数: {self.db_config.max_connections}")

    def get_connection(self):
        """Borrow a connection from the pool."""
        return self.connection_pool.get_connection()

    def release_connection(self, conn):
        """Return a connection to the pool."""
        self.connection_pool.release_connection(conn)

    def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """Run a SELECT-style statement and return all rows.

        Args:
            sql: SQL text with ``%s`` placeholders.
            params: Values for the placeholders.

        Returns:
            The fetched rows (dicts, because the pool uses ``DictCursor``).

        Raises:
            Exception: any database error, after it has been logged.
        """
        conn = self.get_connection()
        if not conn:
            raise Exception("无法从连接池获取数据库连接")
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                result = cursor.fetchall()
            logger.debug(f"查询执行成功,返回 {len(result)} 条记录")
            return result
        except Exception as e:
            logger.error(f"查询执行失败: {e}")
            logger.error(f"SQL: {sql}")
            if params:
                logger.error(f"参数: {params}")
            raise
        finally:
            self.release_connection(conn)

    def execute_update(self, sql: str, params: Optional[tuple] = None) -> int:
        """Run an INSERT/UPDATE/DELETE/DDL statement and commit.

        Args:
            sql: SQL text with ``%s`` placeholders.
            params: Values for the placeholders.

        Returns:
            Number of affected rows as reported by the driver.

        Raises:
            Exception: any database error, after rollback and logging.
        """
        conn = self.get_connection()
        if not conn:
            raise Exception("无法从连接池获取数据库连接")
        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.execute(sql, params)
            conn.commit()
            logger.debug(f"更新执行成功,影响 {affected_rows} 行")
            return affected_rows
        except Exception as e:
            conn.rollback()
            logger.error(f"更新执行失败: {e}")
            logger.error(f"SQL: {sql}")
            if params:
                logger.error(f"参数: {params}")
            raise
        finally:
            self.release_connection(conn)

    def execute_batch_update(self, sql: str, params_list: List[tuple]) -> int:
        """Run one statement for many parameter tuples and commit.

        Args:
            sql: SQL text with ``%s`` placeholders.
            params_list: One parameter tuple per execution.

        Returns:
            Number of affected rows as reported by the driver.

        Raises:
            Exception: any database error, after rollback and logging.
        """
        conn = self.get_connection()
        if not conn:
            raise Exception("无法从连接池获取数据库连接")
        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.executemany(sql, params_list)
            conn.commit()
            logger.debug(f"批量更新执行成功,影响 {affected_rows} 行")
            return affected_rows
        except Exception as e:
            conn.rollback()
            logger.error(f"批量更新执行失败: {e}")
            logger.error(f"SQL: {sql}")
            raise
        finally:
            self.release_connection(conn)

    def check_table_exists(self, table_name: str = 'info_model_turbine') -> bool:
        """Return whether ``table_name`` exists in the configured database.

        Any error is logged and reported as ``False``.
        """
        try:
            result = self.execute_query(
                _CHECK_NAMED_TABLE_EXISTS_SQL,
                (self.db_config.database, table_name)
            )
            return result[0]['table_exists'] > 0
        except Exception as e:
            logger.error(f"检查表存在性失败: {e}")
            return False

    def check_table_columns(self) -> List[str]:
        """Return the column names of ``info_model_turbine`` (empty list on error)."""
        try:
            result = self.execute_query(CHECK_TABLE_COLUMNS_SQL, (self.db_config.database,))
            return [row['COLUMN_NAME'] for row in result]
        except Exception as e:
            logger.error(f"检查表列失败: {e}")
            return []

    def add_missing_columns(self):
        """Add the calculated-parameter columns if absent; errors are only logged."""
        try:
            logger.info("检查并添加缺失的列...")
            self.execute_update(ALTER_MODEL_TURBINE_TABLE_SQL)
            logger.info("表结构更新完成")
        except Exception as e:
            logger.error(f"更新表结构失败: {e}")

    def create_model_turbine_table(self) -> bool:
        """Create ``info_model_turbine`` if needed; return ``True`` on success."""
        try:
            logger.info("开始创建风机机型信息表...")
            self.execute_update(CREATE_MODEL_TURBINE_TABLE_SQL)
            logger.info("风机机型信息表创建成功")
            return True
        except Exception as e:
            logger.error(f"创建风机机型信息表失败: {e}")
            return False

    def drop_model_turbine_table(self) -> bool:
        """Drop ``info_model_turbine`` if it exists; return ``True`` on success."""
        try:
            logger.info("开始删除风机机型信息表...")
            self.execute_update(DROP_MODEL_TURBINE_TABLE_SQL)
            logger.info("风机机型信息表删除成功")
            return True
        except Exception as e:
            logger.error(f"删除风机机型信息表失败: {e}")
            return False

    def get_model_data(self) -> List[Dict[str, Any]]:
        """Return the grouped model rows from ``info_turbine`` (empty list on error)."""
        try:
            logger.info("开始从info_turbine表查询机型数据...")
            data = self.execute_query(SELECT_MODEL_DATA_SQL)
            logger.info(f"成功查询到 {len(data)} 条机型记录")
            return data
        except Exception as e:
            logger.error(f"查询机型数据失败: {e}")
            return []

    def get_no_model_list(self) -> List[str]:
        """Return every distinct ``no_model`` identifier (empty list on error)."""
        try:
            logger.info("开始查询所有机型标识...")
            result = self.execute_query(SELECT_NO_MODEL_LIST_SQL)
            no_models = [row['no_model'] for row in result]
            logger.info(f"成功获取 {len(no_models)} 个机型标识")
            return no_models
        except Exception as e:
            logger.error(f"查询机型标识列表失败: {e}")
            return []

    def get_scada_data_for_no_model(self, no_model: str) -> List[Dict[str, Any]]:
        """Return the SCADA rows selected for ``no_model`` (empty list on error)."""
        try:
            logger.debug(f"开始查询机型 {no_model} 的SCADA数据...")
            data = self.execute_query(SELECT_SCADA_FOR_NO_MODEL_SQL, (no_model,))
            logger.debug(f"机型 {no_model} 查询到 {len(data)} 条SCADA记录")
            return data
        except Exception as e:
            logger.error(f"查询机型 {no_model} 的SCADA数据失败: {e}")
            return []

    def calculate_and_update_parameters_for_model(self, no_model: str) -> Dict[str, Any]:
        """Calculate the rated parameters of one model and store them.

        Args:
            no_model: Model identifier.

        Returns:
            A result dict that always has ``no_model``, ``success`` and
            ``data_points``; failures carry ``reason``, stored results carry
            the calculated values, ``status`` and ``validation_info``.
        """
        try:
            # Basic model information first.
            model_info_sql = """
            SELECT manufacturer, rated_capacity
            FROM info_model_turbine
            WHERE no_model = %s
            """
            model_info = self.execute_query(model_info_sql, (no_model,))
            if not model_info:
                logger.warning(f"机型 {no_model}: 未找到基本信息")
                return {
                    "no_model": no_model,
                    "success": False,
                    "reason": "未找到机型基本信息",
                    "data_points": 0
                }

            # The keys are present even for NULL columns, so .get() defaults
            # would not apply; ``or`` supplies the fallback.
            manufacturer = model_info[0].get('manufacturer') or ''
            rated_capacity = model_info[0].get('rated_capacity') or 0

            scada_data = self.get_scada_data_for_no_model(no_model)
            if not scada_data:
                logger.warning(f"机型 {no_model}: 没有可用于计算的SCADA数据")
                return {
                    "no_model": no_model,
                    "success": False,
                    "reason": "无SCADA数据",
                    "data_points": 0
                }

            # Rated parameters, including the plausibility adjustments.
            rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type = \
                SCADADataProcessor.calculate_rated_speeds_and_ratio(
                    scada_data, manufacturer, rated_capacity
                )

            if rated_rotor_spd <= 0 or transmission_ratio <= 0:
                calculation_status = "error"
                validation_info = "参数无效"
                success = False
            else:
                # Re-check the final values against the reference ranges.
                manu_info = ManufacturerInfo.get_manufacturer_info(manufacturer)
                expected_ratio_range = manu_info["ratio_range"]
                rotor_range = ManufacturerInfo.get_rotor_speed_range(rated_capacity)
                gen_range = ManufacturerInfo.get_generator_speed_range(turbine_type)

                adjustments = []
                if not (rotor_range[0] <= rated_rotor_spd <= rotor_range[1]):
                    adjustments.append(f"叶轮转速调整: {rated_rotor_spd:.1f} rpm")
                if not (gen_range[0] <= rated_gen_spd <= gen_range[1]):
                    adjustments.append(f"发电机转速调整: {rated_gen_spd} rpm")
                if not (expected_ratio_range[0] <= transmission_ratio <= expected_ratio_range[1]):
                    adjustments.append(f"传动比调整: {transmission_ratio:.3f}")
                # Direct-drive machines are expected to carry ratio 1.000.
                if turbine_type == "直驱" and transmission_ratio != 1.000:
                    adjustments.append("直驱机型传动比强制设置为1.000")

                if adjustments:
                    calculation_status = "adjusted"
                    validation_info = "; ".join(adjustments)
                else:
                    calculation_status = "success"
                    validation_info = "参数合理"
                success = True

            update_result = self.execute_update(
                UPDATE_MODEL_PARAMETERS_SQL,
                (
                    rated_rotor_spd,
                    rated_gen_spd,
                    transmission_ratio,
                    turbine_type,
                    datetime.now(),
                    len(scada_data),
                    calculation_status,
                    validation_info,
                    no_model
                )
            )

            if update_result > 0:
                log_msg = (f"机型 {no_model}: {calculation_status} - "
                           f"叶轮转速={rated_rotor_spd:.1f} rpm, "
                           f"发电机转速={rated_gen_spd} rpm, "
                           f"传动比={transmission_ratio:.3f}, "
                           f"类型={turbine_type}, "
                           f"数据点={len(scada_data)}")
                if calculation_status == "adjusted":
                    logger.info(log_msg + f", 调整: {validation_info}")
                else:
                    logger.info(log_msg)
                return {
                    "no_model": no_model,
                    "success": success,
                    "status": calculation_status,
                    "rated_rotor_spd": rated_rotor_spd,
                    "rated_gen_spd": rated_gen_spd,
                    "transmission_ratio": transmission_ratio,
                    "turbine_type": turbine_type,
                    "data_points": len(scada_data),
                    "validation_info": validation_info
                }

            logger.warning(f"机型 {no_model}: 数据库更新失败")
            return {
                "no_model": no_model,
                "success": False,
                "reason": "数据库更新失败",
                "status": "error",
                "rated_rotor_spd": rated_rotor_spd,
                "rated_gen_spd": rated_gen_spd,
                "transmission_ratio": transmission_ratio,
                "turbine_type": turbine_type,
                "data_points": len(scada_data)
            }
        except Exception as e:
            logger.error(f"计算机型 {no_model} 参数时出错: {e}")
            return {
                "no_model": no_model,
                "success": False,
                "reason": str(e),
                "status": "error",
                "data_points": 0
            }

    def calculate_and_update_all_parameters(self) -> Dict[str, Any]:
        """Calculate and store the rated parameters of every model.

        Returns:
            Statistics with ``total_models``, ``success_count``,
            ``failed_count``, ``results`` and (when models exist)
            ``success_rate`` in percent.

        Raises:
            Exception: unexpected errors are logged and re-raised.
        """
        try:
            logger.info("开始计算并更新所有机型的额定参数...")
            no_model_list = self.get_no_model_list()
            if not no_model_list:
                logger.warning("没有找到任何机型标识")
                return {
                    "total_models": 0,
                    "success_count": 0,
                    "failed_count": 0,
                    "results": []
                }

            logger.info(f"共发现 {len(no_model_list)} 个机型需要计算")
            results = []
            success_count = 0
            failed_count = 0

            for i, no_model in enumerate(no_model_list, 1):
                logger.info(f"处理机型 {i}/{len(no_model_list)}: {no_model}")
                result = self.calculate_and_update_parameters_for_model(no_model)
                results.append(result)
                if result.get("success"):
                    success_count += 1
                else:
                    failed_count += 1

            stats = {
                "total_models": len(no_model_list),
                "success_count": success_count,
                "failed_count": failed_count,
                "success_rate": success_count / len(no_model_list) * 100 if no_model_list else 0,
                "results": results
            }
            logger.info(f"参数计算完成: 成功={success_count}, 失败={failed_count}, "
                        f"成功率={stats['success_rate']:.1f}%")
            return stats
        except Exception as e:
            logger.error(f"计算并更新所有参数失败: {e}")
            raise

    def insert_model_data(self, model_data: List[Dict[str, Any]]) -> int:
        """Upsert model rows into ``info_model_turbine``.

        Existing ``no_model`` rows only get ``turbine_count`` refreshed.

        Args:
            model_data: Rows as returned by :meth:`get_model_data`.

        Returns:
            Affected row count reported by the driver (0 for empty input).

        Raises:
            Exception: database errors are logged and re-raised.
        """
        if not model_data:
            logger.warning("没有数据需要插入")
            return 0

        insert_sql = """
        INSERT INTO info_model_turbine
        (no_model, model, manufacturer, rated_capacity, cut_in_wind_speed,
         cut_out_wind_speed, rotor_diameter, hub_height, turbine_count)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            turbine_count = VALUES(turbine_count),
            updated_at = CURRENT_TIMESTAMP
        """
        try:
            logger.info(f"开始向info_model_turbine表插入 {len(model_data)} 条记录...")
            insert_data = [
                (
                    item['no_model'],
                    item['model'],
                    item['manufacturer'],
                    item['rated_capacity'],
                    item['cut_in_wind_speed'],
                    item['cut_out_wind_speed'],
                    item['rotor_diameter'],
                    item['hub_height'],
                    item.get('turbine_count', 1)
                )
                for item in model_data
            ]
            affected_rows = self.execute_batch_update(insert_sql, insert_data)
            logger.info(f"成功插入/更新 {affected_rows} 条记录到info_model_turbine表")
            return affected_rows
        except Exception as e:
            logger.error(f"插入机型数据失败: {e}")
            raise

    def get_model_turbine_stats(self) -> Dict[str, Any]:
        """Return aggregate statistics of ``info_model_turbine`` (empty dict on error)."""
        try:
            stats_sql = """
            SELECT
                COUNT(*) as total_models,
                COUNT(DISTINCT manufacturer) as manufacturer_count,
                COUNT(DISTINCT model) as model_count,
                MIN(rated_capacity) as min_capacity,
                MAX(rated_capacity) as max_capacity,
                AVG(rated_capacity) as avg_capacity,
                SUM(turbine_count) as total_turbines,
                COUNT(CASE WHEN rated_rotor_spd IS NOT NULL AND rated_rotor_spd > 0 THEN 1 END) as calculated_models,
                COUNT(DISTINCT turbine_type) as turbine_type_count,
                SUM(data_points) as total_data_points,
                COUNT(CASE WHEN calculation_status = 'success' THEN 1 END) as success_count,
                COUNT(CASE WHEN calculation_status = 'adjusted' THEN 1 END) as adjusted_count,
                COUNT(CASE WHEN calculation_status = 'error' THEN 1 END) as error_count,
                COUNT(CASE WHEN turbine_type = '直驱' AND transmission_ratio = 1.000 THEN 1 END) as direct_drive_with_ratio_1
            FROM info_model_turbine
            """
            result = self.execute_query(stats_sql)
            return result[0] if result else {}
        except Exception as e:
            logger.error(f"获取统计信息失败: {e}")
            return {}

    def get_turbine_type_distribution(self) -> List[Dict[str, Any]]:
        """Return per-type aggregates of ``info_model_turbine`` (empty list on error)."""
        try:
            type_sql = """
            SELECT
                IFNULL(turbine_type, '未知') as turbine_type,
                COUNT(*) as model_count,
                SUM(turbine_count) as turbine_count,
                AVG(transmission_ratio) as avg_ratio,
                AVG(rated_rotor_spd) as avg_rotor_spd,
                AVG(rated_gen_spd) as avg_gen_spd,
                COUNT(CASE WHEN transmission_ratio = 1.000 THEN 1 END) as ratio_1_count
            FROM info_model_turbine
            WHERE turbine_type IS NOT NULL
            GROUP BY turbine_type
            ORDER BY model_count DESC
            """
            return self.execute_query(type_sql)
        except Exception as e:
            logger.error(f"获取风机类型分布失败: {e}")
            return []

    def print_model_summary(self, model_data: List[Dict[str, Any]]):
        """Print model counts grouped by manufacturer and by rated capacity.

        Args:
            model_data: Rows as returned by :meth:`get_model_data`.
        """
        if not model_data:
            logger.info("没有机型数据")
            return

        print("\n" + "="*80)
        print("风机机型数据摘要")
        print("="*80)
        print(f"总机型数: {len(model_data)}")

        # Per-manufacturer counts; NULL manufacturers are grouped as unknown.
        manufacturers = Counter(
            item.get('manufacturer') or '未知' for item in model_data
        )
        print(f"制造商数: {len(manufacturers)}")
        print("\n制造商分布:")
        for manufacturer, count in manufacturers.most_common():
            print(f" {manufacturer}: {count} 种机型")

        # Per-capacity counts; rows without a capacity are skipped.
        capacities = Counter(
            item['rated_capacity'] for item in model_data if item.get('rated_capacity')
        )
        print("\n额定容量分布:")
        for capacity, count in sorted(capacities.items()):
            print(f" {capacity}kW: {count} 种机型")
        print("="*80)

    def print_calculation_summary(self, stats: Dict[str, Any]):
        """Print totals plus up to five successful and five failed results."""
        print("\n" + "="*80)
        print("参数计算统计")
        print("="*80)
        print(f"总机型数: {stats.get('total_models', 0)}")
        print(f"成功计算的机型数: {stats.get('success_count', 0)}")
        print(f"失败的机型数: {stats.get('failed_count', 0)}")
        print(f"成功率: {stats.get('success_rate', 0):.1f}%")

        results = stats.get('results', [])

        # A sample of successful calculations.
        success_results = [r for r in results if r.get('success')]
        if success_results:
            print("\n成功计算的机型示例 (前5个):")
            for i, result in enumerate(success_results[:5]):
                print(f" {i+1}. {result['no_model']}:")
                print(f" 叶轮转速: {result['rated_rotor_spd']:.1f} rpm")
                print(f" 发电机转速: {result['rated_gen_spd']} rpm")
                print(f" 传动比: {result['transmission_ratio']:.3f}")
                print(f" 类型: {result['turbine_type']}")
                print(f" 数据点数: {result['data_points']}")
                if result.get('validation_info'):
                    print(f" 校验信息: {result['validation_info']}")

        # A sample of failures with their reasons.
        failed_results = [r for r in results if not r.get('success')]
        if failed_results:
            print("\n失败的机型示例 (前5个):")
            for i, result in enumerate(failed_results[:5]):
                print(f" {i+1}. {result['no_model']}: {result.get('reason', '未知原因')}")
        print("="*80)

    def run_model_extraction_pipeline(self, recreate_table: bool = False,
                                      calculate_params: bool = True) -> bool:
        """Run the whole extraction and parameter-calculation pipeline.

        The connection pool is closed when the pipeline finishes, whether it
        succeeded or not.

        Args:
            recreate_table: Drop and recreate ``info_model_turbine`` first.
            calculate_params: Also calculate the rated parameters.

        Returns:
            ``True`` when the pipeline completed, ``False`` otherwise.
        """
        try:
            logger.info("开始执行风机机型数据提取和参数计算流程...")

            # Step 1: make sure the table exists.
            if recreate_table:
                self.drop_model_turbine_table()
                self.create_model_turbine_table()
            elif not self.check_table_exists():
                self.create_model_turbine_table()
            else:
                logger.info("info_model_turbine表已存在,将追加数据")

            # The CREATE statement does not include the calculated-parameter
            # columns, so they must be added for new and existing tables alike.
            self.add_missing_columns()

            # Step 2: grouped model data from info_turbine.
            model_data = self.get_model_data()
            if not model_data:
                logger.error("未获取到机型数据,流程终止")
                return False

            # Step 3: summary.
            self.print_model_summary(model_data)

            # Step 4: upsert into info_model_turbine.
            self.insert_model_data(model_data)

            # Step 5: rated parameters.
            if calculate_params:
                calculation_stats = self.calculate_and_update_all_parameters()
                self.print_calculation_summary(calculation_stats)

            # Step 6: final statistics.
            stats = self.get_model_turbine_stats()
            if stats:
                print("\n数据库统计信息:")
                print(f" 总机型数: {stats.get('total_models', 0)}")
                print(f" 制造商数: {stats.get('manufacturer_count', 0)}")
                print(f" 总风机数: {stats.get('total_turbines', 0)}")
                print(f" 额定容量范围: {stats.get('min_capacity', 0)}kW - {stats.get('max_capacity', 0)}kW")
                # AVG() yields NULL (None) when no capacity is set.
                print(f" 平均额定容量: {round(stats.get('avg_capacity') or 0, 1)}kW")
                print(f" 已计算参数的机型数: {stats.get('calculated_models', 0)}")
                print(f" 成功计算: {stats.get('success_count', 0)}, 调整后计算: {stats.get('adjusted_count', 0)}, 错误: {stats.get('error_count', 0)}")
                print(f" 直驱机型传动比=1.000的数量: {stats.get('direct_drive_with_ratio_1', 0)}")
                print(f" 使用的总数据点数: {stats.get('total_data_points', 0)}")

            # Distribution by drivetrain type.
            type_dist = self.get_turbine_type_distribution()
            if type_dist:
                print("\n风机类型分布:")
                for item in type_dist:
                    print(f" {item['turbine_type']}: {item['model_count']} 种机型, "
                          f"{item['turbine_count']} 台风机")
                    if item['avg_ratio']:
                        print(f" 平均传动比: {item['avg_ratio']:.3f}, "
                              f"平均叶轮转速: {item['avg_rotor_spd']:.1f} rpm, "
                              f"平均发电机转速: {item['avg_gen_spd']:.0f} rpm")
                    if item['turbine_type'] == '直驱' and item.get('ratio_1_count'):
                        print(f" 传动比=1.000的机型: {item['ratio_1_count']} 种")

            logger.info("风机机型数据提取和参数计算流程执行完成!")
            return True
        except Exception as e:
            logger.error(f"流程执行失败: {e}")
            traceback.print_exc()
            return False
        finally:
            # Always close the pool.
            if self.connection_pool:
                self.connection_pool.close_all()


def main():
    """Script entry point: run the pipeline against the configured database."""
    # NOTE(review): host and credentials are hard-coded here; consider loading
    # them from the environment or a config file.
    db_config = DatabaseConfig(
        host="192.168.50.234",
        port=4000,
        user='root',
        password='123456',
        database='wind_data',
        charset='utf8mb4',
        max_connections=2,
        mem_quota=4 << 30  # 4GB
    )

    manager = ModelTurbineManager(db_config)

    success = manager.run_model_extraction_pipeline(
        recreate_table=False,   # recreate the table or not
        calculate_params=True   # calculate the rated parameters or not
    )

    if success:
        logger.info("风机机型数据提取和参数计算成功完成!")
    else:
        logger.error("风机机型数据提取和参数计算失败!")


if __name__ == "__main__":
    main()