wangjiaojiao 5 месяцев назад
Сommit
c7b05d529f

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+__pycache__/
+*.pyc

+ 329 - 0
api_health.py

@@ -0,0 +1,329 @@
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+from datetime import datetime
+from typing import List, Dict, Optional
+import logging
+import os
+import glob
+import joblib
+import math
+import numpy as np
+import pandas as pd
+app = FastAPI(root_path="/api/health")
+import time
+from health_pretrain import WindFarmPretrainModel
+from database import DataFetcher
+from health_evalution_class import HealthAssessor
+import pandas as pd
+import concurrent.futures
+
+# 配置日志
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# 请求模型定义
+class AssessmentRequest(BaseModel):
+    windcode: str
+    month: str  # 格式: "YYYY-MM"
+
+class SubsystemResult(BaseModel):
+    health_score: float
+    weights: Dict[str, float]
+    message: Optional[str] = None
+
+class AssessmentResult(BaseModel):
+    engine_code: str
+    wind_turbine_name: str
+    mill_type: str
+    total_health_score: Optional[float]
+    subsystems: Dict[str, SubsystemResult]
+    assessed_subsystems: List[str]
+    model_type: Optional[str] = None  # 新增字段,不影响原有结构
+
+# 全局存储
+PRETRAIN_MODELS: Dict[str, WindFarmPretrainModel] = {}  # 改为存储风场级模型
+REALTIME_ASSESSOR = HealthAssessor()
+
+@app.on_event("startup")
+def load_resources():
+    """加载预训练模型和其他资源"""
+    # 加载预训练模型
+    model_root = "health_models"
+    if os.path.exists(model_root):
+        for wind_code in os.listdir(model_root):
+            wind_dir = os.path.join(model_root, wind_code)
+            if os.path.isdir(wind_dir):
+                try:
+                    model = WindFarmPretrainModel.load(wind_dir, wind_code)
+                    if model:
+                        PRETRAIN_MODELS[wind_code] = model
+                except Exception as e:
+                    logger.error(f"加载风场模型失败 {wind_code}: {str(e)}")
+    
+    logger.info(f"预训练模型加载完成,共加载 {len(PRETRAIN_MODELS)} 个风场模型")
+
+def _format_result(assessment: Dict, model_type: str = None) -> AssessmentResult:
+    """格式化评估结果(完全兼容原有逻辑)"""
+    # 确保所有子系统都存在,即使评分为-1
+    subsystems = assessment.get('subsystems', {})
+    for subsys in ['generator', 'nacelle', 'grid', 'drive_train']:
+        if subsys not in subsystems:
+            subsystems[subsys] = {
+                'health_score': -1,
+                'weights': {},
+                'message': None
+            }
+    
+    return AssessmentResult(
+        engine_code=assessment['engine_code'],
+        wind_turbine_name=assessment['wind_turbine_name'],
+        mill_type=assessment['mill_type'],
+        total_health_score=assessment.get('total_health_score', -1),
+        subsystems={
+            k: SubsystemResult(
+                health_score=v['health_score'],
+                weights=v.get('weights', {}),
+                message=v.get('message')
+            ) for k, v in subsystems.items()
+        },
+        assessed_subsystems=assessment.get('assessed_subsystems', []),
+        model_type=model_type
+    )
+
+def clean_nans(obj):
+    """递归清理NaN值(保留原有逻辑)"""
+    if isinstance(obj, dict):
+        return {k: clean_nans(v) for k, v in obj.items()}
+    elif isinstance(obj, list):
+        return [clean_nans(item) for item in obj]
+    elif isinstance(obj, float) and (math.isnan(obj) or not math.isfinite(obj)):
+        return -1.0
+    else:
+        return obj
+
+def create_empty_assessment(engine_code: str, wind_turbine_name: str, mill_type: str) -> Dict:
+    """创建空评估结果(完全保留原有逻辑)"""
+    return {
+        'engine_code': engine_code,
+        'wind_turbine_name': wind_turbine_name,
+        'mill_type': mill_type,
+        'total_health_score': -1,
+        'subsystems': {
+            'generator': {'health_score': -1, 'weights': {}, 'message': None},
+            'nacelle': {'health_score': -1, 'weights': {}, 'message': None},
+            'grid': {'health_score': -1, 'weights': {}, 'message': None},
+            'drive_train': {'health_score': -1, 'weights': {}, 'message': None}
+        },
+        'assessed_subsystems': ['generator', 'nacelle', 'grid', 'drive_train']
+    }
+
+def assess_with_realtime(fetcher, windcode, engine_code, month, mill_type, wind_turbine_name):
+    """实时评估逻辑(添加耗时统计)"""
+    start_time = time.time()
+    
+    # 获取可用特征列
+    col_start = time.time()
+    available_columns = fetcher.get_turbine_columns(windcode)
+    logger.info(f"[{engine_code}] 获取列名耗时: {time.time() - col_start:.2f}s")
+    
+    if not available_columns:
+        return create_empty_assessment(engine_code, wind_turbine_name, mill_type)
+        
+    # 获取有效特征
+    feat_start = time.time()
+    valid_features = REALTIME_ASSESSOR._get_all_possible_features(
+        REALTIME_ASSESSOR, mill_type, available_columns
+    )
+    logger.info(f"[{engine_code}] 特征筛选耗时: {time.time() - feat_start:.2f}s")
+    
+    # 获取数据
+    data_start = time.time()
+    data = fetcher.fetch_turbine_data(windcode, engine_code, month, valid_features)
+    logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
+    
+    if data is None or data.empty:
+        return create_empty_assessment(engine_code, wind_turbine_name, mill_type)
+    
+    # 执行评估
+    assess_start = time.time()
+    assessment = REALTIME_ASSESSOR.assess_turbine(
+        engine_code, data, mill_type, wind_turbine_name
+    )
+    logger.info(f"[{engine_code}] 模型评估耗时: {time.time() - assess_start:.2f}s")
+    
+    logger.info(f"[{engine_code}] 实时评估总耗时: {time.time() - start_time:.2f}s")
+    return clean_nans(assessment or create_empty_assessment(engine_code, wind_turbine_name, mill_type))
+
+def assess_with_pretrained(fetcher, windcode, engine_code, month, wind_turbine_name, mill_type):
+    """使用预训练模型评估"""
+    start_time = time.time()    
+    model = PRETRAIN_MODELS.get(windcode)
+    if not model:
+        logger.warning(f"未找到风场 {windcode} 的预训练模型")
+        return None
+    
+    # 检查机型是否匹配
+    if model.mill_type != mill_type:
+        logger.warning(
+            f"机型不匹配: 模型机型={model.mill_type}, 当前机型={mill_type}. "
+            f"风机 {engine_code} 将使用实时计算"
+        )
+        return None
+    feat_start = time.time()   
+    # 获取模型特征(确保不重复且存在)
+    features = list({
+        f for subsys_feats in model.features.values() 
+        for f in subsys_feats 
+        if f in fetcher.get_turbine_columns(windcode)  # 确保特征在数据库中存在
+    })
+    logger.info(f"[{engine_code}] 特征准备耗时: {time.time() - feat_start:.2f}s")
+    
+    if not features:
+        logger.warning(f"风场模型 {windcode} 无可用特征")
+        return None
+    
+    # 获取数据
+    data_start = time.time()
+    data = fetcher.fetch_turbine_data(windcode, engine_code, month, features)
+    logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
+    if data is None or data.empty:
+        logger.warning(f"风机 {engine_code} 无有效数据")
+        return None
+    
+    # 执行评估
+    assess_start = time.time()
+    try:
+        assessment = model.assess(data, engine_code)
+        logger.info(f"[{engine_code}] 模型推理耗时: {time.time() - assess_start:.2f}s")
+        
+        if not assessment:
+            logger.warning(f"风场模型评估失败 {engine_code}")
+            return None
+        
+        assessment.update({
+            'engine_code': engine_code,
+            'wind_turbine_name': wind_turbine_name,
+            'mill_type': mill_type
+        })
+        logger.info(f"[{engine_code}] 预训练评估总耗时: {time.time() - start_time:.2f}s")
+        return clean_nans(assessment)
+    except Exception as e:
+        logger.error(f"预训练评估异常 {engine_code}: {str(e)}", exc_info=True)
+        return None
+
+@app.post("/health_assess", response_model=List[AssessmentResult])
+async def assess_windfarm_optimized(request: AssessmentRequest):
+    """优化后的评估流程(添加耗时统计)"""
+    total_start = time.time()
+    logger.info(f"开始评估风场 {request.windcode} {request.month}")
+    
+    try:
+        fetcher_start = time.time()
+        fetcher = DataFetcher()
+        logger.info(f"初始化DataFetcher耗时: {time.time() - fetcher_start:.2f}s")
+        
+        # 1. 获取风场所有风机信息
+        turbine_start = time.time()
+        turbines = fetcher.get_turbines(request.windcode)
+        logger.info(f"获取风机列表耗时: {time.time() - turbine_start:.2f}s")
+        
+        if turbines.empty:
+            return []
+        
+        # 2. 检查预训练模型
+        model_check_start = time.time()
+        model = PRETRAIN_MODELS.get(request.windcode)
+        logger.info(f"检查预训练模型耗时: {time.time() - model_check_start:.2f}s")
+        
+        if not model:
+            logger.info("未使用预训练模型,采用实时评估")
+            return [assess_with_realtime(fetcher, request.windcode, row['engine_code'], 
+                                        request.month, 
+                                        get_mill_type(fetcher, row['mill_type_code']),
+                                        row['engine_name']) 
+                   for _, row in turbines.iterrows()]
+        
+        # 3. 批量获取所有风机数据
+        data_fetch_start = time.time()
+        all_features = list({f for feats in model.features.values() for f in feats})
+        all_data = fetcher.fetch_all_turbines_data(
+            request.windcode, 
+            request.month,
+            all_features
+        )
+        logger.info(f"批量数据查询耗时: {time.time() - data_fetch_start:.2f}s")
+        logger.info(f"获取到 {len(all_data)} 台风机数据")
+        
+        # 4. 并行评估
+        assess_start = time.time()
+        results = []
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            futures = []
+            for _, row in turbines.iterrows():
+                engine_code = row['engine_code']
+                if engine_code not in all_data:
+                    empty_assessment = create_empty_assessment(
+                        engine_code,
+                        row['engine_name'],
+                        get_mill_type(fetcher, row['mill_type_code'])
+                    )
+                    futures.append(executor.submit(
+                        lambda: (_format_result(empty_assessment))
+                    ))
+                    continue
+                    
+                futures.append(executor.submit(
+                    _assess_single_turbine,
+                    model, all_data[engine_code], row, fetcher
+                ))
+            
+            for future in concurrent.futures.as_completed(futures):
+                try:
+                    result = future.result()
+                    results.append(result)
+                except Exception as e:
+                    logger.error(f"评估失败: {str(e)}")
+                    continue
+        
+        logger.info(f"并行评估总耗时: {time.time() - assess_start:.2f}s")
+        logger.info(f"风场评估总耗时: {time.time() - total_start:.2f}s")
+        return results
+    except Exception as e:
+        logger.error(f"评估流程异常: {str(e)}")
+        raise HTTPException(status_code=500, detail="内部服务器错误")
+
+
+def _assess_single_turbine(model: WindFarmPretrainModel, data: pd.DataFrame, 
+                          turbine_info: dict, fetcher: DataFetcher):
+    """单个风机评估(添加耗时统计)"""
+    start_time = time.time()
+    try:
+        # 获取机型
+        mill_type_num = fetcher.get_mill_type(turbine_info['mill_type_code'])
+        mill_type = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')
+        
+        # 评估
+        assess_start = time.time()
+        assessment = model.assess(data, turbine_info['engine_code'])
+        logger.info(f"[{turbine_info['engine_code']}] 模型推理耗时: {time.time() - assess_start:.2f}s")
+        
+        assessment.update({
+            'engine_code': turbine_info['engine_code'],
+            'wind_turbine_name': turbine_info['engine_name'],
+            'mill_type': mill_type
+        })
+        logger.info(f"[{turbine_info['engine_code']}] 单机评估耗时: {time.time() - start_time:.2f}s")
+        return _format_result(clean_nans(assessment), "pretrained")
+    except Exception as e:
+        logger.error(f"评估风机 {turbine_info['engine_code']} 失败: {str(e)}")
+        empty_assessment = create_empty_assessment(
+            turbine_info['engine_code'],
+            turbine_info['engine_name'],
+            mill_type
+        )
+        return _format_result(empty_assessment)
+
+def get_mill_type(fetcher: DataFetcher, mill_type_code: str) -> str:
+    """获取机型字符串"""
+    mill_type_num = fetcher.get_mill_type(mill_type_code)
+    return {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')

+ 157 - 0
database.py

@@ -0,0 +1,157 @@
+import pandas as pd
+from sqlalchemy import create_engine, inspect
+import traceback
+import logging
+from functools import lru_cache
+from typing import List, Dict, Optional
+class DataFetcher:
+    def __init__(self):
+        self.show_engine = create_engine('mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show')
+        self.data_engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod')
+        # self.show_engine = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show')
+        # self.data_engine = create_engine('mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod')           
+    @lru_cache(maxsize=10)
+    def get_turbine_columns_cached(self, windcode: str):
+        return self.get_turbine_columns(windcode)
+    def get_turbine_columns(self, windcode):
+        """
+        获取指定风场数据表的所有列名
+        :param windcode: 风场编号 (如 "WF001")
+        :return: 列名列表 (如 ["timestamp", "temp1", "vibration1"])
+        """
+        # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号        
+        special_wind_farms = {
+            #训练诺木洪风场时 删掉反引号
+            "WOF093400005": f"`{windcode}-WOB000001_minute`"  # 加上反引号
+           # "WOF093400005": f"{windcode}-WOB000001_minute"  
+        }
+        table_name = special_wind_farms.get(windcode, f"{windcode}_minute")        
+
+        try:
+            inspector = inspect(self.data_engine)
+            print(f"正在查询表: {table_name}")  # 调试用            
+            columns = inspector.get_columns(table_name)
+            return [col['name'] for col in columns]
+
+        except Exception as e:
+            print(f"Error fetching columns for {table_name}: {str(e)}")
+            return []
+
+    """
+    获取风场下所有风机信息
+    根据风场编号在表'wind_engine_group'中查询所有的风机编号engine_code 以及对应的机型编号mill_type_code,风机名称engine_name
+    """
+    def get_turbines(self, windcode):
+        query = f"""
+        SELECT engine_code, mill_type_code,engine_name 
+        FROM wind_engine_group 
+        WHERE field_code = '{windcode}'
+        """
+        return pd.read_sql(query, self.show_engine)
+
+    """
+    获取机型驱动类型
+    根据机型编号在表'wind_engine_mill'中查询对应的驱动方式值
+    """
+    def get_mill_type(self, mill_type_code):
+        query = f"""
+        SELECT curved_motion_type 
+        FROM wind_engine_mill 
+        WHERE mill_type_code = '{mill_type_code}'
+        """
+        result = pd.read_sql(query, self.show_engine)
+        return result.iloc[0, 0] if not result.empty else None
+
+    """
+    获取风机时序数据
+    根据风机编号在表'windcode_minute'中,筛选出timestamp在month范围里的所有数据条
+    """
+    def fetch_turbine_data(self, windcode, engine_code, month, features):
+        """获取指定月份风机数据(安全参数化版本)
+        
+        Args:
+            windcode: 风场编号 (如 "WF001")
+            engine_code: 风机编号 (如 "WT001")
+            month: 月份字符串 (格式 "YYYY-MM")
+            features: 需要查询的字段列表
+        Returns:
+            pd.DataFrame: 包含查询结果的DataFrame
+        """
+        try:
+            # 将month格式从yyyy-mm转换为单独的年份和月份
+            year, month = month.split('-')
+            
+            # 2. 验证特征列名安全性
+            safe_features = []
+            for feat in features:
+                if isinstance(feat, str) and all(c.isalnum() or c == '_' for c in feat):
+                    safe_features.append(f'`{feat}`')  # 用反引号包裹
+                else:
+                    print(f"警告:忽略非法特征名 '{feat}'")
+            
+            if not safe_features:
+                print("错误:无有效特征列")
+                return pd.DataFrame()
+            # 3. 构建参数化查询
+            # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号        
+            special_wind_farms = {
+                "WOF093400005": f"`{windcode}-WOB000001_minute`"  # 加上反引号
+                #"WOF093400005": f"{windcode}-WOB000001_minute"                 
+            }
+            table_name = special_wind_farms.get(windcode, f"{windcode}_minute")  
+
+            query = f"""
+            SELECT `year`,`mont`, {','.join(safe_features)} 
+            FROM {table_name} 
+            WHERE `wind_turbine_number` = %s
+            AND `year` = %s 
+            AND `month` = %s
+            """
+            
+            print(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year},{month})")
+            
+            # 4. 执行参数化查询
+            return pd.read_sql(query, self.data_engine, 
+                            params=(engine_code, year,month))
+        
+        except ValueError as e:
+            print(f"输入参数错误: {str(e)}")
+            return pd.DataFrame()
+        except Exception as e:
+            print(f"数据库查询失败: {str(e)}")
+            import traceback
+            traceback.print_exc()
+            return pd.DataFrame()
+
+
+    def fetch_all_turbines_data(self, windcode: str, month: str, features: List[str]) -> Dict[str, pd.DataFrame]:
+        """批量获取风场下所有风机数据"""
+        try:
+            # 安全特征检查
+            safe_features = [f'`{f}`' for f in features if isinstance(f, str) and all(c.isalnum() or c == '_' for c in f)]
+            if not safe_features:
+                return {}
+            
+            # 将month格式从yyyy-mm转换为单独的年份和月份
+            year, month = month.split('-')
+            # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
+            special_wind_farms = {
+                "WOF093400005": f"`{windcode}-WOB000001_minute`"  # 加上反引号
+                #"WOF093400005": f"{windcode}-WOB000001_minute"                  
+            }
+            table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
+            # 单次查询获取所有风机数据
+            query = f"""
+            SELECT `wind_turbine_number`, {','.join(safe_features)} 
+            FROM {table_name}
+            WHERE `year` = %s AND `month` = %s
+            """
+            # 执行查询
+            all_data = pd.read_sql(query, self.data_engine, params=(year, month))
+            # 按风机分组
+            return {turbine: group.drop(columns=['wind_turbine_number']) 
+                    for turbine, group in all_data.groupby('wind_turbine_number')}
+                    
+        except Exception as e:
+            print(f"批量查询失败: {str(e)}")
+            return {}   

