# health.py - FastAPI route for wind-farm turbine health assessment.
  1. import math
  2. from datetime import datetime
  3. from typing import List
  4. from fastapi import HTTPException, APIRouter
  5. from app.logger import logger
  6. from app.models.HealthEvaluationReqAndResp import AssessmentResult, AssessmentRequest, SubsystemResult
  7. from app.services.HealthAssessor import HealthAssessor
  8. from app.services.HealthDataFetcher import DataFetcher
  router = APIRouter()


  def _empty_assessment(engine_code, wind_turbine_name, mill_type):
      """Return a placeholder assessment for a turbine that has no data.

      The total score and every subsystem score are -1, with empty weights
      and no message, so the turbine still appears in the response.
      """
      subsystem_names = ['generator', 'nacelle', 'grid', 'drive_train']
      return {
          'engine_code': engine_code,
          'wind_turbine_name': wind_turbine_name,
          'mill_type': mill_type,
          'total_health_score': -1,
          'subsystems': {
              name: {'health_score': -1, 'weights': {}, 'message': None}
              for name in subsystem_names
          },
          'assessed_subsystems': list(subsystem_names),
      }


  @router.post("/health_assess", response_model=List[AssessmentResult])
  async def assess_windfarm(request: AssessmentRequest):
      """Assess every turbine of a wind farm for one month.

      Args:
          request: Carries ``windcode`` (wind-farm code) and ``month``; the
              month must parse with the ``%Y-%m`` format.

      Returns:
          A list with one result dict per processed turbine, passed through
          ``clean_nans`` so non-finite floats become -1.0. Turbines without
          data for the month get a placeholder result with all scores -1.
          Turbines with an unknown mill type, an empty assessment, or an
          error during processing are skipped. An empty list is returned
          when no turbine produced a result.

      Raises:
          HTTPException: 400 if ``month`` is not in ``%Y-%m`` format,
              404 if no turbines are found for the wind farm.
      """
      # NOTE(review): the fetcher/assessor calls below look synchronous; if
      # they block, they stall the event loop of this async endpoint - confirm.
      try:
          datetime.strptime(request.month, "%Y-%m")
      except ValueError:
          raise HTTPException(status_code=400, detail="无效时间格式,使用YYYY-MM格式")

      logger.info(f"开始处理风场评估请求 - 风场编码: {request.windcode}, 月份: {request.month}")

      fetcher = DataFetcher()
      assessor = HealthAssessor()
      turbines = fetcher.get_turbines(request.windcode)
      if turbines.empty:
          raise HTTPException(status_code=404, detail="未找到风场数据")

      results: List[AssessmentResult] = []
      all_turbines = len(turbines)
      processed_count = 0
      mill_type_names = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}
      # The column lookup depends only on the wind farm, so it is fetched on
      # first need and reused instead of being repeated for every turbine.
      available_columns = None

      for idx, row in turbines.iterrows():
          # Bound before the try block so the except handler can always name
          # the failing row, even when reading 'engine_code' itself fails.
          engine_code = f"index={idx}"
          try:
              engine_code = row['engine_code']
              mill_type_code = row['mill_type_code']
              wind_turbine_name = row['engine_name']

              # Resolve the numeric mill type to its name; skip unknown types.
              mill_type_num = fetcher.get_mill_type(mill_type_code)
              mill_type = mill_type_names.get(mill_type_num, 'unknown')
              if mill_type == 'unknown':
                  logger.warning(f"{engine_code}机型未知,跳过处理")
                  continue

              if available_columns is None:
                  available_columns = fetcher.get_turbine_columns(request.windcode)
              if not available_columns:
                  continue

              # Restrict the feature list to columns that actually exist.
              # NOTE(review): ``assessor`` is passed explicitly in addition to
              # being the receiver - confirm the method's signature expects it.
              valid_features = assessor._get_all_possible_features(
                  assessor, mill_type, available_columns
              )

              data = fetcher.fetch_turbine_data(
                  request.windcode, engine_code, request.month, valid_features
              )
              if data is None or data.empty:
                  logger.warning(f"{engine_code}在{request.month}无有效数据")
                  # Report the turbine anyway, with every score set to -1.
                  placeholder = _empty_assessment(engine_code, wind_turbine_name, mill_type)
                  results.append(_format_result(placeholder))
                  processed_count += 1
                  logger.info(f"处理无数据风机{engine_code}(已处理{processed_count}/{all_turbines}台)")
                  continue

              assessment = assessor.assess_turbine(engine_code, data, mill_type, wind_turbine_name)
              if not assessment:
                  logger.warning(f"{engine_code}评估结果为空")
                  continue

              formatted_result = _format_result(assessment)
              if isinstance(formatted_result, AssessmentResult):
                  results.append(formatted_result)
                  processed_count += 1
                  logger.info(f"成功处理{engine_code}(已处理{processed_count}/{all_turbines}台)")
              else:
                  logger.error(f"{engine_code}结果格式化失败,类型错误: {type(formatted_result)}")
          except Exception as e:
              # Best effort: one failing turbine must not abort the whole farm.
              logger.error(f"处理{engine_code}时发生异常: {str(e)}", exc_info=True)

      if not results:
          logger.warning(f"风场{request.windcode}在{request.month}无有效评估结果,返回空列表")
          return []

      # Replace NaN/inf scores with -1.0 before the response is serialized.
      return [clean_nans(result.dict()) for result in results]
  def _format_result(assessment):
      """Convert a raw assessment mapping into an ``AssessmentResult``.

      ``engine_code``, ``wind_turbine_name``, ``mill_type`` and ``subsystems``
      are required keys. ``total_health_score`` defaults to ``None`` and
      ``assessed_subsystems`` to an empty list when absent. Each value under
      ``subsystems`` is unpacked as keyword arguments of ``SubsystemResult``.
      """
      engine_code = assessment['engine_code']
      turbine_name = assessment['wind_turbine_name']
      mill_type = assessment['mill_type']
      total_score = assessment.get('total_health_score')

      # One SubsystemResult per subsystem, keyed by the subsystem name.
      subsystem_models = {}
      for subsystem_name, payload in assessment['subsystems'].items():
          subsystem_models[subsystem_name] = SubsystemResult(**payload)

      assessed = assessment.get('assessed_subsystems', [])

      return AssessmentResult(
          engine_code=engine_code,
          wind_turbine_name=turbine_name,
          mill_type=mill_type,
          total_health_score=total_score,
          subsystems=subsystem_models,
          assessed_subsystems=assessed,
      )
  def clean_nans(obj):
      """Recursively replace non-finite floats in nested dicts and lists.

      NaN and positive/negative infinity become -1.0, the marker for an
      invalid value. Dicts and lists are rebuilt with cleaned contents;
      any other object is returned unchanged.
      """
      if isinstance(obj, float):
          # isfinite() is False for NaN as well as for +/-inf.
          return obj if math.isfinite(obj) else -1.0
      if isinstance(obj, dict):
          return {key: clean_nans(value) for key, value in obj.items()}
      if isinstance(obj, list):
          return [clean_nans(element) for element in obj]
      return obj
  133. return obj