"""FastAPI service exposing a monthly health assessment for a wind farm's turbines."""

import logging
from datetime import datetime
from typing import Dict, List, Optional

import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from database import DataFetcher
from health_evalution_class import HealthAssessor

app = FastAPI()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Maps the numeric mill type returned by DataFetcher.get_mill_type to the
# name handed to the assessor; any other value is treated as 'unknown'.
_MILL_TYPES = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}

# Subsystems reported in the placeholder result of a turbine without data.
_SUBSYSTEMS = ('generator', 'nacelle', 'grid', 'drive_train')

# Score used in placeholder results to signal "no data for this month".
_NO_DATA_SCORE = -1


# Request model definitions
class AssessmentRequest(BaseModel):
    """Request body of the /health_assess endpoint."""

    windcode: str
    month: str  # format: "YYYY-MM"


class SubsystemResult(BaseModel):
    """Health score, weights and optional message of one subsystem."""

    health_score: float
    weights: Dict[str, float]
    # features: List[str]  (field is commented out, so it is not part of the response)
    message: Optional[str] = None


class AssessmentResult(BaseModel):
    """Assessment of one turbine: overall score plus per-subsystem results."""

    engine_code: str
    wind_turbine_name: str
    mill_type: str
    total_health_score: Optional[float]
    subsystems: Dict[str, SubsystemResult]
    assessed_subsystems: List[str]


def _empty_assessment(engine_code, wind_turbine_name, mill_type):
    """Build the placeholder assessment (every score -1) for a turbine with no data."""
    return {
        'engine_code': engine_code,
        'wind_turbine_name': wind_turbine_name,
        'mill_type': mill_type,
        'total_health_score': _NO_DATA_SCORE,
        'subsystems': {
            name: {
                'health_score': _NO_DATA_SCORE,
                'weights': {},
                'message': None,
            }
            for name in _SUBSYSTEMS
        },
        'assessed_subsystems': list(_SUBSYSTEMS),
    }


@app.post("/health_assess", response_model=List[AssessmentResult])
async def assess_windfarm(request: AssessmentRequest):
    """Assess every turbine of a wind farm for the requested month.

    Turbines are processed one by one. A turbine is skipped (and the reason
    logged) when its mill type is unknown, when the wind farm exposes no
    columns, when the assessor returns an empty result, or when processing
    it raises. A turbine with no data for the month is still reported, with
    every score set to -1.

    Args:
        request: Wind farm code and month ("YYYY-MM") to assess.

    Returns:
        One AssessmentResult per reported turbine; possibly an empty list.

    Raises:
        HTTPException: 400 when ``month`` cannot be parsed with "%Y-%m",
            404 when no turbines are found for the wind farm.
    """
    # NOTE(review): the DataFetcher calls below look synchronous; inside an
    # ``async def`` endpoint they would block the event loop — confirm.
    try:
        datetime.strptime(request.month, "%Y-%m")
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="无效时间格式,使用YYYY-MM格式") from exc

    logger.info(f"开始处理风场评估请求 - 风场编码: {request.windcode}, 月份: {request.month}")

    fetcher = DataFetcher()
    assessor = HealthAssessor()

    turbines = fetcher.get_turbines(request.windcode)
    logger.debug("turbines for %s:\n%s", request.windcode, turbines)
    # Guard against None as well, mirroring the check done on turbine data.
    if turbines is None or turbines.empty:
        raise HTTPException(status_code=404, detail="未找到风场数据")

    results: List[AssessmentResult] = []
    all_turbines = len(turbines)
    processed_count = 0

    # The column list depends only on the wind farm, so it is fetched once,
    # lazily, instead of once per turbine. The fetch stays inside the
    # per-turbine ``try`` so a failure is logged and retried on the next row.
    columns_loaded = False
    available_columns = None

    for idx, row in turbines.iterrows():
        # Fallback label: keeps the except handler from hitting an unbound
        # name (first row) or reporting the previous turbine's code.
        engine_code = f"<row {idx}>"
        try:
            engine_code = row['engine_code']
            mill_type_code = row['mill_type_code']
            wind_turbine_name = row['engine_name']

            # Resolve the mill type
            mill_type_num = fetcher.get_mill_type(mill_type_code)
            mill_type = _MILL_TYPES.get(mill_type_num, 'unknown')
            if mill_type == 'unknown':
                logger.warning(f"{engine_code}机型未知,跳过处理")
                continue

            # Column names available for this wind farm
            if not columns_loaded:
                available_columns = fetcher.get_turbine_columns(request.windcode)
                columns_loaded = True
            if not available_columns:
                continue

            # Valid feature columns, restricted to the columns that exist.
            # NOTE(review): ``assessor`` is passed explicitly on top of the
            # attribute access; that is only correct if the helper is a
            # staticmethod — confirm against HealthAssessor.
            valid_features = assessor._get_all_possible_features(
                assessor, mill_type, available_columns
            )

            # Fetch the turbine's data for the month
            data = fetcher.fetch_turbine_data(
                request.windcode, engine_code, request.month, valid_features
            )
            logger.debug("data for %s:\n%s", engine_code, data)

            if data is None or data.empty:
                logger.warning(f"{engine_code}在{request.month}无有效数据")
                # Report the turbine anyway, with every score set to -1
                results.append(
                    _format_result(
                        _empty_assessment(engine_code, wind_turbine_name, mill_type)
                    )
                )
                processed_count += 1
                logger.info(f"处理无数据风机{engine_code}(已处理{processed_count}/{all_turbines}台)")
                continue

            # Run the assessment and format the result
            assessment = assessor.assess_turbine(
                engine_code, data, mill_type, wind_turbine_name
            )
            if not assessment:
                logger.warning(f"{engine_code}评估结果为空")
                continue

            # _format_result returns an AssessmentResult or raises, so no
            # further type check is needed before appending.
            results.append(_format_result(assessment))
            processed_count += 1
            logger.info(f"成功处理{engine_code}(已处理{processed_count}/{all_turbines}台)")

        except Exception as e:
            # Best effort: one failing turbine must not abort the whole farm.
            logger.error(f"处理{engine_code}时发生异常: {str(e)}", exc_info=True)
            continue

    if not results:
        logger.warning(f"风场{request.windcode}在{request.month}无有效评估结果,返回空列表")

    return results


def _format_result(assessment):
    """Convert an assessment dict into an AssessmentResult.

    ``engine_code``, ``wind_turbine_name``, ``mill_type`` and ``subsystems``
    are required keys; ``total_health_score`` defaults to None and
    ``assessed_subsystems`` to an empty list when absent. Each value of
    ``subsystems`` is expanded into a SubsystemResult.
    """
    return AssessmentResult(
        engine_code=assessment['engine_code'],
        wind_turbine_name=assessment['wind_turbine_name'],
        mill_type=assessment['mill_type'],
        total_health_score=assessment.get('total_health_score'),
        subsystems={
            k: SubsystemResult(**v)
            for k, v in assessment['subsystems'].items()
        },
        assessed_subsystems=assessment.get('assessed_subsystems', [])
    )