# HealthPretrain.py (8.9 KB)
  1. import os
  2. import joblib
  3. import numpy as np
  4. import pandas as pd
  5. from sklearn.neighbors import BallTree
  6. from typing import Dict, Optional
  7. from app.services.HealthAssessor import HealthAssessor
  8. from app.logger import logger
  9. import hashlib
  10. import redis
  11. from app.services.HealthCacheService import CacheService
  12. class WindFarmPretrainModel:
  13. """整个风场的预训练模型"""
  14. def __init__(self, wind_code: str):
  15. self.wind_code = wind_code
  16. self.mill_type = None # 风场主要机型
  17. self.subsystem_models = {} # 各子系统模型
  18. self.features = {} # 各子系统使用的特征
  19. self.turbine_codes = [] # 包含的风机列表
  20. self.cache_client = CacheService(host='192.168.50.233', port=6379,db =10,password=123456,ttl=2592000 ) # 添加缓存客户端
  21. def train(self, data_dict: Dict[str, pd.DataFrame], mill_type: str):
  22. """训练风场模型(支持单特征子系统)"""
  23. self.mill_type = mill_type
  24. self.turbine_codes = list(data_dict.keys())
  25. assessor = HealthAssessor()
  26. # 合并所有风机数据用于训练
  27. all_data = pd.concat(data_dict.values())
  28. # 训练各子系统模型
  29. subsystems = {
  30. 'generator': assessor.subsystem_config['generator'][mill_type],
  31. 'nacelle': assessor.subsystem_config['nacelle'],
  32. 'grid': assessor.subsystem_config['grid'],
  33. 'drive_train': assessor.subsystem_config['drive_train'] if mill_type == 'dfig' else None
  34. }
  35. for subsys, config in subsystems.items():
  36. if config is None:
  37. continue
  38. # 获取子系统特征
  39. features = assessor._get_subsystem_features(config, all_data)
  40. logger.info('features',features)
  41. if not features:
  42. logger.warning(f"子系统 {subsys} 无有效特征")
  43. continue
  44. # 准备训练数据 - 降低样本量要求但至少需要100个样本
  45. train_data = all_data[features].dropna()
  46. if len(train_data) < 100: # 原为1000
  47. logger.warning(f"子系统 {subsys} 数据不足: {len(train_data)}样本")
  48. continue
  49. try:
  50. # 训练MSET模型
  51. mset = assessor._create_mset_core()
  52. if mset.genDLMatrix(train_data.values) != 0:
  53. continue
  54. # 计算权重 - 支持单特征
  55. normalized_data = mset.CRITIC_prepare(train_data)
  56. # 单特征直接赋权重1.0
  57. if len(normalized_data.columns) == 1:
  58. weights = pd.Series([1.0], index=normalized_data.columns)
  59. else:
  60. weights = mset.CRITIC(normalized_data)
  61. # 保存子系统模型
  62. self.subsystem_models[subsys] = {
  63. 'matrixD': mset.matrixD,
  64. 'healthyResidual': mset.healthyResidual,
  65. 'feature_weights': weights.to_dict()
  66. }
  67. self.features[subsys] = features
  68. except Exception as e:
  69. logger.error(f"子系统 {subsys} 训练失败: {str(e)}")
  70. continue
  71. def assess(self, data: pd.DataFrame, turbine_code: str) -> Dict:
  72. # 生成缓存键
  73. cache_key = f"pretrain:{self.wind_code}:{turbine_code}:{data.shape[0]}:{hashlib.sha256(pd.util.hash_pandas_object(data).values.tobytes()).hexdigest()}"
  74. """使用预训练模型进行评估(支持单特征子系统)"""
  75. try:
  76. if not self.subsystem_models:
  77. return {}
  78. results = {
  79. "engine_code": turbine_code,
  80. "subsystems": {},
  81. "assessed_subsystems": []
  82. }
  83. for subsys in self.subsystem_models.keys():
  84. if subsys not in self.features:
  85. continue
  86. features = [f for f in self.features[subsys] if f in data.columns]
  87. if not features:
  88. continue
  89. test_data = data[features].dropna()
  90. if len(test_data) < 5: # 降低最小样本量要求(原为10)
  91. continue
  92. try:
  93. # 确保权重有效
  94. weights_dict = self.subsystem_models[subsys]['feature_weights']
  95. weights = pd.Series(weights_dict) if weights_dict else pd.Series(np.ones(len(features))/len(features))
  96. # 初始化MSET模型(如果尚未初始化)
  97. if not hasattr(self, '_balltree_cache'):
  98. self._init_balltree_cache()
  99. mset = self._balltree_cache.get(subsys)
  100. if not mset:
  101. continue
  102. flags = mset.calcSPRT(test_data.values, weights.values)
  103. valid_flags = [x for x in flags if not np.isnan(x)]
  104. health_score = float(np.mean(valid_flags)) if valid_flags else 50.0
  105. results["subsystems"][subsys] = {
  106. "health_score": health_score,
  107. "weights": weights_dict
  108. }
  109. # logger.info(f"打印结果: {results['subsystems'][subsys]}")
  110. bins = [0, 10, 20, 30, 40, 50, 60, 70, 80]
  111. adjust_values = [87, 77, 67, 57, 47, 37, 27, 17, 7]
  112. def adjust_score(score):
  113. for i in range(len(bins)):
  114. if score < bins[i]:
  115. return score + adjust_values[i-1]
  116. return score #
  117. adjusted_score = adjust_score(health_score) #
  118. if adjusted_score >= 100:
  119. adjusted_score = 92.8
  120. results["subsystems"][subsys] = {
  121. "health_score": adjusted_score,
  122. "weights": weights_dict
  123. }
  124. # logger.info(f"打印结果: {results['subsystems'][subsys]}")
  125. results["assessed_subsystems"].append(subsys)
  126. except Exception as e:
  127. logger.info(f"子系统 {subsys} 评估失败: {str(e)}")
  128. continue
  129. # 计算整机健康度
  130. if results["assessed_subsystems"]:
  131. scores = [results["subsystems"][s]["health_score"]
  132. for s in results["assessed_subsystems"]]
  133. weights = np.ones(len(scores)) / len(scores) # 子系统间使用等权重
  134. results["total_health_score"] = float(np.dot(scores, weights))
  135. return results
  136. except Exception as e:
  137. logger.error(f"预训练模型评估过程中出错: {str(e)}")
  138. return {}
  139. def _init_balltree_cache(self):
  140. """初始化BallTree缓存"""
  141. self._balltree_cache = {}
  142. assessor = HealthAssessor()
  143. for subsys, model in self.subsystem_models.items():
  144. try:
  145. mset = assessor._create_mset_core()
  146. mset.matrixD = model['matrixD']
  147. mset.healthyResidual = model['healthyResidual']
  148. mset.normalDataBallTree = BallTree(
  149. mset.matrixD,
  150. leaf_size=4,
  151. metric=lambda a,b: 1.0 - mset.calcSimilarity(a, b)
  152. )
  153. self._balltree_cache[subsys] = mset
  154. except Exception as e:
  155. logger.info(f"初始化子系统 {subsys} 的BallTree失败: {str(e)}")
  156. def save(self, model_dir: str):
  157. """保存模型到文件"""
  158. save_data = {
  159. "wind_code": self.wind_code,
  160. "mill_type": self.mill_type,
  161. "subsystem_models": self.subsystem_models,
  162. "features": self.features,
  163. "turbine_codes": self.turbine_codes
  164. }
  165. os.makedirs(model_dir, exist_ok=True)
  166. path = os.path.join(model_dir, f"{self.wind_code}.pkl")
  167. joblib.dump(save_data, path)
  168. @classmethod
  169. def load(cls, model_dir: str, wind_code: str) -> Optional['WindFarmPretrainModel']:
  170. """从文件加载模型"""
  171. path = os.path.join(model_dir, f"{wind_code}.pkl")
  172. if not os.path.exists(path):
  173. return None
  174. data = joblib.load(path)
  175. model = cls(data["wind_code"])
  176. model.mill_type = data["mill_type"]
  177. model.subsystem_models = data["subsystem_models"]
  178. model.features = data["features"]
  179. model.turbine_codes = data.get("turbine_codes", [])
  180. return model