123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
import concurrent.futures
import math
import numbers
import os
import time
from datetime import datetime
from typing import Dict, List, Optional

import pandas as pd
from fastapi import APIRouter, HTTPException

from app.logger import logger
from app.models.HealthEvaluationReqAndResp import AssessmentRequest, AssessmentResult, SubsystemResult
from app.services.HealthAssessor import HealthAssessor
from app.services.HealthCacheService import CacheService
from app.services.HealthDataFetcher import DataFetcher
from app.services.HealthPretrain import WindFarmPretrainModel
router = APIRouter()

# Global state shared by all requests.
# Wind-farm-level pretrained models keyed by wind farm code; filled by load_resources().
PRETRAIN_MODELS: Dict[str, WindFarmPretrainModel] = {}
# Assessor used for wind farms that have no pretrained model.
REALTIME_ASSESSOR = HealthAssessor()
# Redis-backed response cache. Host and port default to a local Redis and can be
# overridden through the environment (e.g. for containerised deployments).
CACHE_SERVICE = CacheService(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    # Seconds (30 days). Presumably the default TTL when none is given per entry;
    # the /health_assess handler passes its own TTL (3600) when storing results.
    ttl=2592000,
)
@router.on_event("startup")
async def load_resources():
    """Check external resources and load wind-farm pretrained models at startup.

    Every sub-directory of the model root is treated as one wind farm. The
    directory name is used as the wind farm code and is the key in
    PRETRAIN_MODELS. The model root defaults to ``./app/health-models``
    (relative to the working directory). It can be overridden with the
    ``HEALTH_MODEL_ROOT`` environment variable.

    Failures never abort startup. An unreachable Redis, a missing model root, or
    a model that fails to load is logged, and the service continues without it.
    """
    # Redis is optional: without it every request is computed from scratch.
    if not CACHE_SERVICE.ping():
        logger.error("无法连接Redis服务器,缓存功能将不可用")

    model_root = os.getenv("HEALTH_MODEL_ROOT", "./app/health-models")
    # isdir (not exists): a plain file at this path would make listdir raise.
    if not os.path.isdir(model_root):
        logger.warning(f"预训练模型目录不存在: {model_root}")
    else:
        # Sorted so the load order (and log output) is deterministic.
        for wind_code in sorted(os.listdir(model_root)):
            wind_dir = os.path.join(model_root, wind_code)
            if not os.path.isdir(wind_dir):
                continue
            try:
                model = WindFarmPretrainModel.load(wind_dir, wind_code)
            except Exception as e:
                logger.error(f"加载风场模型失败 {wind_code}: {str(e)}", exc_info=True)
                continue
            if model:
                PRETRAIN_MODELS[wind_code] = model
            else:
                logger.warning(f"风场模型加载结果为空 {wind_code}")

    logger.info(f"预训练模型加载完成,共加载 {len(PRETRAIN_MODELS)} 个风场模型")
