123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- from fastapi import FastAPI, HTTPException
- from pydantic import BaseModel
- from datetime import datetime
- from typing import List, Dict, Optional
- import logging
- from database import DataFetcher
- from health_evalution_class import HealthAssessor
- import pandas as pd
# Logging setup: configure the root logger at INFO level, then grab a logger
# named after this module for all messages emitted below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FastAPI application instance; route handlers register on it via decorators.
app = FastAPI()
# Request and response model definitions
class AssessmentRequest(BaseModel):
    """Request body accepted by the ``/health_assess`` endpoint."""

    # Code identifying the wind farm whose turbines are to be assessed.
    windcode: str
    # Month to assess, written as "YYYY-MM".
    month: str
class SubsystemResult(BaseModel):
    """Health assessment outcome for a single turbine subsystem."""

    # Score for this subsystem; -1 is used as a placeholder when the turbine
    # had no data for the requested month.
    health_score: float
    # Named weights attached to the score.
    # NOTE(review): presumably one weight per feature - confirm against
    # HealthAssessor.
    weights: Dict[str, float]
    # Optional free-text note accompanying the score.
    message: Optional[str] = None
class AssessmentResult(BaseModel):
    """Health assessment outcome for one turbine, as returned to the client."""

    # Turbine identifier (the ``engine_code`` column of the turbine table).
    engine_code: str
    # Human-readable turbine name (the ``engine_name`` column).
    wind_turbine_name: str
    # Mill type name: 'dfig', 'direct' or 'semi_direct'.
    mill_type: str
    # Overall score; may be None, and is -1 when the turbine had no data.
    total_health_score: Optional[float]
    # Per-subsystem results keyed by subsystem name.
    subsystems: Dict[str, SubsystemResult]
    # Names of the subsystems covered by this assessment.
    assessed_subsystems: List[str]
# Numeric mill-type codes returned by ``DataFetcher.get_mill_type``, mapped to
# the type names passed on to ``HealthAssessor``.
_MILL_TYPE_NAMES = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}

# Subsystems reported, with placeholder scores, for a turbine without data.
_PLACEHOLDER_SUBSYSTEMS = ('generator', 'nacelle', 'grid', 'drive_train')


def _empty_assessment(engine_code, wind_turbine_name, mill_type):
    """Build the placeholder assessment for a turbine with no data (all scores -1)."""
    return {
        'engine_code': engine_code,
        'wind_turbine_name': wind_turbine_name,
        'mill_type': mill_type,
        'total_health_score': -1,
        'subsystems': {
            name: {'health_score': -1, 'weights': {}, 'message': None}
            for name in _PLACEHOLDER_SUBSYSTEMS
        },
        'assessed_subsystems': list(_PLACEHOLDER_SUBSYSTEMS),
    }


