Просмотр исходного кода

健康评估添加预训练和缓存功能

wangjiaojiao 3 недель назад
Родитель
Сommit
56d480b840
1 измененных файлов с 324 добавлено и 134 удалено
  1. 324 134
      app/routers/health.py

+ 324 - 134
app/routers/health.py

@@ -1,156 +1,346 @@
 import math
 from datetime import datetime
-from typing import List
-
+from typing import List,Dict
+import pandas as pd
 from fastapi import HTTPException, APIRouter
-
+import time
+import os
 from app.logger import logger
 from app.models.HealthEvaluationReqAndResp import AssessmentResult, AssessmentRequest, SubsystemResult
-
 from app.services.HealthAssessor import HealthAssessor
 from app.services.HealthDataFetcher import DataFetcher
-
+import concurrent.futures
+from app.services.HealthPretrain import WindFarmPretrainModel
+from app.services.HealthCacheService import CacheService
 router = APIRouter()
 
 
+# 全局存储
+PRETRAIN_MODELS: Dict[str, WindFarmPretrainModel] = {}  # 改为存储风场级模型
+REALTIME_ASSESSOR = HealthAssessor()
+# 新增缓存服务
+CACHE_SERVICE = CacheService(
+host='localhost', 
+port=6379, 
+ttl=2592000 #单位秒 
+)  
+
+@router.on_event("startup")
+async def load_resources():
+    """加载预训练模型和其他资源"""
+    # 测试Redis连接
+    if not CACHE_SERVICE.ping():
+        logger.error("无法连接Redis服务器,缓存功能将不可用")
+    
+    # 加载预训练模型
+    model_root ="./app/health-models"
+   # model_root = "health_models"
+    if os.path.exists(model_root):
+        for wind_code in os.listdir(model_root):
+            wind_dir = os.path.join(model_root, wind_code)
+            if os.path.isdir(wind_dir):
+                try:
+                    model = WindFarmPretrainModel.load(wind_dir, wind_code)
+                    if model:
+                        PRETRAIN_MODELS[wind_code] = model
+                except Exception as e:
+                    logger.error(f"加载风场模型失败 {wind_code}: {str(e)}")
+    
+    logger.info(f"预训练模型加载完成,共加载 {len(PRETRAIN_MODELS)} 个风场模型")
+
+
 @router.post("/health_assess", response_model=List[AssessmentResult])
-async def assess_windfarm(request: AssessmentRequest):
+async def assess_windfarm_optimized(request: AssessmentRequest):
+    """优化后的评估流程(添加缓存支持)"""
+    total_start = time.time()
+    logger.info(f"开始评估风场 {request.windcode} {request.month}")
+    
     try:
