import concurrent.futures
import math
import os
import time
from datetime import datetime
from typing import Dict, List, Optional

import pandas as pd
from fastapi import APIRouter, HTTPException

from app.logger import logger
from app.models.HealthEvaluationReqAndResp import AssessmentResult, AssessmentRequest, SubsystemResult
from app.services.HealthAssessor import HealthAssessor
from app.services.HealthDataFetcher import DataFetcher
from app.services.HealthPretrain import WindFarmPretrainModel
from app.services.HealthCacheService import CacheService

router = APIRouter()

# Numeric mill-type code (as returned by DataFetcher.get_mill_type) -> mill type name.
_MILL_TYPE_NAMES = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}
# Subsystems every formatted result reports, in output order.
_SUBSYSTEMS = ('generator', 'nacelle', 'grid', 'drive_train')

# Global state.
# Wind-farm-level pretrained models keyed by wind code (filled by load_resources at startup).
PRETRAIN_MODELS: Dict[str, WindFarmPretrainModel] = {}
REALTIME_ASSESSOR = HealthAssessor()

# Response cache service.
# NOTE(review): host and password are hard-coded in source; consider moving them to configuration.
CACHE_SERVICE = CacheService(
    host='192.168.50.233',
    port=6379,
    db=10,
    password=123456,
    ttl=None,  # TTL in seconds; meaning of None is defined by CacheService (presumably no expiry) - confirm
)


@router.on_event("startup")
async def load_resources():
    """Check the Redis connection and load every pretrained wind-farm model at startup.

    Each sub-directory of ``./app/health-models`` is loaded with
    ``WindFarmPretrainModel.load(dir, wind_code)`` where ``wind_code`` is the
    directory name. Load failures are logged and skipped; they never abort startup.
    """
    # A failed ping is only logged: the service still starts, just without caching.
    if not CACHE_SERVICE.ping():
        logger.error("无法连接Redis服务器,缓存功能将不可用")

    model_root = "./app/health-models"
    if os.path.exists(model_root):
        for wind_code in os.listdir(model_root):
            wind_dir = os.path.join(model_root, wind_code)
            if not os.path.isdir(wind_dir):
                continue
            try:
                model = WindFarmPretrainModel.load(wind_dir, wind_code)
                if model:
                    PRETRAIN_MODELS[wind_code] = model
            except Exception as e:
                logger.error(f"加载风场模型失败 {wind_code}: {str(e)}")
    logger.info(f"预训练模型加载完成,共加载 {len(PRETRAIN_MODELS)} 个风场模型")


