|
- from fastapi import FastAPI, HTTPException
- from pydantic import BaseModel
- from datetime import datetime
- from typing import List, Dict, Optional
- import logging
- import os
- import glob
- import joblib
- import math
- import numpy as np
- import pandas as pd
- app = FastAPI(root_path="/api/health")
- import time
- from health_pretrain import WindFarmPretrainModel
- from database import DataFetcher
- from health_evalution_class import HealthAssessor
- import pandas as pd
- import concurrent.futures
- # 配置日志
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- # 请求模型定义
- class AssessmentRequest(BaseModel):
- windcode: str
- month: str # 格式: "YYYY-MM"
- class SubsystemResult(BaseModel):
- health_score: float
- weights: Dict[str, float]
- message: Optional[str] = None
- class AssessmentResult(BaseModel):
- engine_code: str
- wind_turbine_name: str
- mill_type: str
- total_health_score: Optional[float]
- subsystems: Dict[str, SubsystemResult]
- assessed_subsystems: List[str]
- model_type: Optional[str] = None # 新增字段,不影响原有结构
- # 全局存储
- PRETRAIN_MODELS: Dict[str, WindFarmPretrainModel] = {} # 改为存储风场级模型
- REALTIME_ASSESSOR = HealthAssessor()
- @app.on_event("startup")
- def load_resources():
- """加载预训练模型和其他资源"""
- # 加载预训练模型
- model_root = "health_models"
- if os.path.exists(model_root):
- for wind_code in os.listdir(model_root):
- wind_dir = os.path.join(model_root, wind_code)
- if os.path.isdir(wind_dir):
- try:
- model = WindFarmPretrainModel.load(wind_dir, wind_code)
- if model:
- PRETRAIN_MODELS[wind_code] = model
- except Exception as e:
- logger.error(f"加载风场模型失败 {wind_code}: {str(e)}")
-
- logger.info(f"预训练模型加载完成,共加载 {len(PRETRAIN_MODELS)} 个风场模型")
- def _format_result(assessment: Dict, model_type: str = None) -> AssessmentResult:
- """格式化评估结果(完全兼容原有逻辑)"""
- # 确保所有子系统都存在,即使评分为-1
- subsystems = assessment.get('subsystems', {})
- for subsys in ['generator', 'nacelle', 'grid', 'drive_train']:
- if subsys not in subsystems:
- subsystems[subsys] = {
- 'health_score': -1,
- 'weights': {},
- 'message': None
- }
-
- return AssessmentResult(
- engine_code=assessment['engine_code'],
- wind_turbine_name=assessment['wind_turbine_name'],
- mill_type=assessment['mill_type'],
- total_health_score=assessment.get('total_health_score', -1),
- subsystems={
- k: SubsystemResult(
- health_score=v['health_score'],
- weights=v.get('weights', {}),
- message=v.get('message')
- ) for k, v in subsystems.items()
- },
- assessed_subsystems=assessment.get('assessed_subsystems', []),
- model_type=model_type
- )
- def clean_nans(obj):
- """递归清理NaN值(保留原有逻辑)"""
- if isinstance(obj, dict):
- return {k: clean_nans(v) for k, v in obj.items()}
- elif isinstance(obj, list):
- return [clean_nans(item) for item in obj]
- elif isinstance(obj, float) and (math.isnan(obj) or not math.isfinite(obj)):
- return -1.0
- else:
- return obj
- def create_empty_assessment(engine_code: str, wind_turbine_name: str, mill_type: str) -> Dict:
- """创建空评估结果(完全保留原有逻辑)"""
- return {
- 'engine_code': engine_code,
- 'wind_turbine_name': wind_turbine_name,
- 'mill_type': mill_type,
- 'total_health_score': -1,
- 'subsystems': {
- 'generator': {'health_score': -1, 'weights': {}, 'message': None},
- 'nacelle': {'health_score': -1, 'weights': {}, 'message': None},
- 'grid': {'health_score': -1, 'weights': {}, 'message': None},
- 'drive_train': {'health_score': -1, 'weights': {}, 'message': None}
- },
- 'assessed_subsystems': ['generator', 'nacelle', 'grid', 'drive_train']
- }
- def assess_with_realtime(fetcher, windcode, engine_code, month, mill_type, wind_turbine_name):
- """实时评估逻辑(添加耗时统计)"""
- start_time = time.time()
-
- # 获取可用特征列
- col_start = time.time()
- available_columns = fetcher.get_turbine_columns(windcode)
- logger.info(f"[{engine_code}] 获取列名耗时: {time.time() - col_start:.2f}s")
-
- if not available_columns:
- return create_empty_assessment(engine_code, wind_turbine_name, mill_type)
-
- # 获取有效特征
- feat_start = time.time()
- valid_features = REALTIME_ASSESSOR._get_all_possible_features(
- REALTIME_ASSESSOR, mill_type, available_columns
- )
- logger.info(f"[{engine_code}] 特征筛选耗时: {time.time() - feat_start:.2f}s")
-
- # 获取数据
- data_start = time.time()
- data = fetcher.fetch_turbine_data(windcode, engine_code, month, valid_features)
- logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
-
- if data is None or data.empty:
- return create_empty_assessment(engine_code, wind_turbine_name, mill_type)
-
- # 执行评估
- assess_start = time.time()
- assessment = REALTIME_ASSESSOR.assess_turbine(
- engine_code, data, mill_type, wind_turbine_name
- )
- logger.info(f"[{engine_code}] 模型评估耗时: {time.time() - assess_start:.2f}s")
-
- logger.info(f"[{engine_code}] 实时评估总耗时: {time.time() - start_time:.2f}s")
- return clean_nans(assessment or create_empty_assessment(engine_code, wind_turbine_name, mill_type))
- def assess_with_pretrained(fetcher, windcode, engine_code, month, wind_turbine_name, mill_type):
- """使用预训练模型评估"""
- start_time = time.time()
- model = PRETRAIN_MODELS.get(windcode)
- if not model:
- logger.warning(f"未找到风场 {windcode} 的预训练模型")
- return None
-
- # 检查机型是否匹配
- if model.mill_type != mill_type:
- logger.warning(
- f"机型不匹配: 模型机型={model.mill_type}, 当前机型={mill_type}. "
- f"风机 {engine_code} 将使用实时计算"
- )
- return None
- feat_start = time.time()
- # 获取模型特征(确保不重复且存在)
- features = list({
- f for subsys_feats in model.features.values()
- for f in subsys_feats
- if f in fetcher.get_turbine_columns(windcode) # 确保特征在数据库中存在
- })
- logger.info(f"[{engine_code}] 特征准备耗时: {time.time() - feat_start:.2f}s")
-
- if not features:
- logger.warning(f"风场模型 {windcode} 无可用特征")
- return None
-
- # 获取数据
- data_start = time.time()
- data = fetcher.fetch_turbine_data(windcode, engine_code, month, features)
- logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
- if data is None or data.empty:
- logger.warning(f"风机 {engine_code} 无有效数据")
- return None
-
- # 执行评估
- assess_start = time.time()
- try:
- assessment = model.assess(data, engine_code)
- logger.info(f"[{engine_code}] 模型推理耗时: {time.time() - assess_start:.2f}s")
-
- if not assessment:
- logger.warning(f"风场模型评估失败 {engine_code}")
- return None
-
- assessment.update({
- 'engine_code': engine_code,
- 'wind_turbine_name': wind_turbine_name,
- 'mill_type': mill_type
- })
- logger.info(f"[{engine_code}] 预训练评估总耗时: {time.time() - start_time:.2f}s")
- return clean_nans(assessment)
- except Exception as e:
- logger.error(f"预训练评估异常 {engine_code}: {str(e)}", exc_info=True)
- return None
- @app.post("/health_assess", response_model=List[AssessmentResult])
- async def assess_windfarm_optimized(request: AssessmentRequest):
- """优化后的评估流程(添加耗时统计)"""
- total_start = time.time()
- logger.info(f"开始评估风场 {request.windcode} {request.month}")
-
- try:
- fetcher_start = time.time()
- fetcher = DataFetcher()
- logger.info(f"初始化DataFetcher耗时: {time.time() - fetcher_start:.2f}s")
-
- # 1. 获取风场所有风机信息
- turbine_start = time.time()
- turbines = fetcher.get_turbines(request.windcode)
- logger.info(f"获取风机列表耗时: {time.time() - turbine_start:.2f}s")
-
- if turbines.empty:
- return []
-
- # 2. 检查预训练模型
- model_check_start = time.time()
- model = PRETRAIN_MODELS.get(request.windcode)
- logger.info(f"检查预训练模型耗时: {time.time() - model_check_start:.2f}s")
-
- if not model:
- logger.info("未使用预训练模型,采用实时评估")
- return [assess_with_realtime(fetcher, request.windcode, row['engine_code'],
- request.month,
- get_mill_type(fetcher, row['mill_type_code']),
- row['engine_name'])
- for _, row in turbines.iterrows()]
-
- # 3. 批量获取所有风机数据
- data_fetch_start = time.time()
- all_features = list({f for feats in model.features.values() for f in feats})
- all_data = fetcher.fetch_all_turbines_data(
- request.windcode,
- request.month,
- all_features
- )
- logger.info(f"批量数据查询耗时: {time.time() - data_fetch_start:.2f}s")
- logger.info(f"获取到 {len(all_data)} 台风机数据")
-
- # 4. 并行评估
- assess_start = time.time()
- results = []
- with concurrent.futures.ThreadPoolExecutor() as executor:
- futures = []
- for _, row in turbines.iterrows():
- engine_code = row['engine_code']
- if engine_code not in all_data:
- empty_assessment = create_empty_assessment(
- engine_code,
- row['engine_name'],
- get_mill_type(fetcher, row['mill_type_code'])
- )
- futures.append(executor.submit(
- lambda: (_format_result(empty_assessment))
- ))
- continue
-
- futures.append(executor.submit(
- _assess_single_turbine,
- model, all_data[engine_code], row, fetcher
- ))
-
- for future in concurrent.futures.as_completed(futures):
- try:
- result = future.result()
- results.append(result)
- except Exception as e:
- logger.error(f"评估失败: {str(e)}")
- continue
-
- logger.info(f"并行评估总耗时: {time.time() - assess_start:.2f}s")
- logger.info(f"风场评估总耗时: {time.time() - total_start:.2f}s")
- return results
- except Exception as e:
- logger.error(f"评估流程异常: {str(e)}")
- raise HTTPException(status_code=500, detail="内部服务器错误")
- def _assess_single_turbine(model: WindFarmPretrainModel, data: pd.DataFrame,
- turbine_info: dict, fetcher: DataFetcher):
- """单个风机评估(添加耗时统计)"""
- start_time = time.time()
- try:
- # 获取机型
- mill_type_num = fetcher.get_mill_type(turbine_info['mill_type_code'])
- mill_type = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')
-
- # 评估
- assess_start = time.time()
- assessment = model.assess(data, turbine_info['engine_code'])
- logger.info(f"[{turbine_info['engine_code']}] 模型推理耗时: {time.time() - assess_start:.2f}s")
-
- assessment.update({
- 'engine_code': turbine_info['engine_code'],
- 'wind_turbine_name': turbine_info['engine_name'],
- 'mill_type': mill_type
- })
- logger.info(f"[{turbine_info['engine_code']}] 单机评估耗时: {time.time() - start_time:.2f}s")
- return _format_result(clean_nans(assessment), "pretrained")
- except Exception as e:
- logger.error(f"评估风机 {turbine_info['engine_code']} 失败: {str(e)}")
- empty_assessment = create_empty_assessment(
- turbine_info['engine_code'],
- turbine_info['engine_name'],
- mill_type
- )
- return _format_result(empty_assessment)
- def get_mill_type(fetcher: DataFetcher, mill_type_code: str) -> str:
- """获取机型字符串"""
- mill_type_num = fetcher.get_mill_type(mill_type_code)
- return {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')
|