Forráskód Böngészése

完善 机型的 叶轮额定转速、发电机额定转速、传动比的计算

zhouyang.xie 2 hónapja
szülő
commit
6f08caa8f5
1 módosított fájl, 635 hozzáadás és 6 törlés
  1. 635 6
      dataStorage_datang/info_model_turbine_v3.py

+ 635 - 6
dataStorage_datang/info_model_turbine_v3.py

@@ -5,7 +5,6 @@ import logging
 from datetime import datetime
 from collections import Counter
 import statistics
-import math
 
 # 配置日志
 logging.basicConfig(
@@ -417,7 +416,7 @@ class SCADADataProcessor:
         )
 
 
-# SQL语句定义(保持不变)
+# SQL语句定义
 CREATE_MODEL_TURBINE_TABLE_SQL = """
 CREATE TABLE IF NOT EXISTS info_model_turbine (
     id INT AUTO_INCREMENT PRIMARY KEY,
@@ -465,6 +464,93 @@ SET rated_rotor_spd = %s,
 WHERE no_model = %s
 """
 
+# 其他SQL语句
+SELECT_MODEL_DATA_SQL = """
+SELECT 
+    CONCAT(
+        IFNULL(model, ''), 
+        '-', 
+        IFNULL(cut_in_wind_speed, ''), 
+        '-', 
+        IFNULL(cut_out_wind_speed, ''),
+        '-', 
+        IFNULL(hub_height, '')
+    ) AS no_model,
+    model,
+    manufacturer,
+    rated_capacity,
+    cut_in_wind_speed,
+    cut_out_wind_speed,
+    rotor_diameter,
+    hub_height,
+    COUNT(*) AS turbine_count
+FROM info_turbine 
+WHERE model IS NOT NULL
+GROUP BY model, manufacturer, rated_capacity, cut_in_wind_speed, cut_out_wind_speed, rotor_diameter, hub_height
+ORDER BY model, manufacturer, rated_capacity;
+"""
+
+SELECT_NO_MODEL_LIST_SQL = """
+SELECT DISTINCT no_model FROM info_model_turbine ORDER BY no_model;
+"""
+
+SELECT_SCADA_FOR_NO_MODEL_SQL = """
+SELECT 
+    it.wind_farm_id,
+    it.turbine_id,
+    it.model,
+    icpt.rated_wind_speed,
+    imt.no_model,
+    imt.rated_capacity,
+    imt.cut_in_wind_speed,
+    imt.cut_out_wind_speed,
+    imt.rotor_diameter,
+    imt.hub_height,
+    dst.data_time,
+    dst.wind_spd,
+    dst.rotor_spd,
+    dst.gen_spd,
+    dst.p_active
+FROM info_turbine it,
+     info_curve_power_turbine icpt,
+     info_model_turbine imt,
+     data_scada_turbine dst  
+WHERE 1=1
+    AND it.wind_farm_id = dst.id_farm
+    AND it.turbine_id = dst.id_turbine
+    AND it.model = dst.no_model_turbine
+    AND it.model = imt.model
+    AND it.cut_in_wind_speed = imt.cut_in_wind_speed
+    AND it.cut_out_wind_speed = imt.cut_out_wind_speed
+    AND it.rotor_diameter = imt.rotor_diameter
+    AND it.hub_height = imt.hub_height 
+    AND it.wind_farm_id = icpt.wind_farm_id
+    AND it.model = icpt.standard_model
+    AND imt.no_model = %s
+    AND dst.wind_spd >= icpt.rated_wind_speed
+    AND dst.p_active >= imt.rated_capacity * 0.95 
+    AND dst.p_active <= imt.rated_capacity * 1.05
+    AND dst.rotor_spd IS NOT NULL 
+    AND dst.gen_spd IS NOT NULL
+    AND dst.rotor_spd > 0 
+    AND dst.gen_spd > 0
+ORDER BY dst.data_time;
+"""
+
+DROP_MODEL_TURBINE_TABLE_SQL = "DROP TABLE IF EXISTS info_model_turbine"
+
+CHECK_TABLE_EXISTS_SQL = """
+SELECT COUNT(*) as table_exists 
+FROM information_schema.tables 
+WHERE table_schema = %s AND table_name = 'info_model_turbine'
+"""
+
+CHECK_TABLE_COLUMNS_SQL = """
+SELECT COLUMN_NAME 
+FROM INFORMATION_SCHEMA.COLUMNS 
+WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'info_model_turbine'
+"""
+
 
 class ModelTurbineManager:
     """风机机型信息管理器"""
