# api_health.py


  1. from fastapi import FastAPI, HTTPException
  2. from pydantic import BaseModel
  3. from datetime import datetime
  4. from typing import List, Dict, Optional
  5. import logging
  6. import os
  7. import glob
  8. import joblib
  9. import math
  10. import numpy as np
  11. import pandas as pd
# Application object that the startup hook and the /health_assess route below
# are registered on. root_path="/api/health" is passed straight to FastAPI;
# NOTE(review): presumably the prefix added by a reverse proxy — confirm
# against the deployment configuration.
app = FastAPI(root_path="/api/health")
  13. import time
  14. from health_pretrain import WindFarmPretrainModel
  15. from database import DataFetcher
  16. from health_evalution_class import HealthAssessor
  17. import pandas as pd
  18. import concurrent.futures
# Logging configuration: root logger at INFO, plus a module-level logger used
# by every function in this file for progress and timing messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Request model definitions
class AssessmentRequest(BaseModel):
    # Request body of POST /health_assess.
    # windcode: wind farm code; used to look up turbines and the pretrained model.
    windcode: str
    # month: month to assess, format "YYYY-MM".
    month: str  # format: "YYYY-MM"
class SubsystemResult(BaseModel):
    # Health result for one subsystem of a turbine.
    # health_score: subsystem score; -1 is the placeholder this module writes
    # when a subsystem was not (or could not be) assessed.
    health_score: float
    # weights: mapping of name -> weight as returned by the assessor; empty
    # for placeholder entries.
    weights: Dict[str, float]
    # message: optional free-text note attached to the subsystem result.
    message: Optional[str] = None
class AssessmentResult(BaseModel):
    # Health assessment of a single turbine, as returned by /health_assess.
    engine_code: str
    wind_turbine_name: str
    # mill_type: machine type string; get_mill_type() in this module produces
    # 'dfig', 'direct', 'semi_direct' or 'unknown'.
    mill_type: str
    # total_health_score: overall score; -1 is used by this module as the
    # "no result" placeholder.
    total_health_score: Optional[float]
    # subsystems: per-subsystem results keyed by subsystem name.
    subsystems: Dict[str, SubsystemResult]
    assessed_subsystems: List[str]
    # model_type: "pretrained" when produced by the pretrained-model path,
    # otherwise left as None.
    model_type: Optional[str] = None  # newly added field; does not affect the existing structure