@app.post("/health_assess", response_model=List[AssessmentResult])
async def assess_windfarm(request: AssessmentRequest):
    """Assess the health of every turbine in a wind farm for one month.

    Each turbine is processed independently: a failure on one turbine is
    logged and does not abort the others. Turbines with an unknown mill type,
    no available columns, or an empty assessment are skipped. Turbines with no
    data for the month are reported with every score set to -1.

    Args:
        request: Wind-farm code and the month ("YYYY-MM") to assess.

    Returns:
        A list of ``AssessmentResult``; empty when no turbine produced a result.

    Raises:
        HTTPException: 400 if ``request.month`` does not parse as "%Y-%m";
            404 if no turbines are found for the wind farm.
    """
    # NOTE(review): the fetcher/assessor calls below look synchronous; if they
    # are, they block the event loop inside this ``async def`` - confirm and
    # consider a plain ``def`` handler so FastAPI runs it in its thread pool.
    try:
        datetime.strptime(request.month, "%Y-%m")
    except ValueError:
        # The parsing error adds nothing for the client; drop the chained context.
        raise HTTPException(status_code=400, detail="无效时间格式,使用YYYY-MM格式") from None

    logger.info(f"开始处理风场评估请求 - 风场编码: {request.windcode}, 月份: {request.month}")

    fetcher = DataFetcher()
    assessor = HealthAssessor()

    turbines = fetcher.get_turbines(request.windcode)
    # Lazy %-args: the frame is only rendered when DEBUG logging is enabled.
    logger.debug("风机列表:\n%s", turbines)
    if turbines.empty:
        raise HTTPException(status_code=404, detail="未找到风场数据")

    results: List[AssessmentResult] = []
    all_turbines = len(turbines)
    processed_count = 0
    # The column list depends only on the wind farm, so it is fetched lazily
    # and reused across turbines instead of being re-queried on each iteration.
    available_columns = None

    for idx, row in turbines.iterrows():
        # Bind a placeholder first so the except handler below never hits an
        # unbound (or stale, previous-iteration) engine_code.
        engine_code = f"<row {idx}>"
        try:
            engine_code = row['engine_code']
            mill_type_code = row['mill_type_code']
            wind_turbine_name = row['engine_name']

            # Resolve the mill type name from its numeric code.
            mill_type_num = fetcher.get_mill_type(mill_type_code)
            mill_type = _MILL_TYPE_NAMES.get(mill_type_num, 'unknown')
            if mill_type == 'unknown':
                logger.warning(f"{engine_code}机型未知,跳过处理")
                continue

            # A None result is not cached, so a failed lookup is retried on
            # the next turbine.
            if available_columns is None:
                available_columns = fetcher.get_turbine_columns(request.windcode)
            if not available_columns:
                logger.warning(f"风场{request.windcode}无可用数据列,跳过{engine_code}")
                continue

            # Restrict the candidate features to columns that actually exist.
            # NOTE(review): the assessor is passed explicitly as the first
            # argument of its own method - confirm the helper's signature.
            valid_features = assessor._get_all_possible_features(assessor, mill_type, available_columns)

            data = fetcher.fetch_turbine_data(request.windcode, engine_code, request.month, valid_features)
            logger.debug("%s 数据:\n%s", engine_code, data)

            if data is None or data.empty:
                logger.warning(f"{engine_code}在{request.month}无有效数据")
                # Still report the turbine, with every score set to -1.
                results.append(_format_result(_empty_assessment(engine_code, wind_turbine_name, mill_type)))
                processed_count += 1
                logger.info(f"处理无数据风机{engine_code}(已处理{processed_count}/{all_turbines}台)")
                continue

            assessment = assessor.assess_turbine(engine_code, data, mill_type, wind_turbine_name)
            if not assessment:
                logger.warning(f"{engine_code}评估结果为空")
                continue

            results.append(_format_result(assessment))
            processed_count += 1
            logger.info(f"成功处理{engine_code}(已处理{processed_count}/{all_turbines}台)")

        except Exception as e:
            # Per-turbine boundary: log with traceback and move on to the next one.
            logger.exception(f"处理{engine_code}时发生异常: {str(e)}")

    if not results:
        logger.warning(f"风场{request.windcode}在{request.month}无有效评估结果,返回空列表")

    return results
def _format_result(assessment):
    """Convert a raw assessment mapping into an ``AssessmentResult`` model.

    ``engine_code``, ``wind_turbine_name``, ``mill_type`` and ``subsystems``
    are required keys; ``total_health_score`` defaults to None and
    ``assessed_subsystems`` to an empty list when absent. Each entry of
    ``subsystems`` is expanded into a ``SubsystemResult``.
    """
    # Fields are read in the same order as the model's keyword arguments so a
    # missing key surfaces the same error regardless of the rest of the mapping.
    code = assessment['engine_code']
    turbine_name = assessment['wind_turbine_name']
    turbine_kind = assessment['mill_type']
    overall_score = assessment.get('total_health_score')

    per_subsystem = {}
    for label, payload in assessment['subsystems'].items():
        per_subsystem[label] = SubsystemResult(**payload)

    covered = assessment.get('assessed_subsystems', [])

    return AssessmentResult(
        engine_code=code,
        wind_turbine_name=turbine_name,
        mill_type=turbine_kind,
        total_health_score=overall_score,
        subsystems=per_subsystem,
        assessed_subsystems=covered,
    )
|