@router.post("/health_assess", response_model=List[AssessmentResult])
async def assess_windfarm_optimized(request: AssessmentRequest):
    """Assess every turbine of a wind farm for one month, with response caching.

    Flow:
      1. If a cached response exists for the exact request body, return it.
      2. Otherwise evaluate each turbine - with the farm's pretrained model when
         one was loaded at startup, else with the real-time assessor.
      3. Store the (non-empty) result list in the cache, then return it.

    Raises:
        HTTPException: status 500 for any unexpected error in the flow.
    """
    total_start = time.time()
    logger.info(f"开始评估风场 {request.windcode} {request.month}")
    try:
        # 1. Cache lookup, keyed on the full request body.
        request_body = request.dict()
        cached_data = CACHE_SERVICE.get_cached_response(request_body)
        if cached_data:
            logger.info(f"从缓存返回风场 {request.windcode} {request.month} 的结果")
            return [AssessmentResult(**item) for item in cached_data]

        # 2. Cache miss: run the evaluation.
        fetcher_start = time.time()
        fetcher = DataFetcher()
        logger.info(f"初始化DataFetcher耗时: {time.time() - fetcher_start:.2f}s")

        turbine_start = time.time()
        turbines = fetcher.get_turbines(request.windcode)
        logger.info(f"获取风机列表耗时: {time.time() - turbine_start:.2f}s")
        if turbines.empty:
            return []

        model_check_start = time.time()
        model = PRETRAIN_MODELS.get(request.windcode)
        logger.info(f"检查预训练模型耗时: {time.time() - model_check_start:.2f}s")

        if not model:
            logger.info("未使用预训练模型,采用实时评估")
            results = [
                assess_with_realtime(
                    fetcher,
                    request.windcode,
                    row['engine_code'],
                    request.month,
                    get_mill_type(fetcher, row['mill_type_code']),
                    row['engine_name'],
                )
                for _, row in turbines.iterrows()
            ]
        else:
            results = _assess_with_farm_model(
                model, fetcher, request.windcode, request.month, turbines
            )

        # 3. Store the results in the cache. This runs synchronously, before the
        # response is returned; a caching failure is logged but not fatal.
        if results:
            try:
                # Pretrained results are AssessmentResult models; real-time results
                # are already plain dicts and must not be passed through .dict().
                cache_data = [
                    r.dict() if isinstance(r, AssessmentResult) else r
                    for r in results
                    if r is not None
                ]
                CACHE_SERVICE.set_cached_response(request_body, cache_data, None)
            except Exception as e:
                logger.error(f"缓存设置失败: {str(e)}")

        logger.info(f"风场评估总耗时: {time.time() - total_start:.2f}s")
        return results
    except Exception as e:
        logger.error(f"评估流程异常: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail="内部服务器错误") from e


def _assess_with_farm_model(model: WindFarmPretrainModel, fetcher: DataFetcher,
                            windcode, month, turbines: pd.DataFrame) -> List[AssessmentResult]:
    """Evaluate all turbines of a farm in parallel with its pretrained model.

    Data for all turbines is fetched in one batch query. A turbine with no data
    gets an all ``-1`` placeholder result. Results follow the row order of
    ``turbines``; a turbine whose task raised is logged and omitted.
    """
    data_fetch_start = time.time()
    all_features = list({f for feats in model.features.values() for f in feats})
    all_data = fetcher.fetch_all_turbines_data(windcode, month, all_features)
    logger.info(f"批量数据查询耗时: {time.time() - data_fetch_start:.2f}s")
    logger.info(f"获取到 {len(all_data)} 台风机数据")

    assess_start = time.time()
    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for _, row in turbines.iterrows():
            engine_code = row['engine_code']
            if engine_code not in all_data:
                empty_assessment = create_empty_assessment(
                    engine_code,
                    row['engine_name'],
                    get_mill_type(fetcher, row['mill_type_code']),
                )
                # Pass the dict as an argument rather than capturing it in a lambda:
                # a closure would read the variable when the thread runs, by which time
                # a later iteration may have rebound it to another turbine's placeholder.
                futures.append(executor.submit(_format_result, empty_assessment))
                continue
            futures.append(executor.submit(
                _assess_single_turbine, model, all_data[engine_code], row, fetcher
            ))

        # Collect in submission order so the response (and cached payload) is deterministic.
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                logger.error(f"评估失败: {str(e)}")
    logger.info(f"并行评估总耗时: {time.time() - assess_start:.2f}s")
    return results


def assess_with_realtime(fetcher, windcode, engine_code, month, mill_type, wind_turbine_name):
    """Assess one turbine with the real-time assessor (no pretrained model), logging timings.

    Returns:
        The assessor's result dict with NaN/inf values replaced by -1.0 (see
        ``clean_nans``), or an empty placeholder assessment when no columns, no
        data, or no assessment result is available.
    """
    start_time = time.time()

    col_start = time.time()
    available_columns = fetcher.get_turbine_columns(windcode)
    logger.info(f"[{engine_code}] 获取列名耗时: {time.time() - col_start:.2f}s")
    if not available_columns:
        return create_empty_assessment(engine_code, wind_turbine_name, mill_type)

    feat_start = time.time()
    # NOTE(review): the assessor instance is passed explicitly even though the method
    # is looked up on that same instance; presumably the method's signature expects it
    # (e.g. a staticmethod taking the assessor) - confirm against HealthAssessor.
    valid_features = REALTIME_ASSESSOR._get_all_possible_features(
        REALTIME_ASSESSOR, mill_type, available_columns
    )
    logger.info(f"[{engine_code}] 特征筛选耗时: {time.time() - feat_start:.2f}s")

    data_start = time.time()
    data = fetcher.fetch_turbine_data(windcode, engine_code, month, valid_features)
    logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
    if data is None or data.empty:
        return create_empty_assessment(engine_code, wind_turbine_name, mill_type)

    assess_start = time.time()
    assessment = REALTIME_ASSESSOR.assess_turbine(
        engine_code, data, mill_type, wind_turbine_name
    )
    logger.info(f"[{engine_code}] 模型评估耗时: {time.time() - assess_start:.2f}s")
    logger.info(f"[{engine_code}] 实时评估总耗时: {time.time() - start_time:.2f}s")
    return clean_nans(assessment or create_empty_assessment(engine_code, wind_turbine_name, mill_type))


def assess_with_pretrained(fetcher, windcode, engine_code, month, wind_turbine_name, mill_type):
    """Assess one turbine with its wind farm's pretrained model.

    Returns:
        The model's assessment dict, augmented with ``engine_code``,
        ``wind_turbine_name`` and ``mill_type`` and cleaned of NaN/inf values.
        Returns ``None`` when there is no model for the farm, the mill type does
        not match the model's, no model feature exists in the database, no data
        is found, or the model fails or raises (so the caller can fall back to
        real-time assessment).
    """
    start_time = time.time()
    model = PRETRAIN_MODELS.get(windcode)
    if not model:
        logger.warning(f"未找到风场 {windcode} 的预训练模型")
        return None

    # A model trained for another mill type is not applicable to this turbine.
    if model.mill_type != mill_type:
        logger.warning(
            f"机型不匹配: 模型机型={model.mill_type}, 当前机型={mill_type}. "
            f"风机 {engine_code} 将使用实时计算"
        )
        return None

    feat_start = time.time()
    # Fetch the available columns once (not once per feature) and use a set for O(1) lookups.
    available_columns = set(fetcher.get_turbine_columns(windcode) or ())
    # Deduplicated model features that actually exist in the database.
    features = list({
        f
        for subsys_feats in model.features.values()
        for f in subsys_feats
        if f in available_columns
    })
    logger.info(f"[{engine_code}] 特征准备耗时: {time.time() - feat_start:.2f}s")
    if not features:
        logger.warning(f"风场模型 {windcode} 无可用特征")
        return None

    data_start = time.time()
    data = fetcher.fetch_turbine_data(windcode, engine_code, month, features)
    logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
    if data is None or data.empty:
        logger.warning(f"风机 {engine_code} 无有效数据")
        return None

    assess_start = time.time()
    try:
        assessment = model.assess(data, engine_code)
        logger.info(f"[{engine_code}] 模型推理耗时: {time.time() - assess_start:.2f}s")
        if not assessment:
            logger.warning(f"风场模型评估失败 {engine_code}")
            return None
        assessment.update({
            'engine_code': engine_code,
            'wind_turbine_name': wind_turbine_name,
            'mill_type': mill_type,
        })
        logger.info(f"[{engine_code}] 预训练评估总耗时: {time.time() - start_time:.2f}s")
        return clean_nans(assessment)
    except Exception as e:
        logger.error(f"预训练评估异常 {engine_code}: {str(e)}", exc_info=True)
        return None


def _assess_single_turbine(model: WindFarmPretrainModel, data: pd.DataFrame,
                           turbine_info: dict, fetcher: DataFetcher):
    """Assess one turbine with the farm's pretrained model (runs in a worker thread).

    ``turbine_info`` is a row of the turbines table; it must provide
    ``engine_code``, ``engine_name`` and ``mill_type_code``.

    Returns:
        An ``AssessmentResult`` with ``model_type="pretrained"`` on success, or an
        all ``-1`` placeholder result (``model_type=None``) if any step raises.
    """
    start_time = time.time()
    engine_code = turbine_info['engine_code']
    # Default first so the error path can always build a placeholder, even when
    # the mill-type lookup itself is what failed.
    mill_type = 'unknown'
    try:
        mill_type = get_mill_type(fetcher, turbine_info['mill_type_code'])

        assess_start = time.time()
        assessment = model.assess(data, engine_code)
        logger.info(f"[{turbine_info['engine_code']}] 模型推理耗时: {time.time() - assess_start:.2f}s")

        assessment.update({
            'engine_code': engine_code,
            'wind_turbine_name': turbine_info['engine_name'],
            'mill_type': mill_type,
        })
        logger.info(f"[{turbine_info['engine_code']}] 单机评估耗时: {time.time() - start_time:.2f}s")
        return _format_result(clean_nans(assessment), "pretrained")
    except Exception as e:
        logger.error(f"评估风机 {turbine_info['engine_code']} 失败: {str(e)}")
        empty_assessment = create_empty_assessment(
            engine_code, turbine_info['engine_name'], mill_type
        )
        return _format_result(empty_assessment)


def get_mill_type(fetcher: DataFetcher, mill_type_code: str) -> str:
    """Return the mill type name ('dfig', 'direct', 'semi_direct' or 'unknown') for a code."""
    mill_type_num = fetcher.get_mill_type(mill_type_code)
    return _MILL_TYPE_NAMES.get(mill_type_num, 'unknown')


def clean_nans(obj):
    """Recursively replace non-finite floats (NaN, +inf, -inf) with -1.0.

    Dicts and lists are rebuilt with their elements cleaned; every other value
    is returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: clean_nans(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [clean_nans(item) for item in obj]
    # math.isfinite is False for NaN as well as for +/-inf.
    if isinstance(obj, float) and not math.isfinite(obj):
        return -1.0
    return obj


def create_empty_assessment(engine_code: str, wind_turbine_name: str, mill_type: str) -> Dict:
    """Build a placeholder assessment: every score is -1 and all four subsystems are listed."""
    return {
        'engine_code': engine_code,
        'wind_turbine_name': wind_turbine_name,
        'mill_type': mill_type,
        'total_health_score': -1,
        'subsystems': {
            subsys: {'health_score': -1, 'weights': {}, 'message': None}
            for subsys in _SUBSYSTEMS
        },
        'assessed_subsystems': list(_SUBSYSTEMS),
    }


def _format_result(assessment: Dict, model_type: Optional[str] = None) -> AssessmentResult:
    """Convert an assessment dict into an ``AssessmentResult``.

    Subsystems missing from ``assessment['subsystems']`` are filled in with a
    ``-1`` placeholder so the response always reports all four. The input dict
    is not modified.

    Raises:
        KeyError: if ``engine_code``, ``wind_turbine_name`` or ``mill_type`` is
            missing, or a present subsystem lacks ``health_score``.
    """
    # Copy so the caller's (possibly cached or shared) dict is not mutated;
    # `or {}` also tolerates an explicit None.
    subsystems = dict(assessment.get('subsystems') or {})
    for subsys in _SUBSYSTEMS:
        if subsys not in subsystems:
            subsystems[subsys] = {'health_score': -1, 'weights': {}, 'message': None}

    return AssessmentResult(
        engine_code=assessment['engine_code'],
        wind_turbine_name=assessment['wind_turbine_name'],
        mill_type=assessment['mill_type'],
        total_health_score=assessment.get('total_health_score', -1),
        subsystems={
            k: SubsystemResult(
                health_score=v['health_score'],
                weights=v.get('weights', {}),
                message=v.get('message'),
            )
            for k, v in subsystems.items()
        },
        assessed_subsystems=assessment.get('assessed_subsystems', []),
        model_type=model_type,
    )