from datetime import datetime
from typing import List

from fastapi import HTTPException, APIRouter

from app.logger import logger
from app.models.HealthEvaluationReqAndResp import AssessmentResult, AssessmentRequest, SubsystemResult
from app.services.HealthAssessor import HealthAssessor
from app.services.HealthDataFetcher import DataFetcher

router = APIRouter()

# Numeric code returned by DataFetcher.get_mill_type -> label handed to the assessor.
_MILL_TYPES = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}

# Subsystems reported for a turbine that has no data in the requested month.
_SUBSYSTEMS = ('generator', 'nacelle', 'grid', 'drive_train')

# Score used to mark "no data available" in the placeholder result.
_NO_DATA_SCORE = -1


def _empty_assessment(engine_code, wind_turbine_name, mill_type):
    """Build the placeholder assessment dict for a turbine with no data.

    Every subsystem and the total score are set to ``_NO_DATA_SCORE``; the
    weights are empty and the message is ``None``. The dict has the same keys
    that ``_format_result`` reads.
    """
    return {
        'engine_code': engine_code,
        'wind_turbine_name': wind_turbine_name,
        'mill_type': mill_type,
        'total_health_score': _NO_DATA_SCORE,
        'subsystems': {
            name: {'health_score': _NO_DATA_SCORE, 'weights': {}, 'message': None}
            for name in _SUBSYSTEMS
        },
        'assessed_subsystems': list(_SUBSYSTEMS),
    }


@router.post("/health_assess", response_model=List[AssessmentResult])
async def assess_windfarm(request: AssessmentRequest) -> List[AssessmentResult]:
    """Assess every turbine of a wind farm for one month.

    Args:
        request: Carries ``windcode`` (wind farm code) and ``month``
            (a ``YYYY-MM`` string).

    Returns:
        One ``AssessmentResult`` per turbine that could be processed. Turbines
        with an unknown mill type, no available columns, an empty assessment,
        or a processing error are skipped. Turbines without data for the month
        are reported with all scores set to -1. The list may be empty.

    Raises:
        HTTPException: 400 when ``request.month`` is not in ``YYYY-MM`` format;
            404 when no turbines are found for the wind farm.
    """
    # NOTE(review): this is an ``async def`` handler but the fetcher/assessor
    # calls below look synchronous; if they block on I/O they will block the
    # event loop. Confirm and consider a plain ``def`` handler or a threadpool.
    try:
        datetime.strptime(request.month, "%Y-%m")
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="无效时间格式,使用YYYY-MM格式") from exc

    logger.info(f"开始处理风场评估请求 - 风场编码: {request.windcode}, 月份: {request.month}")

    fetcher = DataFetcher()
    assessor = HealthAssessor()

    turbines = fetcher.get_turbines(request.windcode)
    # Treat a missing result the same way as an empty one instead of crashing
    # on ``None.empty``.
    if turbines is None or turbines.empty:
        raise HTTPException(status_code=404, detail="未找到风场数据")

    results: List[AssessmentResult] = []
    all_turbines = len(turbines)
    processed_count = 0
    logger.debug(f"风场{request.windcode}共获取到{all_turbines}台风机")

    for _, row in turbines.iterrows():
        # Reset per iteration so the error log below never references an
        # unbound name or the previous turbine's code.
        engine_code = "<unknown>"
        try:
            engine_code = row['engine_code']
            mill_type_code = row['mill_type_code']
            wind_turbine_name = row['engine_name']

            # Resolve the mill type label; unknown codes are skipped.
            mill_type_num = fetcher.get_mill_type(mill_type_code)
            mill_type = _MILL_TYPES.get(mill_type_num, 'unknown')
            if mill_type == 'unknown':
                logger.warning(f"{engine_code}机型未知,跳过处理")
                continue

            # Column names available for this wind farm.
            # NOTE(review): this call only depends on ``request.windcode`` and
            # could be hoisted out of the loop if the fetcher has no side
            # effects and the result is not mutated downstream - confirm.
            available_columns = fetcher.get_turbine_columns(request.windcode)
            if not available_columns:
                continue

            # Restrict the feature set to columns that actually exist.
            # NOTE(review): ``assessor`` is passed explicitly as the first
            # argument in addition to being the receiver; verify this matches
            # the signature of ``_get_all_possible_features``.
            valid_features = assessor._get_all_possible_features(assessor, mill_type, available_columns)

            data = fetcher.fetch_turbine_data(request.windcode, engine_code, request.month, valid_features)

            if data is None or data.empty:
                logger.warning(f"{engine_code}在{request.month}无有效数据")
                # Report the turbine anyway, with every score set to -1.
                results.append(
                    _format_result(_empty_assessment(engine_code, wind_turbine_name, mill_type))
                )
                processed_count += 1
                logger.info(f"处理无数据风机{engine_code}(已处理{processed_count}/{all_turbines}台)")
                continue

            logger.debug(f"{engine_code}在{request.month}获取到{len(data)}行数据")

            # Run the assessment and convert it to the response model.
            assessment = assessor.assess_turbine(engine_code, data, mill_type, wind_turbine_name)
            if not assessment:
                logger.warning(f"{engine_code}评估结果为空")
                continue

            results.append(_format_result(assessment))
            processed_count += 1
            logger.info(f"成功处理{engine_code}(已处理{processed_count}/{all_turbines}台)")

        except Exception as e:
            # Best effort: one failing turbine must not abort the whole farm.
            logger.error(f"处理{engine_code}时发生异常: {str(e)}", exc_info=True)

    if not results:
        logger.warning(f"风场{request.windcode}在{request.month}无有效评估结果,返回空列表")

    return results


def _format_result(assessment):
    """Convert an assessment dict into an ``AssessmentResult``.

    ``engine_code``, ``wind_turbine_name``, ``mill_type`` and ``subsystems``
    are required keys (a missing one raises ``KeyError``). Each value in
    ``subsystems`` is expanded into a ``SubsystemResult``. A missing
    ``total_health_score`` becomes ``None`` and a missing
    ``assessed_subsystems`` becomes an empty list.
    """
    subsystems = {
        name: SubsystemResult(**payload)
        for name, payload in assessment['subsystems'].items()
    }
    return AssessmentResult(
        engine_code=assessment['engine_code'],
        wind_turbine_name=assessment['wind_turbine_name'],
        mill_type=assessment['mill_type'],
        total_health_score=assessment.get('total_health_score'),
        subsystems=subsystems,
        assessed_subsystems=assessment.get('assessed_subsystems', []),
    )