| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098 |
import logging
import math
import os
import statistics
import threading
import time
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import pymysql
import pymysql.cursors
# Script-wide logging: INFO level, each record prefixed with time, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module logger used by every class and function in this file.
logger = logging.getLogger(__name__)
class ConnectionPool:
    """Thread-safe pool of MySQL/TiDB connections (MySQL数据库连接池).

    Connections are opened lazily, up to ``max_connections``. Every new
    connection sets the session variable ``tidb_mem_quota_query`` to
    ``mem_quota``. When all connections are checked out, ``get_connection``
    blocks until another caller hands one back with ``release_connection``
    (or until the optional timeout expires).
    """

    def __init__(self, host, port, user, password, database, charset='utf8mb4',
                 max_connections=2, mem_quota=4 << 30):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset
        self.max_connections = max_connections
        self.mem_quota = mem_quota

        self._lock = threading.Lock()
        # Condition built on the same lock: waiters are woken when a connection is freed.
        self._available = threading.Condition(self._lock)
        self._connections = []  # every connection opened by this pool
        self._in_use = set()    # the subset of _connections currently handed out

    def _create_connection(self):
        """Open a new connection and apply the per-session memory quota.

        Returns:
            A pymysql connection whose cursors are ``DictCursor``.

        Raises:
            Whatever ``pymysql.connect`` or the ``SET SESSION`` statement raises.
            A connection that was opened but failed setup is closed first.
        """
        conn = None
        try:
            conn = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
                charset=self.charset,
                cursorclass=pymysql.cursors.DictCursor,
            )
            with conn.cursor() as cursor:
                # int() guarantees that only a number is spliced into the statement text.
                cursor.execute(f"SET SESSION tidb_mem_quota_query = {int(self.mem_quota)}")
            conn.commit()

            logger.debug(f"创建新数据库连接,设置内存配额为 {self.mem_quota}")
            return conn
        except Exception as e:
            logger.error(f"创建数据库连接失败: {e}")
            if conn is not None:
                # Don't leak a socket whose session setup failed; closing is best effort.
                try:
                    conn.close()
                except Exception:
                    pass
            raise

    def get_connection(self, timeout: Optional[float] = None):
        """Check a connection out of the pool, waiting while it is exhausted.

        Args:
            timeout: Maximum number of seconds to wait for a free connection.
                ``None`` (the default) waits indefinitely.

        Returns:
            A connection that must be handed back via ``release_connection``.

        Raises:
            TimeoutError: If ``timeout`` elapses before a connection is available.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        warned = False
        with self._available:
            while True:
                # Reuse an idle connection if there is one.
                for conn in self._connections:
                    if conn not in self._in_use:
                        self._in_use.add(conn)
                        logger.debug("从连接池获取现有连接")
                        return conn

                # Otherwise open a new one while below the limit.
                if len(self._connections) < self.max_connections:
                    conn = self._create_connection()
                    self._connections.append(conn)
                    self._in_use.add(conn)
                    logger.debug(f"创建新连接,当前连接数: {len(self._connections)}")
                    return conn

                if not warned:
                    logger.warning("连接池已满,等待可用连接...")
                    warned = True

                # Condition.wait releases the lock while blocked, so other threads
                # can return connections. Sleeping while holding the non-reentrant
                # lock and then re-acquiring it recursively would deadlock.
                if deadline is None:
                    self._available.wait()
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        raise TimeoutError("等待可用数据库连接超时")
                    self._available.wait(remaining)

    def release_connection(self, conn):
        """Return ``conn`` to the pool.

        Connections the pool does not consider checked out are ignored.
        """
        with self._available:
            if conn in self._in_use:
                self._in_use.remove(conn)
                logger.debug("释放连接回连接池")
                self._available.notify()

    def close_all(self):
        """Close every pooled connection (best effort) and empty the pool."""
        with self._available:
            for conn in self._connections:
                try:
                    conn.close()
                except Exception as e:
                    logger.debug(f"关闭连接失败: {e}")
            self._connections.clear()
            self._in_use.clear()
            # The pool is empty again, so blocked callers may now open new connections.
            self._available.notify_all()
        logger.info("已关闭所有数据库连接")
class DatabaseConfig:
    """Plain holder for database connection settings (数据库配置类).

    Every constructor argument is stored unchanged as an attribute of the same
    name. NOTE(review): the defaults embed a host and credentials; prefer
    passing real values explicitly.
    """

    def __init__(self, host='192.168.50.234', port=4000, user='root',
                 password='123456', database='wind_data', charset='utf8mb4',
                 max_connections=2, mem_quota=4 << 30):
        # Server endpoint and login.
        self.host, self.port = host, port
        self.user, self.password = user, password
        # Target schema and client character set.
        self.database, self.charset = database, charset
        # Pool size and per-session query memory quota (default 4 << 30 = 4 GiB).
        self.max_connections = max_connections
        self.mem_quota = mem_quota
class SCADADataProcessor:
    """Derive rated speeds and the transmission ratio from SCADA samples (SCADA数据处理类)."""

    @staticmethod
    def calculate_mode(values: List[float], decimal_places: int = 1) -> float:
        """
        Return the most frequent value after rounding (mode of continuous data).

        Rounding to ``decimal_places`` bins nearby readings together so that
        sensor noise does not make every sample unique. On ties, the value
        encountered first wins (``Counter.most_common`` ordering).

        Args:
            values: Numeric samples.
            decimal_places: Number of decimals kept before counting.

        Returns:
            The modal rounded value, or 0.0 for empty input.
        """
        if not values:
            return 0.0

        counter = Counter(round(v, decimal_places) for v in values)
        mode_value, count = counter.most_common(1)[0]
        logger.debug(f"众数统计: 值={mode_value}, 频次={count}, 总数据点={len(values)}")
        return mode_value

    @staticmethod
    def calculate_median(values: List[float]) -> float:
        """
        Return the median of ``values`` as a float, or 0.0 for empty input.

        Args:
            values: Numeric samples.
        """
        if not values:
            return 0.0
        # statistics.median sorts and averages the middle pair itself; the old
        # hand-written fallback repeated exactly that and could not succeed
        # where it failed.
        return float(statistics.median(values))

    @staticmethod
    def calculate_rated_speeds_and_ratio(data: List[Dict[str, Any]]) -> Tuple[float, float, float]:
        """
        Estimate rated rotor speed, rated generator speed and transmission ratio.

        Each record contributes one (rotor_spd, gen_spd) pair. A pair is used
        only when both values convert to finite, positive floats, so the two
        speed lists always stay aligned sample-for-sample.

        Args:
            data: Rows with ``rotor_spd`` and ``gen_spd`` keys. Any value
                ``float()`` accepts is used; None and unconvertible values are
                skipped.

        Returns:
            ``(rated_rotor_spd, rated_gen_spd, transmission_ratio)``. All three
            are 0.0 when no usable pair exists. The ratio is 0.0 when it cannot
            be computed or exceeds 200.
        """
        if not data:
            return 0.0, 0.0, 0.0

        rotor_speeds: List[float] = []
        gen_speeds: List[float] = []
        for record in data:
            rotor_raw = record.get('rotor_spd')
            gen_raw = record.get('gen_spd')
            if rotor_raw is None or gen_raw is None:
                continue
            try:
                # Convert both before appending either one. Otherwise a bad
                # gen_spd would leave an unpaired rotor_spd behind.
                rotor_val = float(rotor_raw)
                gen_val = float(gen_raw)
            except (ValueError, TypeError):
                continue
            # Ignore NaN/inf readings and standstill (non-positive) samples.
            if not (math.isfinite(rotor_val) and math.isfinite(gen_val)):
                continue
            if rotor_val <= 0 or gen_val <= 0:
                continue
            rotor_speeds.append(rotor_val)
            gen_speeds.append(gen_val)

        if not rotor_speeds or not gen_speeds:
            logger.warning(f"数据不足: 转子转速点={len(rotor_speeds)}, 发电机转速点={len(gen_speeds)}")
            return 0.0, 0.0, 0.0

        # Rated speed = most frequent speed after rounding to 0.1.
        rated_rotor_spd = SCADADataProcessor.calculate_mode(rotor_speeds, decimal_places=1)
        rated_gen_spd = SCADADataProcessor.calculate_mode(gen_speeds, decimal_places=1)

        # A non-positive mode (e.g. tiny values rounding to 0.0) falls back to the median.
        if rated_rotor_spd <= 0 or rated_gen_spd <= 0:
            rated_rotor_spd = SCADADataProcessor.calculate_median(rotor_speeds)
            rated_gen_spd = SCADADataProcessor.calculate_median(gen_speeds)

        # Plausibility check: rotor speed is normally below 30 rpm.
        if rated_rotor_spd > 30:
            logger.warning(f"叶轮转速异常高: {rated_rotor_spd} rpm,使用中位数重新计算")
            rated_rotor_spd = SCADADataProcessor.calculate_median(rotor_speeds)

        # Computed once, after every adjustment of rated_rotor_spd, so the ratio
        # can never be left over from an earlier rotor estimate.
        transmission_ratio = rated_gen_spd / rated_rotor_spd if rated_rotor_spd > 0 else 0.0

        if transmission_ratio > 200:  # transmission ratios are normally below 200
            logger.warning(f"传动比异常高: {transmission_ratio}")
            transmission_ratio = 0.0

        logger.debug(f"计算结果: 转子转速={rated_rotor_spd:.2f} rpm, "
                     f"发电机转速={rated_gen_spd:.2f} rpm, "
                     f"传动比={transmission_ratio:.2f}")

        return rated_rotor_spd, rated_gen_spd, transmission_ratio

    @staticmethod
    def detect_turbine_type(transmission_ratio: float, rated_gen_spd: float) -> str:
        """
        Classify the drive train from transmission ratio and rated generator speed.

        Positive ratios are bucketed as ≤1.2 → 直驱, ≤30 → 半直驱, ≤120 → 双馈.
        Ratios above 120, or ratios that are unusable (≤ 0 or NaN), fall back
        to a generator-speed heuristic.

        Returns:
            str: One of "直驱", "双馈", "半直驱", "未知".
        """
        if transmission_ratio > 0:
            if transmission_ratio <= 1.2:
                return "直驱"
            if transmission_ratio <= 30:
                return "半直驱"
            if transmission_ratio <= 120:
                return "双馈"

        # Decide by generator speed instead.
        if rated_gen_spd < 50:  # direct-drive generators turn very slowly
            return "直驱"
        if 1000 <= rated_gen_spd <= 2000:  # DFIG runs around synchronous speed
            return "双馈"
        if rated_gen_spd > 2000:  # medium-speed units run higher
            return "半直驱"
        return "未知"
# ---------------------------------------------------------------------------
# SQL statements (SQL语句定义)
# ---------------------------------------------------------------------------

# --- DDL for info_model_turbine ---------------------------------------------

# Base table: one row per turbine model, keyed by the unique no_model string.
CREATE_MODEL_TURBINE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS info_model_turbine (
id INT AUTO_INCREMENT PRIMARY KEY,
no_model VARCHAR(255) NOT NULL COMMENT '机型唯一标识',
model VARCHAR(100) COMMENT '机型',
manufacturer VARCHAR(100) COMMENT '制造商',
rated_capacity INT COMMENT '额定容量(kW)',
cut_in_wind_speed DECIMAL(5, 2) COMMENT '切入风速(m/s)',
cut_out_wind_speed DECIMAL(5, 2) COMMENT '切出风速(m/s)',
rotor_diameter INT COMMENT '叶轮直径(m)',
hub_height DECIMAL(10, 2) COMMENT '轮毂高度(m)',
turbine_count INT DEFAULT 0 COMMENT '该机型风机数量',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY idx_no_model (no_model),
INDEX idx_model (model),
INDEX idx_manufacturer (manufacturer),
INDEX idx_rated_capacity (rated_capacity)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风机机型信息表';
"""