@router.post("/health_assess", response_model=List[AssessmentResult])
async def assess_windfarm_optimized(request: AssessmentRequest):
    """Assess the health of every turbine of a wind farm for one month.

    Flow:
      1. Return the cached response for an identical request body, if present.
      2. Otherwise list the farm's turbines and assess them. With the farm's
         pretrained model, turbine data is fetched in one batch and turbines are
         assessed in a thread pool. Without a loaded model, each turbine goes
         through the real-time assessor.
      3. Cache non-empty results for one hour. This runs synchronously, and a
         caching failure is only logged.

    Results follow the order of the turbine list returned by the fetcher. A
    turbine whose pretrained assessment raised is logged and omitted.

    Raises:
        HTTPException: status 500 on any unexpected error.
    """
    # NOTE(review): this coroutine performs blocking I/O (Redis, DB queries,
    # thread-pool joins) directly, so it blocks the event loop for the whole
    # request. Consider a plain `def` endpoint or run_in_threadpool.
    total_start = time.time()
    logger.info(f"开始评估风场 {request.windcode} {request.month}")

    try:
        # 1. Cache lookup, keyed by the full request body.
        request_body = request.dict()
        cached_data = CACHE_SERVICE.get_cached_response(request_body)
        if cached_data:
            logger.info(f"从缓存返回风场 {request.windcode} {request.month} 的结果")
            return [AssessmentResult(**item) for item in cached_data]

        # 2. Cache miss: run the assessment.
        fetcher_start = time.time()
        fetcher = DataFetcher()
        logger.info(f"初始化DataFetcher耗时: {time.time() - fetcher_start:.2f}s")

        turbine_start = time.time()
        turbines = fetcher.get_turbines(request.windcode)
        logger.info(f"获取风机列表耗时: {time.time() - turbine_start:.2f}s")

        if turbines.empty:
            return []

        model_check_start = time.time()
        model = PRETRAIN_MODELS.get(request.windcode)
        logger.info(f"检查预训练模型耗时: {time.time() - model_check_start:.2f}s")

        # Many turbines share a mill type; resolve each code only once per request.
        mill_type_of = _mill_type_lookup(fetcher)

        if not model:
            logger.info("未使用预训练模型,采用实时评估")
            results = _assess_all_realtime(fetcher, request, turbines, mill_type_of)
        else:
            results = _assess_all_pretrained(fetcher, request, turbines, model, mill_type_of)

        # 3. Store the results (synchronously; failures are logged, not raised).
        _store_in_cache(request_body, results)

        logger.info(f"风场评估总耗时: {time.time() - total_start:.2f}s")
        return results

    except Exception as e:
        logger.error(f"评估流程异常: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail="内部服务器错误")


def _mill_type_lookup(fetcher):
    """Return a memoised ``mill_type_code -> mill type name`` resolver for one request."""
    resolved: Dict[str, str] = {}

    def resolve(mill_type_code):
        if mill_type_code not in resolved:
            resolved[mill_type_code] = get_mill_type(fetcher, mill_type_code)
        return resolved[mill_type_code]

    return resolve


def _assess_all_realtime(fetcher, request, turbines, mill_type_of):
    """Assess every turbine sequentially with the real-time assessor (returns dicts)."""
    return [
        assess_with_realtime(
            fetcher,
            request.windcode,
            row['engine_code'],
            request.month,
            mill_type_of(row['mill_type_code']),
            row['engine_name'],
        )
        for _, row in turbines.iterrows()
    ]


def _assess_all_pretrained(fetcher, request, turbines, model, mill_type_of):
    """Batch-fetch turbine data and assess all turbines with *model* in a thread pool."""
    data_fetch_start = time.time()
    all_features = list({f for feats in model.features.values() for f in feats})
    all_data = fetcher.fetch_all_turbines_data(
        request.windcode,
        request.month,
        all_features,
    )
    logger.info(f"批量数据查询耗时: {time.time() - data_fetch_start:.2f}s")
    logger.info(f"获取到 {len(all_data)} 台风机数据")

    assess_start = time.time()
    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for _, row in turbines.iterrows():
            engine_code = row['engine_code']
            if engine_code not in all_data:
                empty_assessment = create_empty_assessment(
                    engine_code,
                    row['engine_name'],
                    mill_type_of(row['mill_type_code']),
                )
                # Pass the dict as an argument so it is bound now; a lambda
                # would read the loop variable later and could format another turbine.
                futures.append(executor.submit(_format_result, empty_assessment))
                continue

            futures.append(executor.submit(
                _assess_single_turbine,
                model, all_data[engine_code], row, fetcher,
            ))

        # Collect in submission order so the response follows the turbine list.
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                logger.error(f"评估失败: {str(e)}")

    logger.info(f"并行评估总耗时: {time.time() - assess_start:.2f}s")
    return results


def _store_in_cache(request_body, results):
    """Cache a non-empty result list for one hour; any failure is only logged."""
    if not results:
        return
    try:
        # Pretrained results are AssessmentResult models, while real-time results
        # are plain dicts. Validating the dicts through AssessmentResult first means
        # only entries the cache-hit path (AssessmentResult(**item)) can rebuild are stored.
        cache_data = [
            (r if isinstance(r, AssessmentResult) else AssessmentResult(**r)).dict()
            for r in results
            if r is not None
        ]
        CACHE_SERVICE.set_cached_response(
            request_body,
            cache_data,
            3600,  # cache for 1 hour
        )
    except Exception as e:
        logger.error(f"缓存设置失败: {str(e)}")
-
def assess_with_realtime(fetcher, windcode, engine_code, month, mill_type, wind_turbine_name):
    """Assess a single turbine with the shared real-time assessor.

    The work runs in four steps, each timed and logged:
      1. List the wind farm's available columns.
      2. Select the features usable for this mill type.
      3. Fetch the turbine's data for the month.
      4. Run the assessor.

    Returns:
        The assessment dict with non-finite floats replaced by -1.0. The all -1
        placeholder from create_empty_assessment is returned instead when there
        are no columns or no data, or when the assessor returns a falsy result.
    """
    t_begin = time.time()

    def placeholder():
        return create_empty_assessment(engine_code, wind_turbine_name, mill_type)

    t_step = time.time()
    columns = fetcher.get_turbine_columns(windcode)
    logger.info(f"[{engine_code}] 获取列名耗时: {time.time() - t_step:.2f}s")
    if not columns:
        return placeholder()

    t_step = time.time()
    # NOTE(review): REALTIME_ASSESSOR is passed explicitly as well as being the
    # bound receiver. This only works if _get_all_possible_features accepts an
    # extra leading assessor argument; confirm against HealthAssessor.
    features = REALTIME_ASSESSOR._get_all_possible_features(
        REALTIME_ASSESSOR, mill_type, columns
    )
    logger.info(f"[{engine_code}] 特征筛选耗时: {time.time() - t_step:.2f}s")

    t_step = time.time()
    frame = fetcher.fetch_turbine_data(windcode, engine_code, month, features)
    logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - t_step:.2f}s")
    if frame is None or frame.empty:
        return placeholder()

    t_step = time.time()
    result = REALTIME_ASSESSOR.assess_turbine(
        engine_code, frame, mill_type, wind_turbine_name
    )
    logger.info(f"[{engine_code}] 模型评估耗时: {time.time() - t_step:.2f}s")

    logger.info(f"[{engine_code}] 实时评估总耗时: {time.time() - t_begin:.2f}s")
    if not result:
        result = placeholder()
    return clean_nans(result)