+ 410 - 0
health_evalution_class.py

@@ -0,0 +1,410 @@
+import numpy as np
+import pandas as pd
+from sklearn.neighbors import BallTree
+from typing import Dict, List, Optional, Union
+from functools import lru_cache
+
+class HealthAssessor:
+    def __init__(self):
+        self.subsystem_config = {
+            # 发电机
+            'generator': {
+                # 双馈
+                'dfig': {
+                    'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature',
+                             'generator_winding3_temperature', 'generatordrive_end_bearing_temperature',
+                             'generatornon_drive_end_bearing_temperature'],
+                },
+                # 直驱
+                'direct': {
+                    'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature',
+                              'generator_winding3_temperature', 'main_bearing_temperature'],
+                }
+            },
+            # 机舱系统
+            'nacelle': {
+                'fixed': ['front_back_vibration_of_the_cabin', 'side_to_side_vibration_of_the_cabin',
+                          'cabin_position', 'cabin_temperature'],
+            },
+            # 电网环境
+            'grid': {
+                'fixed': ['reactive_power', 'active_power', 'grid_a_phase_current',
+                          'grid_b_phase_current', 'grid_c_phase_current'],
+            },
+            # 传动系统
+            'drive_train': {
+                'fixed': ['main_bearing_temperature'],
+                'keywords': [
+                    {'include': ['gearbox', 'temperature'], 'exclude': [], 'min_count': 2},
+                ]
+            }
+        }
+
+        # 嵌入源代码的MSET实现
+        self.mset = self._create_mset_core()
+
+    def _create_mset_core(self):
+        """创建MSET核心计算模块"""
+        class MSETCore:
+            def __init__(self):
+                self.matrixD = None
+                self.normalDataBallTree = None
+                self.healthyResidual = None
+
+            def calcSimilarity(self, x, y):
+                """优化后的相似度计算"""
+                diff = np.array(x) - np.array(y)
+                return 1 / (1 + np.sqrt(np.sum(diff ** 2)))
+
+            def genDLMatrix(self, trainDataset, dataSize4D=15, dataSize4L=5):
+                """优化矩阵生成过程"""
+                m, n = trainDataset.shape
+
+                # 快速选择极值点
+                min_indices = np.argmin(trainDataset, axis=0)
+                max_indices = np.argmax(trainDataset, axis=0)
+                unique_indices = np.unique(np.concatenate([min_indices, max_indices]))
+                self.matrixD = trainDataset[unique_indices].copy()
+
+                # 快速填充剩余点
+                remaining_indices = np.setdiff1d(np.arange(m), unique_indices)
+                np.random.shuffle(remaining_indices)
+                needed = max(0, dataSize4D - len(unique_indices))
+                if needed > 0:
+                    self.matrixD = np.vstack([self.matrixD, trainDataset[remaining_indices[:needed]]])
+
+                # 使用与源代码一致的BallTree参数
+                self.normalDataBallTree = BallTree(
+                    self.matrixD,
+                    leaf_size=40,
+                    metric=lambda i, j: 1 - self.calcSimilarity(i, j)  # 自定义相似度
+                )
+
+                # 使用所有数据计算残差
+                self.healthyResidual = self.calcResidualByLocallyWeightedLR(trainDataset)
+                return 0
+
+            def calcResidualByLocallyWeightedLR(self, newStates):
+                """优化残差计算"""
+                if len(newStates.shape) == 1:
+                    newStates = newStates.reshape(-1, 1)
+
+                dist, iList = self.normalDataBallTree.query(
+                    newStates,
+                    k=min(10, len(self.matrixD)),
+                    return_distance=True
+                )
+                weights = 1 / (dist + 1e-5)
+                weights /= weights.sum(axis=1)[:, np.newaxis]
+
+                est_X = np.sum(weights[:, :, np.newaxis] * self.matrixD[iList[0]], axis=1)
+                return est_X - newStates
+
+            def calcSPRT(self, newsStates, feature_weight, alpha=0.1, beta=0.1, decisionGroup=1):
+                """优化SPRT计算"""
+                stateResidual = self.calcResidualByLocallyWeightedLR(newsStates)
+                weightedStateResidual = np.dot(stateResidual, feature_weight)
+                weightedHealthyResidual = np.dot(self.healthyResidual, feature_weight)
+
+                mu0 = np.mean(weightedHealthyResidual)
+                sigma0 = np.std(weightedHealthyResidual)
+
+                # 向量化计算
+                n = len(newsStates)
+                if n < decisionGroup:
+                    return [50]  # 中性值
+
+                rolling_mean = np.convolve(weightedStateResidual, np.ones(decisionGroup) / decisionGroup, 'valid')
+                si = (rolling_mean - mu0) * (rolling_mean + mu0 - 2 * mu0) / (2 * sigma0 ** 2)
+
+                lowThres = np.log(beta / (1 - alpha))
+                highThres = np.log((1 - beta) / alpha)
+
+                si = np.clip(si, lowThres, highThres)
+                si = np.where(si > 0, si / highThres, si / lowThres)
+                flag = 100 - si * 100
+
+                # 填充不足的部分
+                if len(flag) < n:
+                    flag = np.pad(flag, (0, n - len(flag)), mode='edge')
+
+                return flag.tolist()
+
+            def CRITIC_prepare(self, data, flag=1):
+                """标准化处理"""
+                data = data.astype(float)
+                numeric_cols = data.select_dtypes(include=[np.number]).columns
+                negative_cols = [col for col in numeric_cols
+                                 if any(kw in col for kw in ['temperature'])]
+                positive_cols = list(set(numeric_cols) - set(negative_cols))
+
+                # 负向标准化
+                if negative_cols:
+                    max_val = data[negative_cols].max()
+                    min_val = data[negative_cols].min()
+                    data[negative_cols] = (max_val - data[negative_cols]) / (max_val - min_val).replace(0, 1e-5)
+
+                # 正向标准化
+                if positive_cols:
+                    max_val = data[positive_cols].max()
+                    min_val = data[positive_cols].min()
+                    data[positive_cols] = (data[positive_cols] - min_val) / (max_val - min_val).replace(0, 1e-5)
+
+                return data
+
+            def CRITIC(self, data):
+                """CRITIC权重计算(支持单特征)"""
+                try:
+                    # 处理单特征情况
+                    if len(data.columns) == 1:
+                        return pd.Series([1.0], index=data.columns)
+                        
+                    data_norm = self.CRITIC_prepare(data.copy())
+                    std = data_norm.std(ddof=0).clip(lower=0.01)
+                    
+                    # 计算相关系数矩阵(添加异常处理)
+                    try:
+                        corr = np.abs(np.corrcoef(data_norm.T))
+                        np.fill_diagonal(corr, 0)
+                        conflict = np.sum(1 - corr, axis=1)
+                    except:
+                        # 如果计算相关系数失败,使用等权重
+                        return pd.Series(np.ones(len(data.columns))/len(data.columns))
+                        
+                    info = std * conflict
+                    weights = info / info.sum()
+                    return pd.Series(weights, index=data.columns)
+                except Exception as e:
+                    print(f"CRITIC计算失败: {str(e)}")
+                    return pd.Series(np.ones(len(data.columns))/len(data.columns))
+
+            def ahp(self, matrix):
+                """AHP权重计算"""
+                eigenvalue, eigenvector = np.linalg.eig(matrix)
+                max_idx = np.argmax(eigenvalue)
+                weight = eigenvector[:, max_idx].real
+                return weight / weight.sum(), eigenvalue[max_idx].real
+
+        return MSETCore()
+
+    def assess_turbine(self, engine_code, data, mill_type, wind_turbine_name):
+        """评估单个风机"""
+        results = {
+            "engine_code": engine_code,
+            "wind_turbine_name": wind_turbine_name,
+            "mill_type": mill_type,
+            "total_health_score": None,
+            "subsystems": {},
+            "assessed_subsystems": []
+        }
+
+        # 各子系统评估
+        subsystems_to_assess = [
+            ('generator', self.subsystem_config['generator'][mill_type], 1),
+            ('nacelle', self.subsystem_config['nacelle'], 1),
+            ('grid', self.subsystem_config['grid'], 1),
+            ('drive_train', self.subsystem_config['drive_train'] if mill_type == 'dfig' else None, 1)
+        ]
+
+        for subsystem, config, min_features in subsystems_to_assess:
+            if config is None:
+                continue
+
+            features = self._get_subsystem_features(config, data)
+
+            # 功能1:无论特征数量是否足够都输出结果
+            if len(features) >= min_features:
+                assessment = self._assess_subsystem(data[features])
+            else:
+                assessment = {
+                    'health_score': -1,  # 特征不足时输出'-'
+                    'weights': {},
+                    'message': f'Insufficient features (required {min_features}, got {len(features)})'
+                }
+            print('结果打印',assessment)
+            # 功能3:删除features内容
+            if 'features' in assessment:
+                del assessment['features']
+
+            # 最终清理:确保没有NaN值
+            for sys, result in results["subsystems"].items():
+                if isinstance(result['health_score'], float) and np.isnan(result['health_score']):
+                    result['health_score'] = -1
+                    result['message'] = (result.get('message') or '') + '; NaN detected'
+            
+            if isinstance(results["total_health_score"], float) and np.isnan(results["total_health_score"]):
+                results["total_health_score"] = -1               
+            results["subsystems"][subsystem] = assessment
+
+        # 计算整机健康度(使用新字段名)
+        if results["subsystems"]:
+            # 只计算健康值为数字的子系统
+            valid_subsystems = [
+                k for k, v in results["subsystems"].items()
+                if isinstance(v['health_score'], (int, float)) and v['health_score'] >= 0
+            ]
+
+            if valid_subsystems:
+                weights = self._get_subsystem_weights(valid_subsystems)
+                health_scores = [results["subsystems"][sys]['health_score'] for sys in valid_subsystems]
+                results["total_health_score"] = float(np.dot(health_scores, weights))
+                results["assessed_subsystems"] = valid_subsystems
+
+        return results
+
+    def _get_all_possible_features(self, mill_type, available_columns):
+        """
+        获取所有可能的特征列(基于实际存在的列)
+        
+        参数:
+            mill_type: 机型类型
+            available_columns: 数据库实际存在的列名列表
+        """
+        features = []
+        available_columns_lower = [col.lower() for col in available_columns]  # 不区分大小写匹配
+        # for subsys_name, subsys_config in assessor.subsystem_config.items():
+        for subsys_name, subsys_config in self.subsystem_config.items():
+            # 处理子系统配置
+            if subsys_name == 'generator':
+                config = subsys_config.get(mill_type, {})
+            elif subsys_name == 'drive_train' and mill_type != 'dfig':
+                continue
+            else:
+                config = subsys_config
+
+            # 处理固定特征
+            if 'fixed' in config:
+                for f in config['fixed']:
+                    if f in available_columns:
+                        features.append(f)
+
+            # 处理关键词特征
+            if 'keywords' in config:
+                for rule in config['keywords']:
+                    matched = []
+                    include_kws = [kw.lower() for kw in rule['include']]
+                    exclude_kws = [ex.lower() for ex in rule.get('exclude', [])]
+
+                    for col in available_columns:
+                        col_lower = col.lower()
+                        # 检查包含关键词
+                        include_ok = all(kw in col_lower for kw in include_kws)
+                        # 检查排除关键词
+                        exclude_ok = not any(ex in col_lower for ex in exclude_kws)
+
+                        if include_ok and exclude_ok:
+                            matched.append(col)
+
+                    if len(matched) >= rule.get('min_count', 1):
+                        features.extend(matched)
+
+        return list(set(features))  # 去重
+
+    def _get_subsystem_features(self, config: Dict, data: pd.DataFrame) -> List[str]:
+        """最终版特征获取方法"""
+        available_features = []
+
+        # 固定特征检查(要求至少10%非空)
+        if 'fixed' in config:
+            for f in config['fixed']:
+                if f in data.columns and data[f].notna().mean() > 0.1:
+                    available_features.append(f)
+        print(f"匹配到的固定特征: {available_features}")
+        # 关键词特征检查
+        if 'keywords' in config:
+            for rule in config['keywords']:
+                matched = [
+                    col for col in data.columns
+                    if all(kw.lower() in col.lower() for kw in rule['include'])
+                    and not any(ex.lower() in col.lower() for ex in rule.get('exclude', []))
+                    and data[col].notna().mean() > 0.1  # 数据有效性检查
+                ]
+                if len(matched) >= rule.get('min_count', 1):
+                    available_features.extend(matched)
+        print(f"匹配到的关键词特征: {available_features}")
+        return list(set(available_features))
+    
+    def _assess_subsystem(self, data: pd.DataFrame) -> Dict:
+        """评估子系统(支持单特征)"""
+        # 数据清洗
+        clean_data = data.dropna()
+        if len(clean_data) < 10:  # 降低最小样本量要求(原为20)
+            return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': 'Insufficient data'}
+
+        try:
+            # 标准化
+            normalized_data = self.mset.CRITIC_prepare(clean_data)
+
+            # 计算权重 - 处理单特征情况
+            if len(normalized_data.columns) == 1:
+                weights = pd.Series([1.0], index=normalized_data.columns)
+            else:
+                weights = self.mset.CRITIC(normalized_data)
+
+            # MSET评估
+            health_score = self._run_mset_assessment(normalized_data.values, weights.values)
+
+            bins = [0, 10, 20, 30, 40, 50, 60, 70, 80]
+            adjust_values = [87, 77, 67, 57, 47, 37, 27, 17, 7]
+
+            def adjust_score(score):
+                for i in range(len(bins)):
+                    if score < bins[i]:
+                        return score + adjust_values[i-1]
+                return score  #
+
+            adjusted_score = adjust_score(health_score)  #
+            if adjusted_score >= 100:
+                adjusted_score = 92.8
+
+            return {
+                'health_score': float(health_score),
+                'weights': weights.to_dict(),
+                'features': list(data.columns)
+            }
+        except Exception as e:
+            return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': str(e)}
+        
+    @lru_cache(maxsize=10)
+    def _get_mset_model(self, train_data: tuple):
+        """缓存MSET模型"""
+        # 注意:由于lru_cache需要可哈希参数,这里使用元组
+        arr = np.array(train_data)
+        model = self._create_mset_core()
+        model.genDLMatrix(arr)
+        return model
+
+    def _run_mset_assessment(self, data: np.ndarray, weights: np.ndarray) -> float:
+        """执行MSET评估"""
+        # 检查权重有效性
+        if np.isnan(weights).any() or np.isinf(weights).any():
+            weights = np.ones_like(weights) / len(weights)  # 重置为等权重 
+
+        # 分割训练集和测试集
+        split_idx = len(data) // 2
+        train_data = data[:split_idx]
+        test_data = data[split_idx:]
+
+        # 使用缓存模型
+        try:
+            model = self._get_mset_model(tuple(map(tuple, train_data)))
+            flags = model.calcSPRT(test_data, weights)
+            
+            # 过滤NaN值并计算均值
+            valid_flags = [x for x in flags if not np.isnan(x)]
+            if not valid_flags:
+                return 50.0  # 默认中性值
+                
+            return float(np.mean(valid_flags))
+        except Exception as e:
+            print(f"MSET评估失败: {str(e)}")
+            return 50.0  # 默认中性值
+    
+    def _get_subsystem_weights(self, subsystems: List[str]) -> np.ndarray:
+        """生成等权重的子系统权重向量"""
+        n = len(subsystems)
+        if n == 0:
+            return np.array([])
+
+        # 直接返回等权重向量
+        return np.ones(n) / n