# Columns filled in by the rated-parameter calculation. They are not part of
# the CREATE statement above, so they must be added separately.
# NOTE(review): `ADD COLUMN IF NOT EXISTS` is not accepted by every
# MySQL-compatible server; confirm that the target (TiDB) supports it.
ALTER_MODEL_TURBINE_TABLE_SQL = """
ALTER TABLE info_model_turbine
ADD COLUMN IF NOT EXISTS rated_rotor_spd DECIMAL(10, 3) COMMENT '额定叶轮转速(rpm)',
ADD COLUMN IF NOT EXISTS rated_gen_spd DECIMAL(10, 3) COMMENT '额定发电机转速(rpm)',
ADD COLUMN IF NOT EXISTS transmission_ratio DECIMAL(10, 4) COMMENT '传动比',
ADD COLUMN IF NOT EXISTS turbine_type VARCHAR(20) COMMENT '风机类型(直驱/双馈/半直驱)',
ADD COLUMN IF NOT EXISTS calculation_time TIMESTAMP COMMENT '参数计算时间',
ADD COLUMN IF NOT EXISTS data_points INT DEFAULT 0 COMMENT '用于计算的数据点数';
"""

DROP_MODEL_TURBINE_TABLE_SQL = "DROP TABLE IF EXISTS info_model_turbine"