def assess_with_pretrained(fetcher, windcode, engine_code, month, wind_turbine_name, mill_type):
    """Assess one turbine with its wind farm's pretrained model.

    Args:
        fetcher: data access object providing get_turbine_columns / fetch_turbine_data.
        windcode: wind farm code; selects the model in PRETRAIN_MODELS.
        engine_code: turbine identifier.
        month: period to fetch data for (passed through to the fetcher).
        wind_turbine_name: display name copied into the result.
        mill_type: turbine mill type; must equal the model's mill_type.

    Returns:
        The assessment dict with engine_code, wind_turbine_name and mill_type
        added and non-finite floats replaced by -1.0. Returns None when any of
        the following holds, so the caller can fall back to real-time assessment:
        no model exists, the mill type does not match, no model feature exists
        in the database, the turbine has no data, the model returns nothing, or
        model inference raises. Exceptions raised by the fetcher propagate.
    """
    start_time = time.time()
    model = PRETRAIN_MODELS.get(windcode)
    if not model:
        logger.warning(f"未找到风场 {windcode} 的预训练模型")
        return None

    # A model trained for one mill type cannot score another.
    if model.mill_type != mill_type:
        logger.warning(
            f"机型不匹配: 模型机型={model.mill_type}, 当前机型={mill_type}. "
            f"风机 {engine_code} 将使用实时计算"
        )
        return None

    feat_start = time.time()
    # Query the available columns once; the original re-queried them for every
    # candidate feature. A set gives O(1) membership tests.
    available_columns = set(fetcher.get_turbine_columns(windcode) or ())
    # Deduplicated model features that actually exist in the database.
    features = list({
        f for subsys_feats in model.features.values()
        for f in subsys_feats
        if f in available_columns
    })
    logger.info(f"[{engine_code}] 特征准备耗时: {time.time() - feat_start:.2f}s")

    if not features:
        logger.warning(f"风场模型 {windcode} 无可用特征")
        return None

    data_start = time.time()
    data = fetcher.fetch_turbine_data(windcode, engine_code, month, features)
    logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
    if data is None or data.empty:
        logger.warning(f"风机 {engine_code} 无有效数据")
        return None

    assess_start = time.time()
    try:
        assessment = model.assess(data, engine_code)
        logger.info(f"[{engine_code}] 模型推理耗时: {time.time() - assess_start:.2f}s")

        if not assessment:
            logger.warning(f"风场模型评估失败 {engine_code}")
            return None

        assessment.update({
            'engine_code': engine_code,
            'wind_turbine_name': wind_turbine_name,
            'mill_type': mill_type
        })
        logger.info(f"[{engine_code}] 预训练评估总耗时: {time.time() - start_time:.2f}s")
        return clean_nans(assessment)
    except Exception as e:
        logger.error(f"预训练评估异常 {engine_code}: {str(e)}", exc_info=True)
        return None