@@ -488,7 +574,227 @@ class ModelTurbineManager:
         )
         logger.info(f"数据库连接池初始化完成,最大连接数: {self.db_config.max_connections}")
     
-    # ... 其他方法保持不变 ...
+    def get_connection(self):
+        """从连接池获取连接"""
+        return self.connection_pool.get_connection()
+    
+    def release_connection(self, conn):
+        """释放连接回连接池"""
+        self.connection_pool.release_connection(conn)
+    
+    def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
+        """
+        执行查询语句
+        
+        Args:
+            sql: SQL查询语句
+            params: SQL参数
+            
+        Returns:
+            查询结果列表
+        """
+        conn = self.get_connection()
+        if not conn:
+            raise Exception("无法从连接池获取数据库连接")
+        
+        try:
+            with conn.cursor() as cursor:
+                cursor.execute(sql, params)
+                result = cursor.fetchall()
+                logger.debug(f"查询执行成功,返回 {len(result)} 条记录")
+                return result
+        except Exception as e:
+            logger.error(f"查询执行失败: {e}")
+            logger.error(f"SQL: {sql}")
+            if params:
+                logger.error(f"参数: {params}")
+            raise
+        finally:
+            self.release_connection(conn)
+    
+    def execute_update(self, sql: str, params: Optional[tuple] = None) -> int:
+        """
+        执行更新语句(INSERT, UPDATE, DELETE)
+        
+        Args:
+            sql: SQL更新语句
+            params: SQL参数
+            
+        Returns:
+            影响的行数
+        """
+        conn = self.get_connection()
+        if not conn:
+            raise Exception("无法从连接池获取数据库连接")
+        
+        try:
+            with conn.cursor() as cursor:
+                affected_rows = cursor.execute(sql, params)
+                conn.commit()
+                logger.debug(f"更新执行成功,影响 {affected_rows} 行")
+                return affected_rows
+        except Exception as e:
+            conn.rollback()
+            logger.error(f"更新执行失败: {e}")
+            logger.error(f"SQL: {sql}")
+            if params:
+                logger.error(f"参数: {params}")
+            raise
+        finally:
+            self.release_connection(conn)
+    
+    def execute_batch_update(self, sql: str, params_list: List[tuple]) -> int:
+        """
+        批量执行更新语句
+        
+        Args:
+            sql: SQL更新语句
+            params_list: SQL参数列表
+            
+        Returns:
+            影响的行数
+        """
+        conn = self.get_connection()
+        if not conn:
+            raise Exception("无法从连接池获取数据库连接")
+        
+        try:
+            with conn.cursor() as cursor:
+                affected_rows = cursor.executemany(sql, params_list)
+                conn.commit()
+                logger.debug(f"批量更新执行成功,影响 {affected_rows} 行")
+                return affected_rows
+        except Exception as e:
+            conn.rollback()
+            logger.error(f"批量更新执行失败: {e}")
+            logger.error(f"SQL: {sql}")
+            raise
+        finally:
+            self.release_connection(conn)
+    
+    def check_table_exists(self, table_name: str = 'info_model_turbine') -> bool:
+        """
+        检查表是否存在
+        
+        Args:
+            table_name: 表名
+            
+        Returns:
+            bool: 表是否存在
+        """
+        try:
+            result = self.execute_query(CHECK_TABLE_EXISTS_SQL, (self.db_config.database,))
+            return result[0]['table_exists'] > 0
+        except Exception as e:
+            logger.error(f"检查表存在性失败: {e}")
+            return False
+    
+    def check_table_columns(self) -> List[str]:
+        """
+        检查表的列
+        
+        Returns:
+            List[str]: 列名列表
+        """
+        try:
+            result = self.execute_query(CHECK_TABLE_COLUMNS_SQL, (self.db_config.database,))
+            return [row['COLUMN_NAME'] for row in result]
+        except Exception as e:
+            logger.error(f"检查表列失败: {e}")
+            return []
+    
+    def add_missing_columns(self):
+        """添加缺失的列"""
+        try:
+            logger.info("检查并添加缺失的列...")
+            self.execute_update(ALTER_MODEL_TURBINE_TABLE_SQL)
+            logger.info("表结构更新完成")
+        except Exception as e:
+            logger.error(f"更新表结构失败: {e}")
+    
+    def create_model_turbine_table(self) -> bool:
+        """
+        创建风机机型信息表
+        
+        Returns:
+            bool: 是否成功创建表
+        """
+        try:
+            logger.info("开始创建风机机型信息表...")
+            self.execute_update(CREATE_MODEL_TURBINE_TABLE_SQL)
+            logger.info("风机机型信息表创建成功")
+            return True
+        except Exception as e:
+            logger.error(f"创建风机机型信息表失败: {e}")
+            return False
+    
+    def drop_model_turbine_table(self) -> bool:
+        """
+        删除风机机型信息表
+        
+        Returns:
+            bool: 是否成功删除表
+        """
+        try:
+            logger.info("开始删除风机机型信息表...")
+            self.execute_update(DROP_MODEL_TURBINE_TABLE_SQL)
+            logger.info("风机机型信息表删除成功")
+            return True
+        except Exception as e:
+            logger.error(f"删除风机机型信息表失败: {e}")
+            return False
+    
+    def get_model_data(self) -> List[Dict[str, Any]]:
+        """
+        从info_turbine表获取机型分组数据
+        
+        Returns:
+            机型数据列表
+        """
+        try:
+            logger.info("开始从info_turbine表查询机型数据...")
+            data = self.execute_query(SELECT_MODEL_DATA_SQL)
+            logger.info(f"成功查询到 {len(data)} 条机型记录")
+            return data
+        except Exception as e:
+            logger.error(f"查询机型数据失败: {e}")
+            return []
+    
+    def get_no_model_list(self) -> List[str]:
+        """
+        获取所有no_model列表
+        
+        Returns:
+            no_model列表
+        """
+        try:
+            logger.info("开始查询所有机型标识...")
+            result = self.execute_query(SELECT_NO_MODEL_LIST_SQL)
+            no_models = [row['no_model'] for row in result]
+            logger.info(f"成功获取 {len(no_models)} 个机型标识")
+            return no_models
+        except Exception as e:
+            logger.error(f"查询机型标识列表失败: {e}")
+            return []
+    
+    def get_scada_data_for_no_model(self, no_model: str) -> List[Dict[str, Any]]:
+        """
+        获取指定机型的SCADA数据用于参数计算
+        
+        Args:
+            no_model: 机型标识
+            
+        Returns:
+            SCADA数据列表
+        """
+        try:
+            logger.debug(f"开始查询机型 {no_model} 的SCADA数据...")
+            data = self.execute_query(SELECT_SCADA_FOR_NO_MODEL_SQL, (no_model,))
+            logger.debug(f"机型 {no_model} 查询到 {len(data)} 条SCADA记录")
+            return data
+        except Exception as e:
+            logger.error(f"查询机型 {no_model} 的SCADA数据失败: {e}")
+            return []
     
     def calculate_and_update_parameters_for_model(self, no_model: str) -> Dict[str, Any]:
         """
@@ -636,15 +942,338 @@ class ModelTurbineManager:
                 "data_points": 0
             }
     
-    # ... 其他方法保持不变 ...
+    def calculate_and_update_all_parameters(self) -> Dict[str, Any]:
+        """
+        计算并更新所有机型的额定参数
+        
+        Returns:
+            计算统计信息
+        """
+        try:
+            logger.info("开始计算并更新所有机型的额定参数...")
+            
+            # 获取所有机型标识
+            no_model_list = self.get_no_model_list()
+            
+            if not no_model_list:
+                logger.warning("没有找到任何机型标识")
+                return {
+                    "total_models": 0,
+                    "success_count": 0,
+                    "failed_count": 0,
+                    "results": []
+                }
+            
+            logger.info(f"共发现 {len(no_model_list)} 个机型需要计算")
+            
+            results = []
+            success_count = 0
+            failed_count = 0
+            
+            # 遍历每个机型进行计算
+            for i, no_model in enumerate(no_model_list, 1):
+                logger.info(f"处理机型 {i}/{len(no_model_list)}: {no_model}")
+                
+                result = self.calculate_and_update_parameters_for_model(no_model)
+                results.append(result)
+                
+                if result.get("success"):
+                    success_count += 1
+                else:
+                    failed_count += 1
+            
+            # 汇总统计
+            stats = {
+                "total_models": len(no_model_list),
+                "success_count": success_count,
+                "failed_count": failed_count,
+                "success_rate": success_count / len(no_model_list) * 100 if no_model_list else 0,
+                "results": results
+            }
+            
+            logger.info(f"参数计算完成: 成功={success_count}, 失败={failed_count}, "
+                       f"成功率={stats['success_rate']:.1f}%")
+            
+            return stats
+            
+        except Exception as e:
+            logger.error(f"计算并更新所有参数失败: {e}")
+            raise
+    
+    def insert_model_data(self, model_data: List[Dict[str, Any]]) -> int:
+        """
+        将机型数据插入到info_model_turbine表
+        
+        Args:
+            model_data: 机型数据列表
+            
+        Returns:
+            int: 成功插入的记录数
+        """
+        if not model_data:
+            logger.warning("没有数据需要插入")
+            return 0
+        
+        insert_sql = """
+        INSERT INTO info_model_turbine 
+        (no_model, model, manufacturer, rated_capacity, cut_in_wind_speed, 
+         cut_out_wind_speed, rotor_diameter, hub_height, turbine_count)
+        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
+        ON DUPLICATE KEY UPDATE
+        turbine_count = VALUES(turbine_count),
+        updated_at = CURRENT_TIMESTAMP
+        """
+        
+        try:
+            logger.info(f"开始向info_model_turbine表插入 {len(model_data)} 条记录...")
+            
+            # 准备插入数据
+            insert_data = []
+            for item in model_data:
+                insert_data.append((
+                    item['no_model'],
+                    item['model'],
+                    item['manufacturer'],
+                    item['rated_capacity'],
+                    item['cut_in_wind_speed'],
+                    item['cut_out_wind_speed'],
+                    item['rotor_diameter'],
+                    item['hub_height'],
+                    item.get('turbine_count', 1)
+                ))
+            
+            # 批量插入数据
+            affected_rows = self.execute_batch_update(insert_sql, insert_data)
+            
+            logger.info(f"成功插入/更新 {affected_rows} 条记录到info_model_turbine表")
+            return affected_rows
+            
+        except Exception as e:
+            logger.error(f"插入机型数据失败: {e}")
+            raise
+    
+    def get_model_turbine_stats(self) -> Dict[str, Any]:
+        """
+        获取info_model_turbine表的统计信息
+        
+        Returns:
+            统计信息字典
+        """
+        try:
+            # 查询统计信息
+            stats_sql = """
+            SELECT 
+                COUNT(*) as total_models,
+                COUNT(DISTINCT manufacturer) as manufacturer_count,
+                COUNT(DISTINCT model) as model_count,
+                MIN(rated_capacity) as min_capacity,
+                MAX(rated_capacity) as max_capacity,
+                AVG(rated_capacity) as avg_capacity,
+                SUM(turbine_count) as total_turbines,
+                COUNT(CASE WHEN rated_rotor_spd IS NOT NULL AND rated_rotor_spd > 0 THEN 1 END) as calculated_models,
+                COUNT(DISTINCT turbine_type) as turbine_type_count,
+                SUM(data_points) as total_data_points,
+                COUNT(CASE WHEN calculation_status = 'success' THEN 1 END) as success_count,
+                COUNT(CASE WHEN calculation_status = 'adjusted' THEN 1 END) as adjusted_count,
+                COUNT(CASE WHEN calculation_status = 'error' THEN 1 END) as error_count
+            FROM info_model_turbine
+            """
+            
+            result = self.execute_query(stats_sql)
+            return result[0] if result else {}
+            
+        except Exception as e:
+            logger.error(f"获取统计信息失败: {e}")
+            return {}
+    
+    def get_turbine_type_distribution(self) -> List[Dict[str, Any]]:
+        """
+        获取风机类型分布
+        
+        Returns:
+            类型分布列表
+        """
+        try:
+            type_sql = """
+            SELECT 
+                IFNULL(turbine_type, '未知') as turbine_type,
+                COUNT(*) as model_count,
+                SUM(turbine_count) as turbine_count,
+                AVG(transmission_ratio) as avg_ratio,
+                AVG(rated_rotor_spd) as avg_rotor_spd,
+                AVG(rated_gen_spd) as avg_gen_spd
+            FROM info_model_turbine
+            WHERE turbine_type IS NOT NULL
+            GROUP BY turbine_type
+            ORDER BY model_count DESC
+            """
+            
+            return self.execute_query(type_sql)
+            
+        except Exception as e:
+            logger.error(f"获取风机类型分布失败: {e}")
+            return []
+    
+    def print_model_summary(self, model_data: List[Dict[str, Any]]):
+        """
+        打印机型数据摘要
+        
+        Args:
+            model_data: 机型数据列表
+        """
+        if not model_data:
+            logger.info("没有机型数据")
+            return
+        
+        print("\n" + "="*80)
+        print("风机机型数据摘要")
+        print("="*80)
+        print(f"总机型数: {len(model_data)}")
+        
+        # 按制造商统计
+        manufacturers = {}
+        for item in model_data:
+            manufacturer = item.get('manufacturer', '未知')
+            manufacturers[manufacturer] = manufacturers.get(manufacturer, 0) + 1
+        
+        print(f"制造商数: {len(manufacturers)}")
+        print("\n制造商分布:")
+        for manufacturer, count in sorted(manufacturers.items(), key=lambda x: x[1], reverse=True):
+            print(f"  {manufacturer}: {count} 种机型")
+        
+        # 按额定容量统计
+        capacities = {}
+        for item in model_data:
+            capacity = item.get('rated_capacity', 0)
+            if capacity:
+                capacity_range = f"{capacity}kW"
+                capacities[capacity_range] = capacities.get(capacity_range, 0) + 1
+        
+        print("\n额定容量分布:")
+        for capacity, count in sorted(capacities.items(), key=lambda x: int(x[0].replace('kW', ''))):
+            print(f"  {capacity}: {count} 种机型")
+        
+        print("="*80)
+    
+    def print_calculation_summary(self, stats: Dict[str, Any]):
+        """打印参数计算摘要"""
+        print("\n" + "="*80)
+        print("参数计算统计")
+        print("="*80)
+        print(f"总机型数: {stats.get('total_models', 0)}")
+        print(f"成功计算的机型数: {stats.get('success_count', 0)}")
+        print(f"失败的机型数: {stats.get('failed_count', 0)}")
+        print(f"成功率: {stats.get('success_rate', 0):.1f}%")
+        
+        # 显示成功计算的部分结果
+        success_results = [r for r in stats.get('results', []) if r.get('success')]
+        if success_results:
+            print(f"\n成功计算的机型示例 (前5个):")
+            for i, result in enumerate(success_results[:5]):
+                print(f"  {i+1}. {result['no_model']}:")
+                print(f"     叶轮转速: {result['rated_rotor_spd']:.1f} rpm")
+                print(f"     发电机转速: {result['rated_gen_spd']} rpm")
+                print(f"     传动比: {result['transmission_ratio']:.3f}")
+                print(f"     类型: {result['turbine_type']}")
+                print(f"     数据点数: {result['data_points']}")
+                if result.get('validation_info'):
+                    print(f"     校验信息: {result['validation_info']}")
+        
+        # 显示失败的部分原因
+        failed_results = [r for r in stats.get('results', []) if not r.get('success')]
+        if failed_results:
+            print(f"\n失败的机型示例 (前5个):")
+            for i, result in enumerate(failed_results[:5]):
+                print(f"  {i+1}. {result['no_model']}: {result.get('reason', '未知原因')}")
+        
+        print("="*80)
+    
+    def run_model_extraction_pipeline(self, recreate_table: bool = False, calculate_params: bool = True) -> bool:
+        """
+        运行完整的机型数据提取和参数计算流程
+        
+        Args:
+            recreate_table: 是否重新创建表
+            calculate_params: 是否计算额定参数
+            
+        Returns:
+            bool: 整个流程是否成功
+        """
+        try:
+            logger.info("开始执行风机机型数据提取和参数计算流程...")
+            
+            # 步骤1: 检查或创建表
+            if recreate_table:
+                self.drop_model_turbine_table()
+                self.create_model_turbine_table()
+            else:
+                if not self.check_table_exists():
+                    self.create_model_turbine_table()
+                else:
+                    logger.info("info_model_turbine表已存在,将追加数据")
+                    # 检查并添加缺失的列
+                    self.add_missing_columns()
+            
+            # 步骤2: 从info_turbine表获取机型数据
+            model_data = self.get_model_data()
+            if not model_data:
+                logger.error("未获取到机型数据,流程终止")
+                return False
+            
+            # 步骤3: 打印摘要信息
+            self.print_model_summary(model_data)
+            
+            # 步骤4: 插入数据到info_model_turbine表
+            inserted_count = self.insert_model_data(model_data)
+            
+            # 步骤5: 计算额定参数
+            if calculate_params:
+                calculation_stats = self.calculate_and_update_all_parameters()
+                self.print_calculation_summary(calculation_stats)
+            
+            # 步骤6: 获取并显示最终统计信息
+            stats = self.get_model_turbine_stats()
+            if stats:
+                print("\n数据库统计信息:")
+                print(f"  总机型数: {stats.get('total_models', 0)}")
+                print(f"  制造商数: {stats.get('manufacturer_count', 0)}")
+                print(f"  总风机数: {stats.get('total_turbines', 0)}")
+                print(f"  额定容量范围: {stats.get('min_capacity', 0)}kW - {stats.get('max_capacity', 0)}kW")
+                print(f"  平均额定容量: {round(stats.get('avg_capacity', 0), 1)}kW")
+                print(f"  已计算参数的机型数: {stats.get('calculated_models', 0)}")
+                print(f"  成功计算: {stats.get('success_count', 0)}, 调整后计算: {stats.get('adjusted_count', 0)}, 错误: {stats.get('error_count', 0)}")
+                print(f"  使用的总数据点数: {stats.get('total_data_points', 0)}")
+            
+            # 显示风机类型分布
+            type_dist = self.get_turbine_type_distribution()
+            if type_dist:
+                print("\n风机类型分布:")
+                for item in type_dist:
+                    print(f"  {item['turbine_type']}: {item['model_count']} 种机型, "
+                          f"{item['turbine_count']} 台风机")
+                    if item['avg_ratio']:
+                        print(f"    平均传动比: {item['avg_ratio']:.3f}, "
+                              f"平均叶轮转速: {item['avg_rotor_spd']:.1f} rpm, "
+                              f"平均发电机转速: {item['avg_gen_spd']:.0f} rpm")
+            
+            logger.info("风机机型数据提取和参数计算流程执行完成!")
+            return True
+            
+        except Exception as e:
+            logger.error(f"流程执行失败: {e}")
+            import traceback
+            traceback.print_exc()
+            return False
+        finally:
+            # 关闭连接池
+            if self.connection_pool:
+                self.connection_pool.close_all()
 
 
 def main():
     """主函数"""
     # 数据库配置
     db_config = DatabaseConfig(
-        # host="106.120.102.238",
-        # port=44000,
         host="192.168.50.234",
         port=4000,
         user='root',