"""FastAPI service exposing wind-farm health assessment.

For a given wind farm and month the service either uses a pre-trained
farm-level model (when one was loaded at startup) or falls back to the
real-time assessor, and returns one result per turbine.
"""
import concurrent.futures
import glob
import logging
import math
import os
import time
from datetime import datetime
from typing import List, Dict, Optional

import joblib
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from database import DataFetcher
from health_evalution_class import HealthAssessor
from health_pretrain import WindFarmPretrainModel

app = FastAPI(root_path="/api/health")

# Logging configuration.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Subsystems that every assessment result reports, in output order.
_SUBSYSTEMS = ('generator', 'nacelle', 'grid', 'drive_train')

# Numeric mill-type code (as returned by DataFetcher.get_mill_type) -> name.
_MILL_TYPE_NAMES = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}


# Request / response models.
class AssessmentRequest(BaseModel):
    windcode: str
    month: str  # format: "YYYY-MM"


class SubsystemResult(BaseModel):
    health_score: float
    weights: Dict[str, float]
    message: Optional[str] = None


class AssessmentResult(BaseModel):
    engine_code: str
    wind_turbine_name: str
    mill_type: str
    total_health_score: Optional[float]
    subsystems: Dict[str, SubsystemResult]
    assessed_subsystems: List[str]
    model_type: Optional[str] = None  # added later; optional so older clients are unaffected


# Global state.
PRETRAIN_MODELS: Dict[str, WindFarmPretrainModel] = {}  # farm-level models keyed by wind-farm code
REALTIME_ASSESSOR = HealthAssessor()


@app.on_event("startup")
def load_resources():
    """Load every pre-trained farm model found under ``health_models/``.

    Each sub-directory name is treated as a wind-farm code and handed to
    ``WindFarmPretrainModel.load``.  A failure for one farm is logged and
    does not prevent the remaining farms from loading.
    """
    model_root = "health_models"
    if os.path.exists(model_root):
        for wind_code in os.listdir(model_root):
            wind_dir = os.path.join(model_root, wind_code)
            if not os.path.isdir(wind_dir):
                continue
            try:
                model = WindFarmPretrainModel.load(wind_dir, wind_code)
            except Exception as e:
                logger.error(f"加载风场模型失败 {wind_code}: {str(e)}")
                continue
            if model:
                PRETRAIN_MODELS[wind_code] = model
    logger.info(f"预训练模型加载完成,共加载 {len(PRETRAIN_MODELS)} 个风场模型")


def _format_result(assessment: Dict, model_type: Optional[str] = None) -> AssessmentResult:
    """Convert a raw assessment dict into an ``AssessmentResult``.

    Any of the four standard subsystems missing from the assessment is
    filled in with a placeholder score of -1.

    Args:
        assessment: Raw assessment; must contain ``engine_code``,
            ``wind_turbine_name`` and ``mill_type``.
        model_type: Optional label describing which model produced the result.

    Returns:
        The populated response model.
    """
    # Work on a shallow copy so the caller's dict is not mutated.
    subsystems = dict(assessment.get('subsystems', {}))
    for name in _SUBSYSTEMS:
        subsystems.setdefault(
            name, {'health_score': -1, 'weights': {}, 'message': None}
        )

    return AssessmentResult(
        engine_code=assessment['engine_code'],
        wind_turbine_name=assessment['wind_turbine_name'],
        mill_type=assessment['mill_type'],
        total_health_score=assessment.get('total_health_score', -1),
        subsystems={
            name: SubsystemResult(
                health_score=info['health_score'],
                weights=info.get('weights', {}),
                message=info.get('message'),
            )
            for name, info in subsystems.items()
        },
        assessed_subsystems=assessment.get('assessed_subsystems', []),
        model_type=model_type,
    )