BIN
health_models/WOF046400029/WOF046400029.pkl


BIN
health_models/WOF091200030/WOF091200030.pkl


BIN
health_models/WOF093400005/WOF093400005.pkl


+ 209 - 0
health_pretrain.py

@@ -0,0 +1,209 @@
+import os
+import joblib
+import numpy as np
+import pandas as pd
+from sklearn.neighbors import BallTree
+from typing import Dict, Optional, List
+from health_evalution_class import HealthAssessor
+import logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class WindFarmPretrainModel:
+    """整个风场的预训练模型"""
+    
+    def __init__(self, wind_code: str):
+        self.wind_code = wind_code
+        self.mill_type = None  # 风场主要机型
+        self.subsystem_models = {}  # 各子系统模型
+        self.features = {}  # 各子系统使用的特征
+        self.turbine_codes = []  # 包含的风机列表
+        
+    def train(self, data_dict: Dict[str, pd.DataFrame], mill_type: str):
+        """训练风场模型(支持单特征子系统)"""
+        self.mill_type = mill_type
+        self.turbine_codes = list(data_dict.keys())
+        assessor = HealthAssessor()
+        
+        # 合并所有风机数据用于训练
+        all_data = pd.concat(data_dict.values())
+        
+        # 训练各子系统模型
+        subsystems = {
+            'generator': assessor.subsystem_config['generator'][mill_type],
+            'nacelle': assessor.subsystem_config['nacelle'],
+            'grid': assessor.subsystem_config['grid'],
+            'drive_train': assessor.subsystem_config['drive_train'] if mill_type == 'dfig' else None
+        }
+        
+        for subsys, config in subsystems.items():
+            if config is None:
+                continue
+                
+            # 获取子系统特征
+            features = assessor._get_subsystem_features(config, all_data)
+            logger.info('features',features)
+            if not features:
+                logger.warning(f"子系统 {subsys} 无有效特征")
+                continue
+                
+            # 准备训练数据 - 降低样本量要求但至少需要100个样本
+            train_data = all_data[features].dropna()
+            if len(train_data) < 100:  # 原为1000
+                logger.warning(f"子系统 {subsys} 数据不足: {len(train_data)}样本")
+                continue
+                
+            try:
+                # 训练MSET模型
+                mset = assessor._create_mset_core()
+                if mset.genDLMatrix(train_data.values) != 0:
+                    continue
+                    
+                # 计算权重 - 支持单特征
+                normalized_data = mset.CRITIC_prepare(train_data)
+                
+                # 单特征直接赋权重1.0
+                if len(normalized_data.columns) == 1:
+                    weights = pd.Series([1.0], index=normalized_data.columns)
+                else:
+                    weights = mset.CRITIC(normalized_data)
+                
+                # 保存子系统模型
+                self.subsystem_models[subsys] = {
+                    'matrixD': mset.matrixD,
+                    'healthyResidual': mset.healthyResidual,
+                    'feature_weights': weights.to_dict()
+                }
+                self.features[subsys] = features
+                
+            except Exception as e:
+                logger.error(f"子系统 {subsys} 训练失败: {str(e)}")
+                continue
+            
+    def assess(self, data: pd.DataFrame, turbine_code: str) -> Dict:
+        """使用预训练模型进行评估(支持单特征子系统)"""
+        if not self.subsystem_models:
+            return {}
+            
+        results = {
+            "engine_code": turbine_code,
+            "subsystems": {},
+            "assessed_subsystems": []
+        }
+        
+        for subsys in self.subsystem_models.keys():
+            if subsys not in self.features:
+                continue
+                
+            features = [f for f in self.features[subsys] if f in data.columns]
+            if not features:
+                continue
+                
+            test_data = data[features].dropna()
+            if len(test_data) < 5:  # 降低最小样本量要求(原为10)
+                continue
+                
+            try:
+                # 确保权重有效
+                weights_dict = self.subsystem_models[subsys]['feature_weights']
+                weights = pd.Series(weights_dict) if weights_dict else pd.Series(np.ones(len(features))/len(features))
+                
+                # 初始化MSET模型(如果尚未初始化)
+                if not hasattr(self, '_balltree_cache'):
+                    self._init_balltree_cache()
+                    
+                mset = self._balltree_cache.get(subsys)
+                if not mset:
+                    continue
+                    
+                flags = mset.calcSPRT(test_data.values, weights.values)
+                valid_flags = [x for x in flags if not np.isnan(x)]
+                health_score = float(np.mean(valid_flags)) if valid_flags else 50.0
+                
+                results["subsystems"][subsys] = {
+                    "health_score": health_score,
+                    "weights": weights_dict
+                }
+                # logger.info(f"打印结果: {results['subsystems'][subsys]}")                
+
+
+                bins = [0, 10, 20, 30, 40, 50, 60, 70, 80]
+                adjust_values = [87, 77, 67, 57, 47, 37, 27, 17, 7]
+
+                def adjust_score(score):
+                    for i in range(len(bins)):
+                        if score < bins[i]:
+                            return score + adjust_values[i-1]
+                    return score  #
+
+                adjusted_score = adjust_score(health_score)  #
+                if adjusted_score >= 100:
+                    adjusted_score = 92.8
+                results["subsystems"][subsys] = {
+                    "health_score": adjusted_score,
+                    "weights": weights_dict
+                }
+                # logger.info(f"打印结果: {results['subsystems'][subsys]}")        
+                results["assessed_subsystems"].append(subsys)
+            except Exception as e:
+                logger.info(f"子系统 {subsys} 评估失败: {str(e)}")
+                continue
+        
+        # 计算整机健康度
+        if results["assessed_subsystems"]:
+            scores = [results["subsystems"][s]["health_score"] 
+                    for s in results["assessed_subsystems"]]
+            weights = np.ones(len(scores)) / len(scores)  # 子系统间使用等权重
+            results["total_health_score"] = float(np.dot(scores, weights))
+            
+        return results
+
+    def _init_balltree_cache(self):
+        """初始化BallTree缓存"""
+        self._balltree_cache = {}
+        assessor = HealthAssessor()
+        for subsys, model in self.subsystem_models.items():
+            try:
+                mset = assessor._create_mset_core()
+                mset.matrixD = model['matrixD']
+                mset.healthyResidual = model['healthyResidual']
+                mset.normalDataBallTree = BallTree(
+                    mset.matrixD,
+                    leaf_size=4,
+                    metric=lambda a,b: 1.0 - mset.calcSimilarity(a, b)
+                )
+                self._balltree_cache[subsys] = mset
+            except Exception as e:
+                logger.info(f"初始化子系统 {subsys} 的BallTree失败: {str(e)}")
+
+
+
+    def save(self, model_dir: str):
+        """保存模型到文件"""
+        save_data = {
+            "wind_code": self.wind_code,
+            "mill_type": self.mill_type,
+            "subsystem_models": self.subsystem_models,
+            "features": self.features,
+            "turbine_codes": self.turbine_codes
+        }
+        
+        os.makedirs(model_dir, exist_ok=True)
+        path = os.path.join(model_dir, f"{self.wind_code}.pkl")
+        joblib.dump(save_data, path)
+        
+    @classmethod
+    def load(cls, model_dir: str, wind_code: str) -> Optional['WindFarmPretrainModel']:
+        """从文件加载模型"""
+        path = os.path.join(model_dir, f"{wind_code}.pkl")
+        if not os.path.exists(path):
+            return None
+            
+        data = joblib.load(path)
+        model = cls(data["wind_code"])
+        model.mill_type = data["mill_type"]
+        model.subsystem_models = data["subsystem_models"]
+        model.features = data["features"]
+        model.turbine_codes = data.get("turbine_codes", [])
+        return model

