HealthAssessor.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. import numpy as np
  2. import pandas as pd
  3. from sklearn.neighbors import BallTree
  4. from typing import Dict, List
  5. from functools import lru_cache
  6. import hashlib
  7. import json
  8. import redis
  9. class HealthAssessor:
  10. def __init__(self):
  11. self.subsystem_config = {
  12. # 发电机
  13. 'generator': {
  14. # 双馈
  15. 'dfig': {
  16. 'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature',
  17. 'generator_winding3_temperature', 'generatordrive_end_bearing_temperature',
  18. 'generatornon_drive_end_bearing_temperature'],
  19. },
  20. # 直驱
  21. 'direct': {
  22. 'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature',
  23. 'generator_winding3_temperature', 'main_bearing_temperature'],
  24. }
  25. },
  26. # 机舱系统
  27. 'nacelle': {
  28. 'fixed': ['front_back_vibration_of_the_cabin', 'side_to_side_vibration_of_the_cabin',
  29. 'cabin_position', 'cabin_temperature'],
  30. },
  31. # 电网环境
  32. 'grid': {
  33. 'fixed': ['reactive_power', 'active_power', 'grid_a_phase_current',
  34. 'grid_b_phase_current', 'grid_c_phase_current'],
  35. },
  36. # 传动系统
  37. 'drive_train': {
  38. 'fixed': ['main_bearing_temperature'],
  39. 'keywords': [
  40. {'include': ['gearbox', 'temperature'], 'exclude': [], 'min_count': 2},
  41. ]
  42. }
  43. }
  44. # 嵌入源代码的MSET实现
  45. self.mset = self._create_mset_core()
  46. def _create_mset_core(self):
  47. """创建MSET核心计算模块"""
  48. class MSETCore:
  49. def __init__(self):
  50. self.matrixD = None
  51. self.normalDataBallTree = None
  52. self.healthyResidual = None
  53. def calcSimilarity(self, x, y):
  54. """优化后的相似度计算"""
  55. diff = np.array(x) - np.array(y)
  56. return 1 / (1 + np.sqrt(np.sum(diff ** 2)))
  57. def genDLMatrix(self, trainDataset, dataSize4D=15, dataSize4L=5):
  58. """优化矩阵生成过程"""
  59. m, n = trainDataset.shape
  60. # 快速选择极值点
  61. min_indices = np.argmin(trainDataset, axis=0)
  62. max_indices = np.argmax(trainDataset, axis=0)
  63. unique_indices = np.unique(np.concatenate([min_indices, max_indices]))
  64. self.matrixD = trainDataset[unique_indices].copy()
  65. # 快速填充剩余点
  66. remaining_indices = np.setdiff1d(np.arange(m), unique_indices)
  67. np.random.shuffle(remaining_indices)
  68. needed = max(0, dataSize4D - len(unique_indices))
  69. if needed > 0:
  70. self.matrixD = np.vstack([self.matrixD, trainDataset[remaining_indices[:needed]]])
  71. # 使用与源代码一致的BallTree参数
  72. self.normalDataBallTree = BallTree(
  73. self.matrixD,
  74. leaf_size=40,
  75. metric=lambda i, j: 1 - self.calcSimilarity(i, j) # 自定义相似度
  76. )
  77. # 使用所有数据计算残差
  78. self.healthyResidual = self.calcResidualByLocallyWeightedLR(trainDataset)
  79. return 0
  80. def calcResidualByLocallyWeightedLR(self, newStates):
  81. """优化残差计算"""
  82. if len(newStates.shape) == 1:
  83. newStates = newStates.reshape(-1, 1)
  84. dist, iList = self.normalDataBallTree.query(
  85. newStates,
  86. k=min(10, len(self.matrixD)),
  87. return_distance=True
  88. )
  89. weights = 1 / (dist + 1e-5)
  90. weights /= weights.sum(axis=1)[:, np.newaxis]
  91. est_X = np.sum(weights[:, :, np.newaxis] * self.matrixD[iList[0]], axis=1)
  92. return est_X - newStates
  93. def calcSPRT(self, newsStates, feature_weight, alpha=0.1, beta=0.1, decisionGroup=1):
  94. """优化SPRT计算"""
  95. stateResidual = self.calcResidualByLocallyWeightedLR(newsStates)
  96. weightedStateResidual = np.dot(stateResidual, feature_weight)
  97. weightedHealthyResidual = np.dot(self.healthyResidual, feature_weight)
  98. mu0 = np.mean(weightedHealthyResidual)
  99. sigma0 = np.std(weightedHealthyResidual)
  100. # 向量化计算
  101. n = len(newsStates)
  102. if n < decisionGroup:
  103. return [50] # 中性值
  104. rolling_mean = np.convolve(weightedStateResidual, np.ones(decisionGroup) / decisionGroup, 'valid')
  105. si = (rolling_mean - mu0) * (rolling_mean + mu0 - 2 * mu0) / (2 * sigma0 ** 2)
  106. lowThres = np.log(beta / (1 - alpha))
  107. highThres = np.log((1 - beta) / alpha)
  108. si = np.clip(si, lowThres, highThres)
  109. si = np.where(si > 0, si / highThres, si / lowThres)
  110. flag = 100 - si * 100
  111. # 填充不足的部分
  112. if len(flag) < n:
  113. flag = np.pad(flag, (0, n - len(flag)), mode='edge')
  114. return flag.tolist()
  115. def CRITIC_prepare(self, data, flag=1):
  116. """标准化处理"""
  117. data = data.astype(float)
  118. numeric_cols = data.select_dtypes(include=[np.number]).columns
  119. negative_cols = [col for col in numeric_cols
  120. if any(kw in col for kw in ['temperature'])]
  121. positive_cols = list(set(numeric_cols) - set(negative_cols))
  122. # 负向标准化
  123. if negative_cols:
  124. max_val = data[negative_cols].max()
  125. min_val = data[negative_cols].min()
  126. data[negative_cols] = (max_val - data[negative_cols]) / (max_val - min_val).replace(0, 1e-5)
  127. # 正向标准化
  128. if positive_cols:
  129. max_val = data[positive_cols].max()
  130. min_val = data[positive_cols].min()
  131. data[positive_cols] = (data[positive_cols] - min_val) / (max_val - min_val).replace(0, 1e-5)
  132. return data
  133. def CRITIC(self, data):
  134. """CRITIC权重计算(支持单特征)"""
  135. try:
  136. # 处理单特征情况
  137. if len(data.columns) == 1:
  138. return pd.Series([1.0], index=data.columns)
  139. data_norm = self.CRITIC_prepare(data.copy())
  140. std = data_norm.std(ddof=0).clip(lower=0.01)
  141. # 计算相关系数矩阵(添加异常处理)
  142. try:
  143. corr = np.abs(np.corrcoef(data_norm.T))
  144. np.fill_diagonal(corr, 0)
  145. conflict = np.sum(1 - corr, axis=1)
  146. except:
  147. # 如果计算相关系数失败,使用等权重
  148. return pd.Series(np.ones(len(data.columns))/len(data.columns))
  149. info = std * conflict
  150. weights = info / info.sum()
  151. return pd.Series(weights, index=data.columns)
  152. except Exception as e:
  153. print(f"CRITIC计算失败: {str(e)}")
  154. return pd.Series(np.ones(len(data.columns))/len(data.columns))
  155. def ahp(self, matrix):
  156. """AHP权重计算"""
  157. eigenvalue, eigenvector = np.linalg.eig(matrix)
  158. max_idx = np.argmax(eigenvalue)
  159. weight = eigenvector[:, max_idx].real
  160. return weight / weight.sum(), eigenvalue[max_idx].real
  161. return MSETCore()
  162. def assess_turbine(self, engine_code, data, mill_type, wind_turbine_name):
  163. # 生成缓存键
  164. cache_key = f"assessment:{engine_code}:{data.shape[0]}:{hashlib.sha256(pd.util.hash_pandas_object(data).values.tobytes()).hexdigest()}"
  165. try:
  166. # 尝试从缓存获取
  167. cached_result = self.cache_client.get(cache_key)
  168. if cached_result:
  169. return json.loads(cached_result)
  170. """评估单个风机"""
  171. results = {
  172. "engine_code": engine_code,
  173. "wind_turbine_name": wind_turbine_name,
  174. "mill_type": mill_type,
  175. "total_health_score": None,
  176. "subsystems": {},
  177. "assessed_subsystems": []
  178. }
  179. # 各子系统评估
  180. subsystems_to_assess = [
  181. ('generator', self.subsystem_config['generator'][mill_type], 1),
  182. ('nacelle', self.subsystem_config['nacelle'], 1),
  183. ('grid', self.subsystem_config['grid'], 1),
  184. ('drive_train', self.subsystem_config['drive_train'] if mill_type == 'dfig' else None, 1)
  185. ]
  186. for subsystem, config, min_features in subsystems_to_assess:
  187. if config is None:
  188. continue
  189. features = self._get_subsystem_features(config, data)
  190. # 功能1:无论特征数量是否足够都输出结果
  191. if len(features) >= min_features:
  192. assessment = self._assess_subsystem(data[features])
  193. else:
  194. assessment = {
  195. 'health_score': -1, # 特征不足时输出'-'
  196. 'weights': {},
  197. 'message': f'Insufficient features (required {min_features}, got {len(features)})'
  198. }
  199. print('结果打印',assessment)
  200. # 功能3:删除features内容
  201. if 'features' in assessment:
  202. del assessment['features']
  203. # 最终清理:确保没有NaN值
  204. for sys, result in results["subsystems"].items():
  205. if isinstance(result['health_score'], float) and np.isnan(result['health_score']):
  206. result['health_score'] = -1
  207. result['message'] = (result.get('message') or '') + '; NaN detected'
  208. if isinstance(results["total_health_score"], float) and np.isnan(results["total_health_score"]):
  209. results["total_health_score"] = -1
  210. results["subsystems"][subsystem] = assessment
  211. # 计算整机健康度(使用新字段名)
  212. if results["subsystems"]:
  213. # 只计算健康值为数字的子系统
  214. valid_subsystems = [
  215. k for k, v in results["subsystems"].items()
  216. if isinstance(v['health_score'], (int, float)) and v['health_score'] >= 0
  217. ]
  218. if valid_subsystems:
  219. weights = self._get_subsystem_weights(valid_subsystems)
  220. health_scores = [results["subsystems"][sys]['health_score'] for sys in valid_subsystems]
  221. results["total_health_score"] = float(np.dot(health_scores, weights))
  222. results["assessed_subsystems"] = valid_subsystems
  223. return results
  224. except Exception as e:
  225. print('评估出错')
  226. def _get_all_possible_features(self, mill_type, available_columns):
  227. """
  228. 获取所有可能的特征列(基于实际存在的列)
  229. 参数:
  230. mill_type: 机型类型
  231. available_columns: 数据库实际存在的列名列表
  232. """
  233. features = []
  234. available_columns_lower = [col.lower() for col in available_columns] # 不区分大小写匹配
  235. # for subsys_name, subsys_config in assessor.subsystem_config.items():
  236. for subsys_name, subsys_config in self.subsystem_config.items():
  237. # 处理子系统配置
  238. if subsys_name == 'generator':
  239. config = subsys_config.get(mill_type, {})
  240. elif subsys_name == 'drive_train' and mill_type != 'dfig':
  241. continue
  242. else:
  243. config = subsys_config
  244. # 处理固定特征
  245. if 'fixed' in config:
  246. for f in config['fixed']:
  247. if f in available_columns:
  248. features.append(f)
  249. # 处理关键词特征
  250. if 'keywords' in config:
  251. for rule in config['keywords']:
  252. matched = []
  253. include_kws = [kw.lower() for kw in rule['include']]
  254. exclude_kws = [ex.lower() for ex in rule.get('exclude', [])]
  255. for col in available_columns:
  256. col_lower = col.lower()
  257. # 检查包含关键词
  258. include_ok = all(kw in col_lower for kw in include_kws)
  259. # 检查排除关键词
  260. exclude_ok = not any(ex in col_lower for ex in exclude_kws)
  261. if include_ok and exclude_ok:
  262. matched.append(col)
  263. if len(matched) >= rule.get('min_count', 1):
  264. features.extend(matched)
  265. return list(set(features)) # 去重
  266. def _get_subsystem_features(self, config: Dict, data: pd.DataFrame) -> List[str]:
  267. """最终版特征获取方法"""
  268. available_features = []
  269. # 固定特征检查(要求至少10%非空)
  270. if 'fixed' in config:
  271. for f in config['fixed']:
  272. if f in data.columns and data[f].notna().mean() > 0.1:
  273. available_features.append(f)
  274. print(f"匹配到的固定特征: {available_features}")
  275. # 关键词特征检查
  276. if 'keywords' in config:
  277. for rule in config['keywords']:
  278. matched = [
  279. col for col in data.columns
  280. if all(kw.lower() in col.lower() for kw in rule['include'])
  281. and not any(ex.lower() in col.lower() for ex in rule.get('exclude', []))
  282. and data[col].notna().mean() > 0.1 # 数据有效性检查
  283. ]
  284. if len(matched) >= rule.get('min_count', 1):
  285. available_features.extend(matched)
  286. print(f"匹配到的关键词特征: {available_features}")
  287. return list(set(available_features))
  288. def _assess_subsystem(self, data: pd.DataFrame) -> Dict:
  289. """评估子系统(支持单特征)"""
  290. # 数据清洗
  291. clean_data = data.dropna()
  292. if len(clean_data) < 10: # 降低最小样本量要求(原为20)
  293. return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': 'Insufficient data'}
  294. try:
  295. # 标准化
  296. normalized_data = self.mset.CRITIC_prepare(clean_data)
  297. # 计算权重 - 处理单特征情况
  298. if len(normalized_data.columns) == 1:
  299. weights = pd.Series([1.0], index=normalized_data.columns)
  300. else:
  301. weights = self.mset.CRITIC(normalized_data)
  302. # MSET评估
  303. health_score = self._run_mset_assessment(normalized_data.values, weights.values)
  304. bins = [0, 10, 20, 30, 40, 50, 60, 70, 80]
  305. adjust_values = [87, 77, 67, 57, 47, 37, 27, 17, 7]
  306. def adjust_score(score):
  307. for i in range(len(bins)):
  308. if score < bins[i]:
  309. return score + adjust_values[i-1]
  310. return score #
  311. adjusted_score = adjust_score(health_score) #
  312. if adjusted_score >= 100:
  313. adjusted_score = 92.8
  314. return {
  315. 'health_score': float(health_score),
  316. 'weights': weights.to_dict(),
  317. 'features': list(data.columns)
  318. }
  319. except Exception as e:
  320. return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': str(e)}
  321. @lru_cache(maxsize=10)
  322. def _get_mset_model(self, train_data: tuple):
  323. """缓存MSET模型"""
  324. # 注意:由于lru_cache需要可哈希参数,这里使用元组
  325. arr = np.array(train_data)
  326. model = self._create_mset_core()
  327. model.genDLMatrix(arr)
  328. return model
  329. def _run_mset_assessment(self, data: np.ndarray, weights: np.ndarray) -> float:
  330. """执行MSET评估"""
  331. # 检查权重有效性
  332. if np.isnan(weights).any() or np.isinf(weights).any():
  333. weights = np.ones_like(weights) / len(weights) # 重置为等权重
  334. # 分割训练集和测试集
  335. split_idx = len(data) // 2
  336. train_data = data[:split_idx]
  337. test_data = data[split_idx:]
  338. # 使用缓存模型
  339. try:
  340. model = self._get_mset_model(tuple(map(tuple, train_data)))
  341. flags = model.calcSPRT(test_data, weights)
  342. # 过滤NaN值并计算均值
  343. valid_flags = [x for x in flags if not np.isnan(x)]
  344. if not valid_flags:
  345. return 50.0 # 默认中性值
  346. return float(np.mean(valid_flags))
  347. except Exception as e:
  348. print(f"MSET评估失败: {str(e)}")
  349. return 50.0 # 默认中性值
  350. def _get_subsystem_weights(self, subsystems: List[str]) -> np.ndarray:
  351. """生成等权重的子系统权重向量"""
  352. n = len(subsystems)
  353. if n == 0:
  354. return np.array([])
  355. # 直接返回等权重向量
  356. return np.ones(n) / n