# Global state
# PRETRAIN_MODELS is filled by load_resources() at startup and is keyed by the
# wind farm code (the sub-directory name under the model root).
PRETRAIN_MODELS: Dict[str, WindFarmPretrainModel] = {}  # changed to store wind-farm-level models
# Single shared assessor instance used by the real-time assessment path.
REALTIME_ASSESSOR = HealthAssessor()
  41. @app.on_event("startup")
  42. def load_resources():
  43. """加载预训练模型和其他资源"""
  44. # 加载预训练模型
  45. model_root = "health_models"
  46. if os.path.exists(model_root):
  47. for wind_code in os.listdir(model_root):
  48. wind_dir = os.path.join(model_root, wind_code)
  49. if os.path.isdir(wind_dir):
  50. try:
  51. model = WindFarmPretrainModel.load(wind_dir, wind_code)
  52. if model:
  53. PRETRAIN_MODELS[wind_code] = model
  54. except Exception as e:
  55. logger.error(f"加载风场模型失败 {wind_code}: {str(e)}")
  56. logger.info(f"预训练模型加载完成,共加载 {len(PRETRAIN_MODELS)} 个风场模型")
  57. def _format_result(assessment: Dict, model_type: str = None) -> AssessmentResult:
  58. """格式化评估结果(完全兼容原有逻辑)"""
  59. # 确保所有子系统都存在,即使评分为-1
  60. subsystems = assessment.get('subsystems', {})
  61. for subsys in ['generator', 'nacelle', 'grid', 'drive_train']:
  62. if subsys not in subsystems:
  63. subsystems[subsys] = {
  64. 'health_score': -1,
  65. 'weights': {},
  66. 'message': None
  67. }
  68. return AssessmentResult(
  69. engine_code=assessment['engine_code'],
  70. wind_turbine_name=assessment['wind_turbine_name'],
  71. mill_type=assessment['mill_type'],
  72. total_health_score=assessment.get('total_health_score', -1),
  73. subsystems={
  74. k: SubsystemResult(
  75. health_score=v['health_score'],
  76. weights=v.get('weights', {}),
  77. message=v.get('message')
  78. ) for k, v in subsystems.items()
  79. },
  80. assessed_subsystems=assessment.get('assessed_subsystems', []),
  81. model_type=model_type
  82. )
  83. def clean_nans(obj):
  84. """递归清理NaN值(保留原有逻辑)"""
  85. if isinstance(obj, dict):
  86. return {k: clean_nans(v) for k, v in obj.items()}
  87. elif isinstance(obj, list):
  88. return [clean_nans(item) for item in obj]
  89. elif isinstance(obj, float) and (math.isnan(obj) or not math.isfinite(obj)):
  90. return -1.0
  91. else:
  92. return obj
  93. def create_empty_assessment(engine_code: str, wind_turbine_name: str, mill_type: str) -> Dict:
  94. """创建空评估结果(完全保留原有逻辑)"""
  95. return {
  96. 'engine_code': engine_code,
  97. 'wind_turbine_name': wind_turbine_name,
  98. 'mill_type': mill_type,
  99. 'total_health_score': -1,
  100. 'subsystems': {
  101. 'generator': {'health_score': -1, 'weights': {}, 'message': None},
  102. 'nacelle': {'health_score': -1, 'weights': {}, 'message': None},
  103. 'grid': {'health_score': -1, 'weights': {}, 'message': None},
  104. 'drive_train': {'health_score': -1, 'weights': {}, 'message': None}
  105. },
  106. 'assessed_subsystems': ['generator', 'nacelle', 'grid', 'drive_train']
  107. }
  108. def assess_with_realtime(fetcher, windcode, engine_code, month, mill_type, wind_turbine_name):
  109. """实时评估逻辑(添加耗时统计)"""
  110. start_time = time.time()
  111. # 获取可用特征列
  112. col_start = time.time()
  113. available_columns = fetcher.get_turbine_columns(windcode)
  114. logger.info(f"[{engine_code}] 获取列名耗时: {time.time() - col_start:.2f}s")
  115. if not available_columns:
  116. return create_empty_assessment(engine_code, wind_turbine_name, mill_type)
  117. # 获取有效特征
  118. feat_start = time.time()
  119. valid_features = REALTIME_ASSESSOR._get_all_possible_features(
  120. REALTIME_ASSESSOR, mill_type, available_columns
  121. )
  122. logger.info(f"[{engine_code}] 特征筛选耗时: {time.time() - feat_start:.2f}s")
  123. # 获取数据
  124. data_start = time.time()
  125. data = fetcher.fetch_turbine_data(windcode, engine_code, month, valid_features)
  126. logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
  127. if data is None or data.empty:
  128. return create_empty_assessment(engine_code, wind_turbine_name, mill_type)
  129. # 执行评估
  130. assess_start = time.time()
  131. assessment = REALTIME_ASSESSOR.assess_turbine(
  132. engine_code, data, mill_type, wind_turbine_name
  133. )
  134. logger.info(f"[{engine_code}] 模型评估耗时: {time.time() - assess_start:.2f}s")
  135. logger.info(f"[{engine_code}] 实时评估总耗时: {time.time() - start_time:.2f}s")
  136. return clean_nans(assessment or create_empty_assessment(engine_code, wind_turbine_name, mill_type))
  137. def assess_with_pretrained(fetcher, windcode, engine_code, month, wind_turbine_name, mill_type):
  138. """使用预训练模型评估"""
  139. start_time = time.time()
  140. model = PRETRAIN_MODELS.get(windcode)
  141. if not model:
  142. logger.warning(f"未找到风场 {windcode} 的预训练模型")
  143. return None
  144. # 检查机型是否匹配
  145. if model.mill_type != mill_type:
  146. logger.warning(
  147. f"机型不匹配: 模型机型={model.mill_type}, 当前机型={mill_type}. "
  148. f"风机 {engine_code} 将使用实时计算"
  149. )
  150. return None
  151. feat_start = time.time()
  152. # 获取模型特征(确保不重复且存在)
  153. features = list({
  154. f for subsys_feats in model.features.values()
  155. for f in subsys_feats
  156. if f in fetcher.get_turbine_columns(windcode) # 确保特征在数据库中存在
  157. })
  158. logger.info(f"[{engine_code}] 特征准备耗时: {time.time() - feat_start:.2f}s")
  159. if not features:
  160. logger.warning(f"风场模型 {windcode} 无可用特征")
  161. return None
  162. # 获取数据
  163. data_start = time.time()
  164. data = fetcher.fetch_turbine_data(windcode, engine_code, month, features)
  165. logger.info(f"[{engine_code}] 数据查询耗时: {time.time() - data_start:.2f}s")
  166. if data is None or data.empty:
  167. logger.warning(f"风机 {engine_code} 无有效数据")
  168. return None
  169. # 执行评估
  170. assess_start = time.time()
  171. try:
  172. assessment = model.assess(data, engine_code)
  173. logger.info(f"[{engine_code}] 模型推理耗时: {time.time() - assess_start:.2f}s")
  174. if not assessment:
  175. logger.warning(f"风场模型评估失败 {engine_code}")
  176. return None
  177. assessment.update({
  178. 'engine_code': engine_code,
  179. 'wind_turbine_name': wind_turbine_name,
  180. 'mill_type': mill_type
  181. })
  182. logger.info(f"[{engine_code}] 预训练评估总耗时: {time.time() - start_time:.2f}s")
  183. return clean_nans(assessment)
  184. except Exception as e:
  185. logger.error(f"预训练评估异常 {engine_code}: {str(e)}", exc_info=True)
  186. return None
  187. @app.post("/health_assess", response_model=List[AssessmentResult])
  188. async def assess_windfarm_optimized(request: AssessmentRequest):
  189. """优化后的评估流程(添加耗时统计)"""
  190. total_start = time.time()
  191. logger.info(f"开始评估风场 {request.windcode} {request.month}")
  192. try:
  193. fetcher_start = time.time()
  194. fetcher = DataFetcher()
  195. logger.info(f"初始化DataFetcher耗时: {time.time() - fetcher_start:.2f}s")
  196. # 1. 获取风场所有风机信息
  197. turbine_start = time.time()
  198. turbines = fetcher.get_turbines(request.windcode)
  199. logger.info(f"获取风机列表耗时: {time.time() - turbine_start:.2f}s")
  200. if turbines.empty:
  201. return []
  202. # 2. 检查预训练模型
  203. model_check_start = time.time()
  204. model = PRETRAIN_MODELS.get(request.windcode)
  205. logger.info(f"检查预训练模型耗时: {time.time() - model_check_start:.2f}s")
  206. if not model:
  207. logger.info("未使用预训练模型,采用实时评估")
  208. return [assess_with_realtime(fetcher, request.windcode, row['engine_code'],
  209. request.month,
  210. get_mill_type(fetcher, row['mill_type_code']),
  211. row['engine_name'])
  212. for _, row in turbines.iterrows()]
  213. # 3. 批量获取所有风机数据
  214. data_fetch_start = time.time()
  215. all_features = list({f for feats in model.features.values() for f in feats})
  216. all_data = fetcher.fetch_all_turbines_data(
  217. request.windcode,
  218. request.month,
  219. all_features
  220. )
  221. logger.info(f"批量数据查询耗时: {time.time() - data_fetch_start:.2f}s")
  222. logger.info(f"获取到 {len(all_data)} 台风机数据")
  223. # 4. 并行评估
  224. assess_start = time.time()
  225. results = []
  226. with concurrent.futures.ThreadPoolExecutor() as executor:
  227. futures = []
  228. for _, row in turbines.iterrows():
  229. engine_code = row['engine_code']
  230. if engine_code not in all_data:
  231. empty_assessment = create_empty_assessment(
  232. engine_code,
  233. row['engine_name'],
  234. get_mill_type(fetcher, row['mill_type_code'])
  235. )
  236. futures.append(executor.submit(
  237. lambda: (_format_result(empty_assessment))
  238. ))
  239. continue
  240. futures.append(executor.submit(
  241. _assess_single_turbine,
  242. model, all_data[engine_code], row, fetcher
  243. ))
  244. for future in concurrent.futures.as_completed(futures):
  245. try:
  246. result = future.result()
  247. results.append(result)
  248. except Exception as e:
  249. logger.error(f"评估失败: {str(e)}")
  250. continue
  251. logger.info(f"并行评估总耗时: {time.time() - assess_start:.2f}s")
  252. logger.info(f"风场评估总耗时: {time.time() - total_start:.2f}s")
  253. return results
  254. except Exception as e:
  255. logger.error(f"评估流程异常: {str(e)}")
  256. raise HTTPException(status_code=500, detail="内部服务器错误")
  257. def _assess_single_turbine(model: WindFarmPretrainModel, data: pd.DataFrame,
  258. turbine_info: dict, fetcher: DataFetcher):
  259. """单个风机评估(添加耗时统计)"""
  260. start_time = time.time()
  261. try:
  262. # 获取机型
  263. mill_type_num = fetcher.get_mill_type(turbine_info['mill_type_code'])
  264. mill_type = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')
  265. # 评估
  266. assess_start = time.time()
  267. assessment = model.assess(data, turbine_info['engine_code'])
  268. logger.info(f"[{turbine_info['engine_code']}] 模型推理耗时: {time.time() - assess_start:.2f}s")
  269. assessment.update({
  270. 'engine_code': turbine_info['engine_code'],
  271. 'wind_turbine_name': turbine_info['engine_name'],
  272. 'mill_type': mill_type
  273. })
  274. logger.info(f"[{turbine_info['engine_code']}] 单机评估耗时: {time.time() - start_time:.2f}s")
  275. return _format_result(clean_nans(assessment), "pretrained")
  276. except Exception as e:
  277. logger.error(f"评估风机 {turbine_info['engine_code']} 失败: {str(e)}")
  278. empty_assessment = create_empty_assessment(
  279. turbine_info['engine_code'],
  280. turbine_info['engine_name'],
  281. mill_type
  282. )
  283. return _format_result(empty_assessment)
  284. def get_mill_type(fetcher: DataFetcher, mill_type_code: str) -> str:
  285. """获取机型字符串"""
  286. mill_type_num = fetcher.get_mill_type(mill_type_code)
  287. return {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')