+ 154 - 0
train_health.py

@@ -0,0 +1,154 @@
+import os
+import sys
+import pandas as pd
+from datetime import datetime
+from typing import Optional
+from database import DataFetcher
+from health_pretrain import WindFarmPretrainModel
+import logging
+
+# 配置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout),
+        logging.FileHandler('train_health.log')
+    ]
+)
+logger = logging.getLogger(__name__)
+
+# 配置
+WIND_CODE = "WOF093400005"  # 张崾先:WOF091200030 七台河:WOF046400029 诺木洪:WOF093400005
+START_DATE = "2023-12-01 00:00:00 "# 张崾先:2023-10-20 00:00:00~2024-10-20 00:00:00 七台河:2023-10-02 00:00:00~2024-10-02 00:00:00
+END_DATE = "2024-05-30 23:59:59" #诺木洪 2023-12-01 00:00:00 ~ 2024-05-30 23:50:00
+MODEL_DIR = "health_models"
+MIN_SAMPLES = 100  # 最小训练样本数
+
+def fetch_turbine_data(fetcher: DataFetcher, wind_code: str, turbine_code: str) -> Optional[pd.DataFrame]:
+    """获取单个风机的完整训练数据"""
+    try:
+        # 获取所有可用列
+        columns = fetcher.get_turbine_columns(wind_code)
+        if not columns:
+            logger.warning(f"{turbine_code} 无可用数据列")
+            return None
+
+        special_wind_farms = {
+            "WOF093400005": f"`{wind_code}-WOB000001_minute`"  # 加上反引号
+        }
+
+        # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
+        table = special_wind_farms.get(wind_code, f"{wind_code}_minute")
+        # 构建查询 - 使用参数化查询防止SQL注入
+        query = f"""
+        SELECT * 
+        FROM {table}
+        WHERE `wind_turbine_number` = %s
+          AND `time_stamp` BETWEEN %s AND %s
+        """
+        
+        # 执行查询
+        logger.info(f"正在获取风机 {turbine_code} 数据...")
+        df = pd.read_sql(
+            query, 
+            fetcher.data_engine,
+            params=(turbine_code, START_DATE, END_DATE)
+        )
+        
+        if df.empty:
+            logger.warning(f"{turbine_code} 无数据")
+            return None
+        print("数据项",df)    
+        logger.info(f"获取到 {turbine_code} 数据 {len(df)} 条")
+        return df
+        
+    except Exception as e:
+        logger.error(f"获取 {turbine_code} 数据失败: {str(e)}")
+        return None
+
+def train_windfarm_model(
+    fetcher: DataFetcher, 
+    wind_code: str,
+    turbines: pd.DataFrame
+) -> bool:
+    """训练风场模型"""
+    try:
+        # 获取所有风机数据
+        data_dict = {}
+        valid_turbines = []
+        
+        for idx, turbine_info in turbines.iterrows():
+            turbine_code = turbine_info['engine_code']
+            data = fetch_turbine_data(fetcher, wind_code, turbine_code)
+            if data is not None and len(data) >= MIN_SAMPLES:
+                data_dict[turbine_code] = data
+                valid_turbines.append(turbine_info)
+        
+        if not data_dict:
+            logger.error("无有效风机数据,无法训练风场模型")
+            return False
+
+        # 确定主要机型(取出现次数最多的机型)
+        mill_type_counts = {}
+        for turbine_info in valid_turbines:
+            mill_type_num = fetcher.get_mill_type(turbine_info['mill_type_code'])
+            mill_type = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')
+            if mill_type != 'unknown':
+                mill_type_counts[mill_type] = mill_type_counts.get(mill_type, 0) + 1
+        
+        if not mill_type_counts:
+            logger.error("无法确定风场主要机型")
+            return False
+            
+        main_mill_type = max(mill_type_counts.items(), key=lambda x: x[1])[0]
+        
+        # 训练模型
+        logger.info(f"开始训练风场 {wind_code} ({main_mill_type})模型...")
+        model = WindFarmPretrainModel(wind_code)
+        model.train(data_dict, main_mill_type)
+        
+        # 保存模型
+        model_dir = os.path.join(MODEL_DIR, wind_code)
+        model.save(model_dir)
+        logger.info(f"风场 {wind_code} 模型训练完成并保存")
+        return True
+        
+    except Exception as e:
+        logger.error(f"训练风场模型失败: {str(e)}", exc_info=True)
+        return False
+    
+def main():
+    """主训练流程"""
+    logger.info("=== 开始健康评估模型训练 ===")
+    logger.info(f"风场: {WIND_CODE}")
+    logger.info(f"时间范围: {START_DATE} 至 {END_DATE}")
+    
+    # 初始化数据获取器
+    fetcher = DataFetcher()
+    
+    # 获取风场下所有风机
+    logger.info("获取风机列表...")
+    turbines = fetcher.get_turbines(WIND_CODE)
+    if turbines.empty:
+        logger.error("无风机数据,终止训练")
+        return
+    
+    logger.info(f"共发现 {len(turbines)} 台风机")
+    
+    # 创建模型目录
+    model_dir = os.path.join(MODEL_DIR, WIND_CODE)
+    os.makedirs(model_dir, exist_ok=True)
+    
+    # 训练风场模型
+    if train_windfarm_model(fetcher, WIND_CODE, turbines):
+        logger.info(f"风场 {WIND_CODE} 模型训练成功")
+    else:
+        logger.error(f"风场 {WIND_CODE} 模型训练失败")
+
+if __name__ == "__main__":
+    try:
+        main()
+    except Exception as e:
+        logger.error(f"训练流程异常终止: {str(e)}", exc_info=True)
+        sys.exit(1)