-        datetime.strptime(request.month, "%Y-%m")
-    except ValueError:
-        raise HTTPException(status_code=400, detail="无效时间格式,使用YYYY-MM格式")
-    logger.info(f"开始处理风场评估请求 - 风场编码: {request.windcode}, 月份: {request.month}")
-
-    fetcher = DataFetcher()
-    assessor = HealthAssessor()
-    turbines = fetcher.get_turbines(request.windcode)
-    print(turbines)
-    if turbines.empty:
-        raise HTTPException(status_code=404, detail="未找到风场数据")
-
-    results: List[AssessmentResult] = []  # 显式声明列表类型并初始化
-    all_turbines = len(turbines)
-    processed_count = 0
-
-    for idx, row in turbines.iterrows():
-        try:
-            engine_code = row['engine_code']
-            mill_type_code = row['mill_type_code']
-            wind_turbine_name = row['engine_name']
-            # 获取机型类型
-            mill_type_num = fetcher.get_mill_type(mill_type_code)
-            mill_type = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')
-
-            if mill_type == 'unknown':
-                logger.warning(f"{engine_code}机型未知,跳过处理")
-                continue
-
-            # 获取特征与数据
-            # 先获取该风场所有可用列名
-            available_columns = fetcher.get_turbine_columns(request.windcode)
-            if not available_columns:
-                continue
-                # 获取有效特征列(基于实际列名)
-            valid_features = assessor._get_all_possible_features(assessor, mill_type, available_columns)
-            # 获取数据
-            data = fetcher.fetch_turbine_data(request.windcode, engine_code, request.month, valid_features)
-            print(data)
-
-            if data is None or data.empty:  # 增加对None的检查
-                logger.warning(f"{engine_code}在{request.month}无有效数据")
-                # 创建空评估结果,所有评分为-1
-                empty_assessment = {
-                    'engine_code': engine_code,
-                    'wind_turbine_name': wind_turbine_name,
-                    'mill_type': mill_type,
-                    'total_health_score': -1,
-                    'subsystems': {
-                        'generator': {
-                            'health_score': -1,
-                            'weights': {},
-                            'message': None
-                        },
-                        'nacelle': {
-                            'health_score': -1,
-                            'weights': {},
-                            'message': None
-                        },
-                        'grid': {
-                            'health_score': -1,
-                            'weights': {},
-                            'message': None
-                        },
-                        'drive_train': {
-                            'health_score': -1,
-                            'weights': {},
-                            'message': None
-                        }
-                    },
-                    'assessed_subsystems': ['generator', 'nacelle', 'grid', 'drive_train']
-                }
-                formatted_result = _format_result(empty_assessment)
-                results.append(formatted_result)
-                processed_count += 1
-                logger.info(f"处理无数据风机{engine_code}(已处理{processed_count}/{all_turbines}台)")
-                continue
-
-            # 执行评估并格式化结果
-            assessment = assessor.assess_turbine(engine_code, data, mill_type, wind_turbine_name)
-            if not assessment:  # 检查评估结果有效性
-                logger.warning(f"{engine_code}评估结果为空")
-                continue
-
-            formatted_result = _format_result(assessment)
-            if isinstance(formatted_result, AssessmentResult):  # 严格类型检查
-                results.append(formatted_result)
-                processed_count += 1
-                logger.info(f"成功处理{engine_code}(已处理{processed_count}/{all_turbines}台)")
-            else:
-                logger.error(f"{engine_code}结果格式化失败,类型错误: {type(formatted_result)}")
-
-        except Exception as e:
-            logger.error(f"处理{engine_code}时发生异常: {str(e)}", exc_info=True)
-            continue
-
-    # 最终返回处理 - 强制转换为列表并确保类型正确
-    if not results:
-        logger.warning(f"风场{request.windcode}在{request.month}无有效评估结果,返回空列表")
-        return []
-
-    cleaned_results = [clean_nans(result.dict()) for result in results]
-
-    # 防御性类型检查(生产环境可移除)
-    if not isinstance(cleaned_results, list):
-        logger.error(f"结果类型错误,期望list但得到{type(results)},强制转换为列表")
-        cleaned_results = [cleaned_results] if cleaned_results is not None else []
-
-    return cleaned_results
-
-
-def _format_result(assessment):
-    """格式化评估结果"""
-    return AssessmentResult(
-        engine_code=assessment['engine_code'],
-        wind_turbine_name=assessment['wind_turbine_name'],
-        mill_type=assessment['mill_type'],
-        total_health_score=assessment.get('total_health_score'),
-        subsystems={
-            k: SubsystemResult(**v)
-            for k, v in assessment['subsystems'].items()
-        },
-        assessed_subsystems=assessment.get('assessed_subsystems', [])
+        # 1. 检查缓存
+        request_body = request.dict()
+        cached_data = CACHE_SERVICE.get_cached_response(request_body)
+        
+        if cached_data:
+            logger.info(f"从缓存返回风场 {request.windcode} {request.month} 的结果")
+            # 转换缓存数据为响应模型
+            return [AssessmentResult(**item) for item in cached_data]
+            
+        # 2. 无缓存则执行原有评估逻辑
+        fetcher_start = time.time()
+        fetcher = DataFetcher()
+        logger.info(f"初始化DataFetcher耗时: {time.time() - fetcher_start:.2f}s")
+        
+        # 获取风场所有风机信息
+        turbine_start = time.time()
+        turbines = fetcher.get_turbines(request.windcode)
+        logger.info(f"获取风机列表耗时: {time.time() - turbine_start:.2f}s")
+        
+        if turbines.empty:
+            return []
+        
+        # 检查预训练模型
+        model_check_start = time.time()
+        model = PRETRAIN_MODELS.get(request.windcode)
+        logger.info(f"检查预训练模型耗时: {time.time() - model_check_start:.2f}s")
+        
+        if not model:
+            logger.info("未使用预训练模型,采用实时评估")
+            results = [assess_with_realtime(fetcher, request.windcode, row['engine_code'], 
+                                          request.month, 
+                                          get_mill_type(fetcher, row['mill_type_code']),
+                                          row['engine_name']) 
+                     for _, row in turbines.iterrows()]
+        else:
+            # 批量获取所有风机数据
+            data_fetch_start = time.time()
+            all_features = list({f for feats in model.features.values() for f in feats})
+            all_data = fetcher.fetch_all_turbines_data(
+                request.windcode, 
+                request.month,
+                all_features
+            )
+            logger.info(f"批量数据查询耗时: {time.time() - data_fetch_start:.2f}s")
+            logger.info(f"获取到 {len(all_data)} 台风机数据")
+            
+            # 并行评估
+            assess_start = time.time()
+            results = []
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                futures = []
+                for _, row in turbines.iterrows():
+                    engine_code = row['engine_code']
+                    if engine_code not in all_data:
+                        empty_assessment = create_empty_assessment(
+                            engine_code,
+                            row['engine_name'],
+                            get_mill_type(fetcher, row['mill_type_code'])
+                        )
+                        futures.append(executor.submit(
+                            lambda: (_format_result(empty_assessment)))
+                        )
+                        continue
+                        
+                    futures.append(executor.submit(
+                        _assess_single_turbine,
+                        model, all_data[engine_code], row, fetcher
+                    ))
+                
+                for future in concurrent.futures.as_completed(futures):
+                    try:
+                        result = future.result()
+                        results.append(result)
+                    except Exception as e:
+                        logger.error(f"评估失败: {str(e)}")
+                        continue
+            
+            logger.info(f"并行评估总耗时: {time.time() - assess_start:.2f}s")
+
+        # 3. 将结果存入缓存(异步执行,不阻塞请求)
+        if results:
+            try:
+          
+                # 转换为可序列化的字典格式
+                cache_data = [r.dict() for r in results if r is not None]
+                CACHE_SERVICE.set_cached_response(
+                    request_body,
+                    cache_data,
+                    3600  # 缓存1小时
+                )
+            except Exception as e:
+                logger.error(f"缓存设置失败: {str(e)}")    
+        
+        logger.info(f"风场评估总耗时: {time.time() - total_start:.2f}s")
+        return results
+        
+    except Exception as e:
+        logger.error(f"评估流程异常: {str(e)}", exc_info=True)
+        raise HTTPException(status_code=500, detail="内部服务器错误")
+    
+
+def assess_with_realtime(fetcher, windcode, engine_code, month, mill_type, wind_turbine_name):
+    """实时评估逻辑(添加耗时统计)"""
+    start_time = time.time()
+    
+    # 获取可用特征列
+    col_start = time.time()
+    available_columns = fetcher.get_turbine_columns(windcode)
+    logger.info(f"[{engine_code}] 获取列名耗时: {time.time() - col_start:.2f}s")
+    
+    if not available_columns:
+        return create_empty_assessment(engine_code, wind_turbine_name, mill_type)
+        
+    # 获取有效特征
+    feat_start = time.time()
+    valid_features = REALTIME_ASSESSOR._get_all_possible_features(
+        REALTIME_ASSESSOR, mill_type, available_columns
+    )
+    logger.info(f"[{engine_code}] 特征筛选耗时: {time.time() - feat_start:.2f}s")
+    
+    # 获取数据
+    data_start = time.time()
+    data = fetcher.fetch_turbine_data(windcode, engine_code, month, valid_features)
+    logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
+    
+    if data is None or data.empty:
+        return create_empty_assessment(engine_code, wind_turbine_name, mill_type)
+    
+    # 执行评估
+    assess_start = time.time()
+    assessment = REALTIME_ASSESSOR.assess_turbine(
+        engine_code, data, mill_type, wind_turbine_name
     )
+    logger.info(f"[{engine_code}] 模型评估耗时: {time.time() - assess_start:.2f}s")
+    
+    logger.info(f"[{engine_code}] 实时评估总耗时: {time.time() - start_time:.2f}s")
+    return clean_nans(assessment or create_empty_assessment(engine_code, wind_turbine_name, mill_type))    
 
 
+
+def assess_with_pretrained(fetcher, windcode, engine_code, month, wind_turbine_name, mill_type):
+    """使用预训练模型评估"""
+    start_time = time.time()    
+    model = PRETRAIN_MODELS.get(windcode)
+    if not model:
+        logger.warning(f"未找到风场 {windcode} 的预训练模型")
+        return None
+    
+    # 检查机型是否匹配
+    if model.mill_type != mill_type:
+        logger.warning(
+            f"机型不匹配: 模型机型={model.mill_type}, 当前机型={mill_type}. "
+            f"风机 {engine_code} 将使用实时计算"
+        )
+        return None
+    feat_start = time.time()   
+    # 获取模型特征(确保不重复且存在)
+    features = list({
+        f for subsys_feats in model.features.values() 
+        for f in subsys_feats 
+        if f in fetcher.get_turbine_columns(windcode)  # 确保特征在数据库中存在
+    })
+    logger.info(f"[{engine_code}] 特征准备耗时: {time.time() - feat_start:.2f}s")
+    
+    if not features:
+        logger.warning(f"风场模型 {windcode} 无可用特征")
+        return None
+    
+    # 获取数据
+    data_start = time.time()
+    data = fetcher.fetch_turbine_data(windcode, engine_code, month, features)
+    logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
+    if data is None or data.empty:
+        logger.warning(f"风机 {engine_code} 无有效数据")
+        return None
+    
+    # 执行评估
+    assess_start = time.time()
+    try:
+        assessment = model.assess(data, engine_code)
+        logger.info(f"[{engine_code}] 模型推理耗时: {time.time() - assess_start:.2f}s")
+        
+        if not assessment:
+            logger.warning(f"风场模型评估失败 {engine_code}")
+            return None
+        
+        assessment.update({
+            'engine_code': engine_code,
+            'wind_turbine_name': wind_turbine_name,
+            'mill_type': mill_type
+        })
+        logger.info(f"[{engine_code}] 预训练评估总耗时: {time.time() - start_time:.2f}s")
+        return clean_nans(assessment)
+    except Exception as e:
+        logger.error(f"预训练评估异常 {engine_code}: {str(e)}", exc_info=True)
+        return None
+
+
+def _assess_single_turbine(model: WindFarmPretrainModel, data: pd.DataFrame, 
+                          turbine_info: dict, fetcher: DataFetcher):
+    """单个风机评估"""
+    start_time = time.time()
+    try:
+        # 获取机型
+        mill_type_num = fetcher.get_mill_type(turbine_info['mill_type_code'])
+        mill_type = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')
+        
+        # 评估
+        assess_start = time.time()
+        assessment = model.assess(data, turbine_info['engine_code'])
+        logger.info(f"[{turbine_info['engine_code']}] 模型推理耗时: {time.time() - assess_start:.2f}s")
+        
+        assessment.update({
+            'engine_code': turbine_info['engine_code'],
+            'wind_turbine_name': turbine_info['engine_name'],
+            'mill_type': mill_type
+        })
+        logger.info(f"[{turbine_info['engine_code']}] 单机评估耗时: {time.time() - start_time:.2f}s")
+        return _format_result(clean_nans(assessment), "pretrained")
+    except Exception as e:
+        logger.error(f"评估风机 {turbine_info['engine_code']} 失败: {str(e)}")
+        empty_assessment = create_empty_assessment(
+            turbine_info['engine_code'],
+            turbine_info['engine_name'],
+            mill_type
+        )
+        return _format_result(empty_assessment)
+
+def get_mill_type(fetcher: DataFetcher, mill_type_code: str) -> str:
+    """获取机型字符串"""
+    mill_type_num = fetcher.get_mill_type(mill_type_code)
+    return {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')
+
 def clean_nans(obj):
-    """递归清理字典和列表中的NaN值"""
+    """递归清理NaN值"""
     if isinstance(obj, dict):
         return {k: clean_nans(v) for k, v in obj.items()}
     elif isinstance(obj, list):
         return [clean_nans(item) for item in obj]
-    elif isinstance(obj, float) and math.isnan(obj):
-        return -1.0  # 替换为-1表示无效值
-    elif isinstance(obj, float) and not math.isfinite(obj):
-        return -1.0  # 处理无穷大值
+    elif isinstance(obj, float) and (math.isnan(obj) or not math.isfinite(obj)):
+        return -1.0
     else:
         return obj
+
+def create_empty_assessment(engine_code: str, wind_turbine_name: str, mill_type: str) -> Dict:
+    """创建空评估结果"""
+    return {
+        'engine_code': engine_code,
+        'wind_turbine_name': wind_turbine_name,
+        'mill_type': mill_type,
+        'total_health_score': -1,
+        'subsystems': {
+            'generator': {'health_score': -1, 'weights': {}, 'message': None},
+            'nacelle': {'health_score': -1, 'weights': {}, 'message': None},
+            'grid': {'health_score': -1, 'weights': {}, 'message': None},
+            'drive_train': {'health_score': -1, 'weights': {}, 'message': None}
+        },
+        'assessed_subsystems': ['generator', 'nacelle', 'grid', 'drive_train']
+    }
+
+
+def _format_result(assessment: Dict, model_type: str = None) -> AssessmentResult:
+    """格式化评估结果"""
+    # 确保所有子系统都存在,即使评分为-1
+    subsystems = assessment.get('subsystems', {})
+    for subsys in ['generator', 'nacelle', 'grid', 'drive_train']:
+        if subsys not in subsystems:
+            subsystems[subsys] = {
+                'health_score': -1,
+                'weights': {},
+                'message': None
+            }
+    return AssessmentResult(
+        engine_code=assessment['engine_code'],
+        wind_turbine_name=assessment['wind_turbine_name'],
+        mill_type=assessment['mill_type'],
+        total_health_score=assessment.get('total_health_score', -1),
+        subsystems={
+            k: SubsystemResult(
+                health_score=v['health_score'],
+                weights=v.get('weights', {}),
+                message=v.get('message')
+            ) for k, v in subsystems.items()
+        },
+        assessed_subsystems=assessment.get('assessed_subsystems', []),
+        model_type=model_type
+    )
+