def _assess_single_turbine(model: WindFarmPretrainModel, data: pd.DataFrame,
                           turbine_info: dict, fetcher: DataFetcher):
    """Assess one turbine with a wind-farm pretrained model; never raises on assessment errors.

    Args:
        model: the wind farm's pretrained model.
        data: this turbine's rows from the batch data query.
        turbine_info: turbine record with 'engine_code', 'engine_name' and
            'mill_type_code'. The caller passes a pandas row from iterrows().
        fetcher: used to resolve the mill type code.

    Returns:
        An AssessmentResult tagged model_type="pretrained" on success. On any
        failure during mill-type lookup or inference, the all -1 placeholder
        result (model_type None) is returned instead.
    """
    start_time = time.time()
    engine_code = turbine_info['engine_code']
    engine_name = turbine_info['engine_name']
    # Pre-set so the failure path can still build a placeholder when the
    # mill-type lookup itself is what failed (previously an UnboundLocalError).
    mill_type = 'unknown'
    try:
        # Shared helper keeps the code -> name mapping in one place.
        mill_type = get_mill_type(fetcher, turbine_info['mill_type_code'])

        assess_start = time.time()
        assessment = model.assess(data, engine_code)
        logger.info(f"[{engine_code}] 模型推理耗时: {time.time() - assess_start:.2f}s")
        if assessment is None:
            raise ValueError("模型未返回评估结果")

        assessment.update({
            'engine_code': engine_code,
            'wind_turbine_name': engine_name,
            'mill_type': mill_type
        })
        logger.info(f"[{engine_code}] 单机评估耗时: {time.time() - start_time:.2f}s")
        return _format_result(clean_nans(assessment), "pretrained")
    except Exception as e:
        logger.error(f"评估风机 {engine_code} 失败: {str(e)}")
        empty_assessment = create_empty_assessment(engine_code, engine_name, mill_type)
        return _format_result(empty_assessment)
def get_mill_type(fetcher: DataFetcher, mill_type_code: str) -> str:
    """Translate a mill-type code into its drivetrain type name.

    The fetcher resolves the code to a number. The number is then mapped
    1 -> 'dfig', 2 -> 'direct', 3 -> 'semi_direct', and anything else to 'unknown'.
    """
    type_number = fetcher.get_mill_type(mill_type_code)
    type_names = dict(enumerate(('dfig', 'direct', 'semi_direct'), start=1))
    return type_names.get(type_number, 'unknown')