# --- information_schema probes (parameter: schema name) ---------------------

CHECK_TABLE_EXISTS_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = 'info_model_turbine'
"""

CHECK_TABLE_COLUMNS_SQL = """
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'info_model_turbine'
"""

# --- Reads --------------------------------------------------------------------

# Groups info_turbine into models and builds the no_model key.
# NOTE(review): no_model concatenates only model / cut-in / cut-out / hub
# height, while the GROUP BY also splits on manufacturer, rated_capacity and
# rotor_diameter. Two groups that differ only in those columns get the same
# no_model and collide on the unique key when upserted.
SELECT_MODEL_DATA_SQL = """
SELECT
CONCAT(
IFNULL(model, ''),
'-',
IFNULL(cut_in_wind_speed, ''),
'-',
IFNULL(cut_out_wind_speed, ''),
'-',
IFNULL(hub_height, '')
) AS no_model,
model,
manufacturer,
rated_capacity,
cut_in_wind_speed,
cut_out_wind_speed,
rotor_diameter,
hub_height,
COUNT(*) AS turbine_count
FROM info_turbine
WHERE model IS NOT NULL
GROUP BY model, manufacturer, rated_capacity, cut_in_wind_speed, cut_out_wind_speed, rotor_diameter, hub_height
ORDER BY model, manufacturer, rated_capacity;
"""

SELECT_NO_MODEL_LIST_SQL = """
SELECT DISTINCT no_model FROM info_model_turbine ORDER BY no_model;
"""

# Near-rated SCADA samples for one no_model (single %s parameter): wind speed
# at or above rated, active power within ±5 % of rated capacity, and both
# speeds positive. ORDER BY keeps row order (and thus mode tie-breaking) stable.
SELECT_SCADA_FOR_NO_MODEL_SQL = """
SELECT
it.wind_farm_id,
it.turbine_id,
it.model,
icpt.rated_wind_speed,
imt.no_model,
imt.rated_capacity,
imt.cut_in_wind_speed,
imt.cut_out_wind_speed,
imt.rotor_diameter,
imt.hub_height,
dst.data_time,
dst.wind_spd,
dst.rotor_spd,
dst.gen_spd,
dst.p_active
FROM info_turbine it,
info_curve_power_turbine icpt,
info_model_turbine imt,
data_scada_turbine dst
WHERE 1=1
AND it.wind_farm_id = dst.id_farm
AND it.turbine_id = dst.id_turbine
AND it.model = dst.no_model_turbine
AND it.model = imt.model
AND it.cut_in_wind_speed = imt.cut_in_wind_speed
AND it.cut_out_wind_speed = imt.cut_out_wind_speed
AND it.rotor_diameter = imt.rotor_diameter
AND it.hub_height = imt.hub_height
AND it.wind_farm_id = icpt.wind_farm_id
AND it.model = icpt.standard_model
AND imt.no_model = %s
AND dst.wind_spd >= icpt.rated_wind_speed
AND dst.p_active >= imt.rated_capacity * 0.95
AND dst.p_active <= imt.rated_capacity * 1.05
AND dst.rotor_spd IS NOT NULL
AND dst.gen_spd IS NOT NULL
AND dst.rotor_spd > 0
AND dst.gen_spd > 0
ORDER BY dst.data_time;
"""

# --- Writes -------------------------------------------------------------------

# Parameters: rotor spd, gen spd, ratio, type, calc time, data points, no_model.
UPDATE_MODEL_PARAMETERS_SQL = """
UPDATE info_model_turbine
SET rated_rotor_spd = %s,
rated_gen_spd = %s,
transmission_ratio = %s,
turbine_type = %s,
calculation_time = %s,
data_points = %s,
updated_at = CURRENT_TIMESTAMP
WHERE no_model = %s
"""
class ModelTurbineManager:
    """Build and maintain the ``info_model_turbine`` table (风机机型信息管理器).

    The pipeline works in three stages:

    1. Group ``info_turbine`` rows into turbine models.
    2. Upsert those models into ``info_model_turbine``.
    3. Estimate each model's rated rotor speed, rated generator speed,
       transmission ratio and turbine type from the SCADA rows returned by
       ``SELECT_SCADA_FOR_NO_MODEL_SQL``.
    """

    # Same probe as CHECK_TABLE_EXISTS_SQL, but with the table name bound as a
    # parameter so check_table_exists() honours its ``table_name`` argument.
    _CHECK_TABLE_EXISTS_BY_NAME_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = %s
"""

    def __init__(self, db_config: DatabaseConfig):
        """
        Create the manager and its connection pool.

        Args:
            db_config: Connection settings used to build the pool.
        """
        self.db_config = db_config
        self.connection_pool = None
        self._initialize_connection_pool()

    def _initialize_connection_pool(self):
        """Build ``self.connection_pool`` from ``self.db_config``."""
        self.connection_pool = ConnectionPool(
            host=self.db_config.host,
            port=self.db_config.port,
            user=self.db_config.user,
            password=self.db_config.password,
            database=self.db_config.database,
            charset=self.db_config.charset,
            max_connections=self.db_config.max_connections,
            mem_quota=self.db_config.mem_quota,
        )
        logger.info(f"数据库连接池初始化完成,最大连接数: {self.db_config.max_connections}")

    def get_connection(self):
        """Check a connection out of the pool."""
        return self.connection_pool.get_connection()

    def release_connection(self, conn):
        """Return ``conn`` to the pool."""
        self.connection_pool.release_connection(conn)

    @staticmethod
    def _rollback_quietly(conn):
        """Roll back ``conn`` without letting a rollback failure mask the original error."""
        try:
            conn.rollback()
        except Exception as rb_err:
            logger.error(f"回滚失败: {rb_err}")

    def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """
        Run a query and return every row.

        Args:
            sql: SQL text with ``%s`` placeholders.
            params: Values bound to the placeholders by the driver.

        Returns:
            The rows from ``fetchall()``. They are dicts, because the pool's
            connections use ``DictCursor``.

        Raises:
            Any driver error, after the SQL and parameters have been logged.
        """
        conn = self.get_connection()
        if not conn:
            raise Exception("无法从连接池获取数据库连接")

        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                result = cursor.fetchall()
            # pymysql connections default to autocommit=False (the pool does not
            # change it), so even a SELECT opens a transaction. End it so the
            # pooled connection does not keep reading from a stale snapshot.
            conn.commit()
            logger.debug(f"查询执行成功,返回 {len(result)} 条记录")
            return result
        except Exception as e:
            logger.error(f"查询执行失败: {e}")
            logger.error(f"SQL: {sql}")
            if params:
                logger.error(f"参数: {params}")
            raise
        finally:
            self.release_connection(conn)

    def execute_update(self, sql: str, params: Optional[tuple] = None) -> int:
        """
        Run a single INSERT/UPDATE/DELETE/DDL statement and commit.

        Args:
            sql: SQL text with ``%s`` placeholders.
            params: Values bound to the placeholders by the driver.

        Returns:
            The affected-row count reported by the driver.

        Raises:
            Any driver error, after rollback and logging.
        """
        conn = self.get_connection()
        if not conn:
            raise Exception("无法从连接池获取数据库连接")

        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.execute(sql, params)
            conn.commit()
            logger.debug(f"更新执行成功,影响 {affected_rows} 行")
            return affected_rows
        except Exception as e:
            self._rollback_quietly(conn)
            logger.error(f"更新执行失败: {e}")
            logger.error(f"SQL: {sql}")
            if params:
                logger.error(f"参数: {params}")
            raise
        finally:
            self.release_connection(conn)

    def execute_batch_update(self, sql: str, params_list: List[tuple]) -> int:
        """
        Run ``sql`` once per parameter tuple via ``executemany`` and commit.

        Args:
            sql: SQL text with ``%s`` placeholders.
            params_list: One parameter tuple per execution.

        Returns:
            The affected-row count reported by the driver, or 0 when
            ``params_list`` is empty. No database round trip is made in that case.

        Raises:
            Any driver error, after rollback and logging.
        """
        if not params_list:
            return 0

        conn = self.get_connection()
        if not conn:
            raise Exception("无法从连接池获取数据库连接")

        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.executemany(sql, params_list)
            conn.commit()
            logger.debug(f"批量更新执行成功,影响 {affected_rows} 行")
            return affected_rows
        except Exception as e:
            self._rollback_quietly(conn)
            logger.error(f"批量更新执行失败: {e}")
            logger.error(f"SQL: {sql}")
            raise
        finally:
            self.release_connection(conn)

    def check_table_exists(self, table_name: str = 'info_model_turbine') -> bool:
        """
        Report whether ``table_name`` exists in the configured database.

        Args:
            table_name: Table to look for.

        Returns:
            bool: True if the table exists. Query failures are logged and
            reported as False.
        """
        try:
            result = self.execute_query(self._CHECK_TABLE_EXISTS_BY_NAME_SQL,
                                        (self.db_config.database, table_name))
            return bool(result) and result[0]['table_exists'] > 0
        except Exception as e:
            logger.error(f"检查表存在性失败: {e}")
            return False

    def check_table_columns(self) -> List[str]:
        """
        List the column names of ``info_model_turbine``.

        Returns:
            List[str]: Column names, or an empty list if the query fails.
        """
        try:
            result = self.execute_query(CHECK_TABLE_COLUMNS_SQL, (self.db_config.database,))
            return [row['COLUMN_NAME'] for row in result]
        except Exception as e:
            logger.error(f"检查表列失败: {e}")
            return []

    def add_missing_columns(self):
        """Add the calculated-parameter columns (best effort; errors are only logged)."""
        try:
            logger.info("检查并添加缺失的列...")
            self.execute_update(ALTER_MODEL_TURBINE_TABLE_SQL)
            logger.info("表结构更新完成")
        except Exception as e:
            logger.error(f"更新表结构失败: {e}")

    def create_model_turbine_table(self) -> bool:
        """
        Create ``info_model_turbine`` if it does not exist.

        Returns:
            bool: True if the statement succeeded.
        """
        try:
            logger.info("开始创建风机机型信息表...")
            self.execute_update(CREATE_MODEL_TURBINE_TABLE_SQL)
            logger.info("风机机型信息表创建成功")
            return True
        except Exception as e:
            logger.error(f"创建风机机型信息表失败: {e}")
            return False

    def drop_model_turbine_table(self) -> bool:
        """
        Drop ``info_model_turbine`` if it exists.

        Returns:
            bool: True if the statement succeeded.
        """
        try:
            logger.info("开始删除风机机型信息表...")
            self.execute_update(DROP_MODEL_TURBINE_TABLE_SQL)
            logger.info("风机机型信息表删除成功")
            return True
        except Exception as e:
            logger.error(f"删除风机机型信息表失败: {e}")
            return False

    def get_model_data(self) -> List[Dict[str, Any]]:
        """
        Return ``info_turbine`` grouped into models (``SELECT_MODEL_DATA_SQL``).

        Returns:
            Model rows, or an empty list if the query fails.
        """
        try:
            logger.info("开始从info_turbine表查询机型数据...")
            data = self.execute_query(SELECT_MODEL_DATA_SQL)
            logger.info(f"成功查询到 {len(data)} 条机型记录")
            return data
        except Exception as e:
            logger.error(f"查询机型数据失败: {e}")
            return []

    def get_no_model_list(self) -> List[str]:
        """
        Return every ``no_model`` key in ``info_model_turbine``, sorted.

        Returns:
            The keys, or an empty list if the query fails.
        """
        try:
            logger.info("开始查询所有机型标识...")
            result = self.execute_query(SELECT_NO_MODEL_LIST_SQL)
            no_models = [row['no_model'] for row in result]
            logger.info(f"成功获取 {len(no_models)} 个机型标识")
            return no_models
        except Exception as e:
            logger.error(f"查询机型标识列表失败: {e}")
            return []

    def get_scada_data_for_no_model(self, no_model: str) -> List[Dict[str, Any]]:
        """
        Fetch the SCADA rows used to estimate parameters for one model.

        Args:
            no_model: Model key.

        Returns:
            SCADA rows, or an empty list if the query fails.
        """
        try:
            logger.debug(f"开始查询机型 {no_model} 的SCADA数据...")
            data = self.execute_query(SELECT_SCADA_FOR_NO_MODEL_SQL, (no_model,))
            logger.debug(f"机型 {no_model} 查询到 {len(data)} 条SCADA记录")
            return data
        except Exception as e:
            logger.error(f"查询机型 {no_model} 的SCADA数据失败: {e}")
            return []

    def calculate_and_update_parameters_for_model(self, no_model: str) -> Dict[str, Any]:
        """
        Estimate rated parameters for one model and store them.

        Args:
            no_model: Model key.

        Returns:
            A result dict with ``no_model``, ``success`` and ``data_points``.
            On failure it also has ``reason``; when parameters were computed,
            it includes them as well.
        """
        try:
            scada_data = self.get_scada_data_for_no_model(no_model)
            if not scada_data:
                logger.warning(f"机型 {no_model}: 没有可用于计算的SCADA数据")
                return {
                    "no_model": no_model,
                    "success": False,
                    "reason": "无SCADA数据",
                    "data_points": 0,
                }

            data_points = len(scada_data)
            rated_rotor_spd, rated_gen_spd, transmission_ratio = \
                SCADADataProcessor.calculate_rated_speeds_and_ratio(scada_data)

            if rated_rotor_spd <= 0 or transmission_ratio <= 0:
                logger.warning(f"机型 {no_model}: 计算出的参数无效")
                return {
                    "no_model": no_model,
                    "success": False,
                    "reason": "参数无效",
                    "rated_rotor_spd": rated_rotor_spd,
                    "rated_gen_spd": rated_gen_spd,
                    "transmission_ratio": transmission_ratio,
                    "data_points": data_points,
                }

            turbine_type = SCADADataProcessor.detect_turbine_type(transmission_ratio, rated_gen_spd)

            update_result = self.execute_update(
                UPDATE_MODEL_PARAMETERS_SQL,
                (
                    rated_rotor_spd,
                    rated_gen_spd,
                    transmission_ratio,
                    turbine_type,
                    datetime.now(),
                    data_points,
                    no_model,
                ),
            )

            computed = {
                "no_model": no_model,
                "rated_rotor_spd": rated_rotor_spd,
                "rated_gen_spd": rated_gen_spd,
                "transmission_ratio": transmission_ratio,
                "turbine_type": turbine_type,
                "data_points": data_points,
            }

            if update_result > 0:
                logger.info(f"机型 {no_model}: 成功更新参数 - "
                            f"叶轮转速={rated_rotor_spd:.2f} rpm, "
                            f"发电机转速={rated_gen_spd:.2f} rpm, "
                            f"传动比={transmission_ratio:.2f}, "
                            f"类型={turbine_type}, "
                            f"数据点={data_points}")
                return {**computed, "success": True}

            logger.warning(f"机型 {no_model}: 数据库更新失败")
            return {**computed, "success": False, "reason": "数据库更新失败"}

        except Exception as e:
            logger.error(f"计算机型 {no_model} 参数时出错: {e}")
            return {
                "no_model": no_model,
                "success": False,
                "reason": str(e),
                "data_points": 0,
            }

    def calculate_and_update_all_parameters(self) -> Dict[str, Any]:
        """
        Estimate and store rated parameters for every model.

        Returns:
            Aggregate statistics: ``total_models``, ``success_count``,
            ``failed_count``, ``results`` and (when any models exist)
            ``success_rate`` in percent.
        """
        try:
            logger.info("开始计算并更新所有机型的额定参数...")

            no_model_list = self.get_no_model_list()
            if not no_model_list:
                logger.warning("没有找到任何机型标识")
                return {
                    "total_models": 0,
                    "success_count": 0,
                    "failed_count": 0,
                    "results": [],
                }

            total = len(no_model_list)
            logger.info(f"共发现 {total} 个机型需要计算")

            results = []
            for i, no_model in enumerate(no_model_list, 1):
                logger.info(f"处理机型 {i}/{total}: {no_model}")
                results.append(self.calculate_and_update_parameters_for_model(no_model))

            success_count = sum(1 for r in results if r.get("success"))
            failed_count = total - success_count
            stats = {
                "total_models": total,
                "success_count": success_count,
                "failed_count": failed_count,
                "success_rate": success_count / total * 100,
                "results": results,
            }

            logger.info(f"参数计算完成: 成功={success_count}, 失败={failed_count}, "
                        f"成功率={stats['success_rate']:.1f}%")
            return stats

        except Exception as e:
            logger.error(f"计算并更新所有参数失败: {e}")
            raise

    def insert_model_data(self, model_data: List[Dict[str, Any]]) -> int:
        """
        Upsert model rows into ``info_model_turbine``.

        An existing ``no_model`` only gets its ``turbine_count`` and
        ``updated_at`` refreshed.

        Args:
            model_data: Rows shaped like ``SELECT_MODEL_DATA_SQL`` output.

        Returns:
            int: The driver's affected-row count. NOTE: MySQL reports 2 for
            each row changed via ON DUPLICATE KEY UPDATE, so this is not a
            plain row count.
        """
        if not model_data:
            logger.warning("没有数据需要插入")
            return 0

        insert_sql = """
INSERT INTO info_model_turbine
(no_model, model, manufacturer, rated_capacity, cut_in_wind_speed,
cut_out_wind_speed, rotor_diameter, hub_height, turbine_count)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
turbine_count = VALUES(turbine_count),
updated_at = CURRENT_TIMESTAMP
"""

        try:
            logger.info(f"开始向info_model_turbine表插入 {len(model_data)} 条记录...")

            insert_data = [
                (
                    item['no_model'],
                    item['model'],
                    item['manufacturer'],
                    item['rated_capacity'],
                    item['cut_in_wind_speed'],
                    item['cut_out_wind_speed'],
                    item['rotor_diameter'],
                    item['hub_height'],
                    item.get('turbine_count', 1),  # COUNT(*) from the grouping query
                )
                for item in model_data
            ]

            affected_rows = self.execute_batch_update(insert_sql, insert_data)
            logger.info(f"成功插入/更新 {affected_rows} 条记录到info_model_turbine表")
            return affected_rows

        except Exception as e:
            logger.error(f"插入机型数据失败: {e}")
            raise

    def get_model_turbine_stats(self) -> Dict[str, Any]:
        """
        Return aggregate statistics over ``info_model_turbine``.

        Returns:
            One row of aggregates, or an empty dict on failure.
        """
        try:
            stats_sql = """
SELECT
COUNT(*) as total_models,
COUNT(DISTINCT manufacturer) as manufacturer_count,
COUNT(DISTINCT model) as model_count,
MIN(rated_capacity) as min_capacity,
MAX(rated_capacity) as max_capacity,
AVG(rated_capacity) as avg_capacity,
SUM(turbine_count) as total_turbines,
COUNT(CASE WHEN rated_rotor_spd IS NOT NULL AND rated_rotor_spd > 0 THEN 1 END) as calculated_models,
COUNT(DISTINCT turbine_type) as turbine_type_count,
SUM(data_points) as total_data_points
FROM info_model_turbine
"""
            result = self.execute_query(stats_sql)
            return result[0] if result else {}

        except Exception as e:
            logger.error(f"获取统计信息失败: {e}")
            return {}

    def get_turbine_type_distribution(self) -> List[Dict[str, Any]]:
        """
        Return per-turbine-type counts and average parameters.

        Returns:
            One row per ``turbine_type``, or an empty list on failure.
        """
        try:
            type_sql = """
SELECT
IFNULL(turbine_type, '未知') as turbine_type,
COUNT(*) as model_count,
SUM(turbine_count) as turbine_count,
AVG(transmission_ratio) as avg_ratio,
AVG(rated_rotor_spd) as avg_rotor_spd,
AVG(rated_gen_spd) as avg_gen_spd
FROM info_model_turbine
WHERE turbine_type IS NOT NULL
GROUP BY turbine_type
ORDER BY model_count DESC
"""
            return self.execute_query(type_sql)

        except Exception as e:
            logger.error(f"获取风机类型分布失败: {e}")
            return []

    def print_model_summary(self, model_data: List[Dict[str, Any]]):
        """
        Print a summary of the model rows: totals, manufacturers, capacities.

        Args:
            model_data: Rows shaped like ``SELECT_MODEL_DATA_SQL`` output.
        """
        if not model_data:
            logger.info("没有机型数据")
            return

        print("\n" + "=" * 80)
        print("风机机型数据摘要")
        print("=" * 80)
        print(f"总机型数: {len(model_data)}")

        # A NULL manufacturer arrives as None (the key is present), so the
        # .get() default alone would never apply.
        manufacturers = Counter(item.get('manufacturer') or '未知' for item in model_data)
        print(f"制造商数: {len(manufacturers)}")
        print("\n制造商分布:")
        for manufacturer, count in manufacturers.most_common():
            print(f" {manufacturer}: {count} 种机型")

        # Key on the numeric value so non-integer capacities sort correctly
        # instead of failing in int("2000.0").
        capacities = Counter(item['rated_capacity'] for item in model_data if item.get('rated_capacity'))
        print("\n额定容量分布:")
        for capacity, count in sorted(capacities.items(), key=lambda kv: float(kv[0])):
            print(f" {capacity}kW: {count} 种机型")

        print("=" * 80)

    def print_calculation_summary(self, stats: Dict[str, Any]):
        """Print the outcome of ``calculate_and_update_all_parameters``."""
        print("\n" + "=" * 80)
        print("参数计算统计")
        print("=" * 80)
        print(f"总机型数: {stats.get('total_models', 0)}")
        print(f"成功计算的机型数: {stats.get('success_count', 0)}")
        print(f"失败的机型数: {stats.get('failed_count', 0)}")
        print(f"成功率: {stats.get('success_rate', 0):.1f}%")

        results = stats.get('results', [])

        success_results = [r for r in results if r.get('success')]
        if success_results:
            print("\n成功计算的机型示例 (前5个):")
            for i, result in enumerate(success_results[:5], 1):
                print(f" {i}. {result['no_model']}:")
                print(f" 叶轮转速: {result['rated_rotor_spd']:.2f} rpm")
                print(f" 发电机转速: {result['rated_gen_spd']:.2f} rpm")
                print(f" 传动比: {result['transmission_ratio']:.2f}")
                print(f" 类型: {result['turbine_type']}")
                print(f" 数据点数: {result['data_points']}")

        failed_results = [r for r in results if not r.get('success')]
        if failed_results:
            print("\n失败的机型示例 (前5个):")
            for i, result in enumerate(failed_results[:5], 1):
                print(f" {i}. {result['no_model']}: {result.get('reason', '未知原因')}")

        print("=" * 80)

    def _prepare_table(self, recreate_table: bool) -> bool:
        """Ensure ``info_model_turbine`` exists with every column; False if it cannot be prepared."""
        if recreate_table:
            if not self.drop_model_turbine_table() or not self.create_model_turbine_table():
                return False
        elif not self.check_table_exists():
            if not self.create_model_turbine_table():
                return False
        else:
            logger.info("info_model_turbine表已存在,将追加数据")

        # CREATE_MODEL_TURBINE_TABLE_SQL lacks the calculated-parameter columns,
        # so they must be added on every path. Otherwise a freshly created table
        # rejects UPDATE_MODEL_PARAMETERS_SQL.
        self.add_missing_columns()
        return True

    def _print_database_stats(self):
        """Print table-level statistics and the turbine-type distribution."""
        stats = self.get_model_turbine_stats()
        if stats:
            # AVG() is NULL on an empty table; round(None) would abort the run.
            avg_capacity = stats.get('avg_capacity') or 0
            print("\n数据库统计信息:")
            print(f" 总机型数: {stats.get('total_models', 0)}")
            print(f" 制造商数: {stats.get('manufacturer_count', 0)}")
            print(f" 总风机数: {stats.get('total_turbines', 0)}")
            print(f" 额定容量范围: {stats.get('min_capacity', 0)}kW - {stats.get('max_capacity', 0)}kW")
            print(f" 平均额定容量: {round(avg_capacity, 1)}kW")
            print(f" 已计算参数的机型数: {stats.get('calculated_models', 0)}")
            print(f" 使用的总数据点数: {stats.get('total_data_points', 0)}")

        type_dist = self.get_turbine_type_distribution()
        if type_dist:
            print("\n风机类型分布:")
            for item in type_dist:
                print(f" {item['turbine_type']}: {item['model_count']} 种机型, "
                      f"{item['turbine_count']} 台风机")
                if item['avg_ratio']:
                    print(f" 平均传动比: {item['avg_ratio']:.2f}, "
                          f"平均叶轮转速: {item['avg_rotor_spd']:.2f} rpm, "
                          f"平均发电机转速: {item['avg_gen_spd']:.2f} rpm")

    def run_model_extraction_pipeline(self, recreate_table: bool = False, calculate_params: bool = True) -> bool:
        """
        Run the full extraction and parameter-calculation pipeline.

        The connection pool is closed when the pipeline finishes, whether or
        not it succeeded.

        Args:
            recreate_table: Drop and recreate ``info_model_turbine`` first.
            calculate_params: Also estimate rated parameters from SCADA data.

        Returns:
            bool: True if every step completed.
        """
        try:
            logger.info("开始执行风机机型数据提取和参数计算流程...")

            # Step 1: target table with all columns.
            if not self._prepare_table(recreate_table):
                logger.error("准备info_model_turbine表失败,流程终止")
                return False

            # Step 2: group info_turbine into models.
            model_data = self.get_model_data()
            if not model_data:
                logger.error("未获取到机型数据,流程终止")
                return False

            # Step 3: summary.
            self.print_model_summary(model_data)

            # Step 4: upsert into info_model_turbine.
            self.insert_model_data(model_data)

            # Step 5: rated parameters.
            if calculate_params:
                calculation_stats = self.calculate_and_update_all_parameters()
                self.print_calculation_summary(calculation_stats)

            # Step 6: final statistics.
            self._print_database_stats()

            logger.info("风机机型数据提取和参数计算流程执行完成!")
            return True

        except Exception as e:
            logger.exception(f"流程执行失败: {e}")
            return False
        finally:
            if self.connection_pool:
                self.connection_pool.close_all()