def clean_nans(obj):
    """Recursively replace NaN / infinite floats with -1.0.

    Dicts and lists are rebuilt; every other value is returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: clean_nans(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [clean_nans(item) for item in obj]
    # ``isfinite`` is False for NaN as well as +/-inf.
    if isinstance(obj, float) and not math.isfinite(obj):
        return -1.0
    return obj


def create_empty_assessment(engine_code: str, wind_turbine_name: str, mill_type: str) -> Dict:
    """Build a placeholder assessment in which every score is -1.

    Used whenever a turbine cannot be assessed (no columns, no data, or an
    evaluation error).
    """
    return {
        'engine_code': engine_code,
        'wind_turbine_name': wind_turbine_name,
        'mill_type': mill_type,
        'total_health_score': -1,
        'subsystems': {
            name: {'health_score': -1, 'weights': {}, 'message': None}
            for name in _SUBSYSTEMS
        },
        'assessed_subsystems': list(_SUBSYSTEMS),
    }


def assess_with_realtime(fetcher, windcode, engine_code, month, mill_type, wind_turbine_name):
    """Assess one turbine with the real-time assessor, logging stage timings.

    Returns:
        The assessment dict with non-finite floats replaced, or an empty
        placeholder assessment when no columns or no data are available.
    """
    start_time = time.time()

    # Available feature columns for this wind farm.
    col_start = time.time()
    available_columns = fetcher.get_turbine_columns(windcode)
    logger.info(f"[{engine_code}] 获取列名耗时: {time.time() - col_start:.2f}s")
    if not available_columns:
        return create_empty_assessment(engine_code, wind_turbine_name, mill_type)

    # Features usable for this mill type.
    # NOTE(review): the assessor instance is passed explicitly as the first
    # argument; if _get_all_possible_features is a regular bound method this
    # passes ``self`` twice -- confirm against HealthAssessor.
    feat_start = time.time()
    valid_features = REALTIME_ASSESSOR._get_all_possible_features(
        REALTIME_ASSESSOR, mill_type, available_columns
    )
    logger.info(f"[{engine_code}] 特征筛选耗时: {time.time() - feat_start:.2f}s")

    # Fetch the data.
    data_start = time.time()
    data = fetcher.fetch_turbine_data(windcode, engine_code, month, valid_features)
    logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
    if data is None or data.empty:
        return create_empty_assessment(engine_code, wind_turbine_name, mill_type)

    # Run the assessment.
    assess_start = time.time()
    assessment = REALTIME_ASSESSOR.assess_turbine(
        engine_code, data, mill_type, wind_turbine_name
    )
    logger.info(f"[{engine_code}] 模型评估耗时: {time.time() - assess_start:.2f}s")
    logger.info(f"[{engine_code}] 实时评估总耗时: {time.time() - start_time:.2f}s")
    return clean_nans(
        assessment or create_empty_assessment(engine_code, wind_turbine_name, mill_type)
    )


def assess_with_pretrained(fetcher, windcode, engine_code, month, wind_turbine_name, mill_type):
    """Assess one turbine with the farm's pre-trained model.

    Returns:
        The cleaned assessment dict, or ``None`` when the pre-trained path
        cannot be used (no model, mill-type mismatch, no usable features,
        no data, or an evaluation failure).
    """
    start_time = time.time()
    model = PRETRAIN_MODELS.get(windcode)
    if not model:
        logger.warning(f"未找到风场 {windcode} 的预训练模型")
        return None

    # The model is only valid for the mill type it was trained on.
    if model.mill_type != mill_type:
        logger.warning(
            f"机型不匹配: 模型机型={model.mill_type}, 当前机型={mill_type}. "
            f"风机 {engine_code} 将使用实时计算"
        )
        return None

    feat_start = time.time()
    # Fetch the column list once (it does not depend on the feature being
    # tested) and keep only model features that exist in the database.
    available_columns = set(fetcher.get_turbine_columns(windcode) or ())
    features = list({
        f
        for subsys_feats in model.features.values()
        for f in subsys_feats
        if f in available_columns
    })
    logger.info(f"[{engine_code}] 特征准备耗时: {time.time() - feat_start:.2f}s")
    if not features:
        logger.warning(f"风场模型 {windcode} 无可用特征")
        return None

    # Fetch the data.
    data_start = time.time()
    data = fetcher.fetch_turbine_data(windcode, engine_code, month, features)
    logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
    if data is None or data.empty:
        logger.warning(f"风机 {engine_code} 无有效数据")
        return None

    # Run the assessment.
    assess_start = time.time()
    try:
        assessment = model.assess(data, engine_code)
        logger.info(f"[{engine_code}] 模型推理耗时: {time.time() - assess_start:.2f}s")
        if not assessment:
            logger.warning(f"风场模型评估失败 {engine_code}")
            return None
        assessment.update({
            'engine_code': engine_code,
            'wind_turbine_name': wind_turbine_name,
            'mill_type': mill_type
        })
        logger.info(f"[{engine_code}] 预训练评估总耗时: {time.time() - start_time:.2f}s")
        return clean_nans(assessment)
    except Exception as e:
        logger.error(f"预训练评估异常 {engine_code}: {str(e)}", exc_info=True)
        return None


@app.post("/health_assess", response_model=List[AssessmentResult])
async def assess_windfarm_optimized(request: AssessmentRequest):
    """Assess every turbine of a wind farm for the requested month.

    Uses the pre-trained farm model when one is loaded (batch data fetch
    followed by per-turbine evaluation in a thread pool); otherwise each
    turbine is assessed sequentially with the real-time assessor.

    Raises:
        HTTPException: status 500 on any unexpected failure.
    """
    # NOTE(review): this coroutine performs blocking work (data fetches,
    # thread-pool join) directly, which occupies the event loop for the
    # duration of the request -- consider a plain ``def`` endpoint.
    total_start = time.time()
    logger.info(f"开始评估风场 {request.windcode} {request.month}")
    try:
        fetcher_start = time.time()
        fetcher = DataFetcher()
        logger.info(f"初始化DataFetcher耗时: {time.time() - fetcher_start:.2f}s")

        # 1. All turbines of the wind farm.
        turbine_start = time.time()
        turbines = fetcher.get_turbines(request.windcode)
        logger.info(f"获取风机列表耗时: {time.time() - turbine_start:.2f}s")
        if turbines.empty:
            return []

        # 2. Is there a pre-trained model for this farm?
        model_check_start = time.time()
        model = PRETRAIN_MODELS.get(request.windcode)
        logger.info(f"检查预训练模型耗时: {time.time() - model_check_start:.2f}s")
        if not model:
            logger.info("未使用预训练模型,采用实时评估")
            return [
                assess_with_realtime(
                    fetcher,
                    request.windcode,
                    row['engine_code'],
                    request.month,
                    get_mill_type(fetcher, row['mill_type_code']),
                    row['engine_name'],
                )
                for _, row in turbines.iterrows()
            ]

        # 3. Batch-fetch the data of all turbines.
        data_fetch_start = time.time()
        all_features = list({f for feats in model.features.values() for f in feats})
        all_data = fetcher.fetch_all_turbines_data(
            request.windcode, request.month, all_features
        )
        logger.info(f"批量数据查询耗时: {time.time() - data_fetch_start:.2f}s")
        logger.info(f"获取到 {len(all_data)} 台风机数据")

        # 4. Evaluate turbines in parallel.
        # NOTE(review): ``fetcher`` is shared by all worker threads; this
        # assumes DataFetcher tolerates concurrent use -- confirm.
        assess_start = time.time()
        results = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for _, row in turbines.iterrows():
                engine_code = row['engine_code']
                if engine_code not in all_data:
                    empty_assessment = create_empty_assessment(
                        engine_code,
                        row['engine_name'],
                        get_mill_type(fetcher, row['mill_type_code'])
                    )
                    # Pass the dict as an argument rather than closing over
                    # the loop variable: a lambda would see whatever value
                    # ``empty_assessment`` holds when the worker finally runs.
                    futures.append(executor.submit(_format_result, empty_assessment))
                    continue
                futures.append(executor.submit(
                    _assess_single_turbine,
                    model,
                    all_data[engine_code],
                    row,
                    fetcher
                ))

            for future in concurrent.futures.as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    logger.error(f"评估失败: {str(e)}")

        logger.info(f"并行评估总耗时: {time.time() - assess_start:.2f}s")
        logger.info(f"风场评估总耗时: {time.time() - total_start:.2f}s")
        return results
    except Exception as e:
        logger.error(f"评估流程异常: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail="内部服务器错误") from e


def _assess_single_turbine(model: WindFarmPretrainModel, data: pd.DataFrame,
                           turbine_info: dict, fetcher: DataFetcher):
    """Assess a single turbine with the pre-trained model.

    Args:
        model: Farm-level pre-trained model.
        data: Data for this turbine.
        turbine_info: Mapping with ``engine_code``, ``engine_name`` and
            ``mill_type_code`` (the endpoint passes a DataFrame row).
        fetcher: Used to resolve the mill type.

    Returns:
        A formatted result tagged ``"pretrained"``; on any failure, a
        formatted empty assessment instead.
    """
    start_time = time.time()
    # Bound before the ``try`` so the fallback below can always use it, even
    # when the mill-type lookup itself is what failed.
    mill_type = 'unknown'
    try:
        # Resolve the mill type.
        mill_type = get_mill_type(fetcher, turbine_info['mill_type_code'])

        # Evaluate.
        assess_start = time.time()
        assessment = model.assess(data, turbine_info['engine_code'])
        logger.info(f"[{turbine_info['engine_code']}] 模型推理耗时: {time.time() - assess_start:.2f}s")

        assessment.update({
            'engine_code': turbine_info['engine_code'],
            'wind_turbine_name': turbine_info['engine_name'],
            'mill_type': mill_type
        })
        logger.info(f"[{turbine_info['engine_code']}] 单机评估耗时: {time.time() - start_time:.2f}s")
        return _format_result(clean_nans(assessment), "pretrained")
    except Exception as e:
        logger.error(f"评估风机 {turbine_info['engine_code']} 失败: {str(e)}")
        empty_assessment = create_empty_assessment(
            turbine_info['engine_code'],
            turbine_info['engine_name'],
            mill_type
        )
        return _format_result(empty_assessment)


def get_mill_type(fetcher: DataFetcher, mill_type_code: str) -> str:
    """Return the mill-type name for a mill-type code.

    The numeric type comes from ``fetcher.get_mill_type``; values other than
    1, 2 or 3 map to ``'unknown'``.
    """
    mill_type_num = fetcher.get_mill_type(mill_type_code)
    return _MILL_TYPE_NAMES.get(mill_type_num, 'unknown')