def clean_nans(obj):
    """Recursively replace non-finite floats (NaN, +inf, -inf) with -1.0.

    Dicts and lists are rebuilt as plain dicts and lists with cleaned contents,
    and plain tuples stay tuples. Every other value is returned unchanged,
    including integers of any size.

    >>> clean_nans({'a': float('nan'), 'b': [1.5, float('inf')]})
    {'a': -1.0, 'b': [1.5, -1.0]}
    """
    if isinstance(obj, dict):
        return {k: clean_nans(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [clean_nans(item) for item in obj]
    # Only exact tuples: rebuilding a namedtuple as a plain tuple would lose its type.
    if type(obj) is tuple:
        return tuple(clean_nans(item) for item in obj)
    # numbers.Real covers float and non-float real types such as numpy.float32.
    # Integral (int, bool) is excluded: ints are always finite, and
    # math.isfinite() raises OverflowError on ints too large for a float.
    if (isinstance(obj, numbers.Real)
            and not isinstance(obj, numbers.Integral)
            and not math.isfinite(obj)):
        return -1.0
    return obj
def create_empty_assessment(engine_code: str, wind_turbine_name: str, mill_type: str) -> Dict:
    """Build a placeholder assessment for a turbine that could not be scored.

    Every score is -1. All four standard subsystems (generator, nacelle, grid,
    drive_train) are present with empty weights and no message, and all four
    are listed as assessed. Each call returns freshly built dicts and lists.
    """
    subsystem_names = ['generator', 'nacelle', 'grid', 'drive_train']
    placeholder_subsystems = {
        name: {'health_score': -1, 'weights': {}, 'message': None}
        for name in subsystem_names
    }
    result = {}
    result['engine_code'] = engine_code
    result['wind_turbine_name'] = wind_turbine_name
    result['mill_type'] = mill_type
    result['total_health_score'] = -1
    result['subsystems'] = placeholder_subsystems
    result['assessed_subsystems'] = subsystem_names
    return result
def _format_result(assessment: Dict, model_type: Optional[str] = None) -> AssessmentResult:
    """Convert an assessment dict into an AssessmentResult response model.

    Any of the four standard subsystems (generator, nacelle, grid, drive_train)
    missing from ``assessment['subsystems']`` is filled with a -1 placeholder in
    the result. The input dict is not modified.

    Args:
        assessment: must contain 'engine_code', 'wind_turbine_name' and
            'mill_type'. It may contain 'total_health_score' (default -1),
            'subsystems' and 'assessed_subsystems' (default []). Each subsystem
            entry must have 'health_score'; 'weights' and 'message' are optional.
        model_type: label stored on the result (e.g. "pretrained"), or None.

    Raises:
        KeyError: if a required key is missing.
    """
    # Shallow copy so that adding placeholders does not mutate the caller's dict
    # (previously the input's 'subsystems' mapping was changed in place).
    # `or {}` also tolerates an explicit None.
    subsystems = dict(assessment.get('subsystems') or {})
    for subsys in ['generator', 'nacelle', 'grid', 'drive_train']:
        if subsys not in subsystems:
            subsystems[subsys] = {
                'health_score': -1,
                'weights': {},
                'message': None
            }
    return AssessmentResult(
        engine_code=assessment['engine_code'],
        wind_turbine_name=assessment['wind_turbine_name'],
        mill_type=assessment['mill_type'],
        total_health_score=assessment.get('total_health_score', -1),
        subsystems={
            k: SubsystemResult(
                health_score=v['health_score'],
                weights=v.get('weights', {}),
                message=v.get('message')
            ) for k, v in subsystems.items()
        },
        assessed_subsystems=assessment.get('assessed_subsystems', []),
        model_type=model_type
    )
|