def main():
    """Script entry point: build info_model_turbine and compute rated parameters.

    Connection settings default to the values previously hard-coded here. Each
    can be overridden with an environment variable (WIND_DB_HOST, WIND_DB_PORT,
    WIND_DB_USER, WIND_DB_PASSWORD, WIND_DB_NAME), so credentials need not be
    edited into the source.

    Returns:
        bool: True if the pipeline completed successfully.
    """
    db_config = DatabaseConfig(
        host=os.environ.get("WIND_DB_HOST", "106.120.102.238"),
        port=int(os.environ.get("WIND_DB_PORT", "44000")),
        user=os.environ.get("WIND_DB_USER", "root"),
        password=os.environ.get("WIND_DB_PASSWORD", "123456"),
        database=os.environ.get("WIND_DB_NAME", "wind_data"),
        charset='utf8mb4',
        max_connections=2,
        mem_quota=4 << 30,  # 4GB
    )

    manager = ModelTurbineManager(db_config)

    # recreate_table=True drops and recreates the table;
    # calculate_params=True computes the rated parameters.
    success = manager.run_model_extraction_pipeline(
        recreate_table=False,
        calculate_params=True,
    )

    if success:
        logger.info("风机机型数据提取和参数计算成功完成!")
    else:
        logger.error("风机机型数据提取和参数计算失败!")
    return success


if __name__ == "__main__":
    # Report failure to the shell / scheduler as a non-zero exit status.
    raise SystemExit(0 if main() else 1)
|