import numpy as np import pandas as pd from sklearn.neighbors import BallTree from typing import Dict, List from functools import lru_cache import hashlib import json import redis class HealthAssessor: def __init__(self): self.subsystem_config = { # 发电机 'generator': { # 双馈 'dfig': { 'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature', 'generator_winding3_temperature', 'generatordrive_end_bearing_temperature', 'generatornon_drive_end_bearing_temperature'], }, # 直驱 'direct': { 'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature', 'generator_winding3_temperature', 'main_bearing_temperature'], } }, # 机舱系统 'nacelle': { 'fixed': ['front_back_vibration_of_the_cabin', 'side_to_side_vibration_of_the_cabin', 'cabin_position', 'cabin_temperature'], }, # 电网环境 'grid': { 'fixed': ['reactive_power', 'active_power', 'grid_a_phase_current', 'grid_b_phase_current', 'grid_c_phase_current'], }, # 传动系统 'drive_train': { 'fixed': ['main_bearing_temperature'], 'keywords': [ {'include': ['gearbox', 'temperature'], 'exclude': [], 'min_count': 2}, ] } } # 嵌入源代码的MSET实现 self.mset = self._create_mset_core() def _create_mset_core(self): """创建MSET核心计算模块""" class MSETCore: def __init__(self): self.matrixD = None self.normalDataBallTree = None self.healthyResidual = None def calcSimilarity(self, x, y): """优化后的相似度计算""" diff = np.array(x) - np.array(y) return 1 / (1 + np.sqrt(np.sum(diff ** 2))) def genDLMatrix(self, trainDataset, dataSize4D=15, dataSize4L=5): """优化矩阵生成过程""" m, n = trainDataset.shape # 快速选择极值点 min_indices = np.argmin(trainDataset, axis=0) max_indices = np.argmax(trainDataset, axis=0) unique_indices = np.unique(np.concatenate([min_indices, max_indices])) self.matrixD = trainDataset[unique_indices].copy() # 快速填充剩余点 remaining_indices = np.setdiff1d(np.arange(m), unique_indices) np.random.shuffle(remaining_indices) needed = max(0, dataSize4D - len(unique_indices)) if needed > 0: self.matrixD = np.vstack([self.matrixD, trainDataset[remaining_indices[:needed]]]) # 使用与源代码一致的BallTree参数 self.normalDataBallTree = BallTree( self.matrixD, leaf_size=40, metric=lambda i, j: 1 - self.calcSimilarity(i, j) # 自定义相似度 ) # 使用所有数据计算残差 self.healthyResidual = self.calcResidualByLocallyWeightedLR(trainDataset) return 0 def calcResidualByLocallyWeightedLR(self, newStates): """优化残差计算""" if len(newStates.shape) == 1: newStates = newStates.reshape(-1, 1) dist, iList = self.normalDataBallTree.query( newStates, k=min(10, len(self.matrixD)), return_distance=True ) weights = 1 / (dist + 1e-5) weights /= weights.sum(axis=1)[:, np.newaxis] est_X = np.sum(weights[:, :, np.newaxis] * self.matrixD[iList[0]], axis=1) return est_X - newStates def calcSPRT(self, newsStates, feature_weight, alpha=0.1, beta=0.1, decisionGroup=1): """优化SPRT计算""" stateResidual = self.calcResidualByLocallyWeightedLR(newsStates) weightedStateResidual = np.dot(stateResidual, feature_weight) weightedHealthyResidual = np.dot(self.healthyResidual, feature_weight) mu0 = np.mean(weightedHealthyResidual) sigma0 = np.std(weightedHealthyResidual) # 向量化计算 n = len(newsStates) if n < decisionGroup: return [50] # 中性值 rolling_mean = np.convolve(weightedStateResidual, np.ones(decisionGroup) / decisionGroup, 'valid') si = (rolling_mean - mu0) * (rolling_mean + mu0 - 2 * mu0) / (2 * sigma0 ** 2) lowThres = np.log(beta / (1 - alpha)) highThres = np.log((1 - beta) / alpha) si = np.clip(si, lowThres, highThres) si = np.where(si > 0, si / highThres, si / lowThres) flag = 100 - si * 100 # 填充不足的部分 if len(flag) < n: flag = np.pad(flag, (0, n - len(flag)), mode='edge') return flag.tolist() def CRITIC_prepare(self, data, flag=1): """标准化处理""" data = data.astype(float) numeric_cols = data.select_dtypes(include=[np.number]).columns negative_cols = [col for col in numeric_cols if any(kw in col for kw in ['temperature'])] positive_cols = list(set(numeric_cols) - set(negative_cols)) # 负向标准化 if negative_cols: max_val = data[negative_cols].max() min_val = data[negative_cols].min() data[negative_cols] = (max_val - data[negative_cols]) / (max_val - min_val).replace(0, 1e-5) # 正向标准化 if positive_cols: max_val = data[positive_cols].max() min_val = data[positive_cols].min() data[positive_cols] = (data[positive_cols] - min_val) / (max_val - min_val).replace(0, 1e-5) return data def CRITIC(self, data): """CRITIC权重计算(支持单特征)""" try: # 处理单特征情况 if len(data.columns) == 1: return pd.Series([1.0], index=data.columns) data_norm = self.CRITIC_prepare(data.copy()) std = data_norm.std(ddof=0).clip(lower=0.01) # 计算相关系数矩阵(添加异常处理) try: corr = np.abs(np.corrcoef(data_norm.T)) np.fill_diagonal(corr, 0) conflict = np.sum(1 - corr, axis=1) except: # 如果计算相关系数失败,使用等权重 return pd.Series(np.ones(len(data.columns))/len(data.columns)) info = std * conflict weights = info / info.sum() return pd.Series(weights, index=data.columns) except Exception as e: print(f"CRITIC计算失败: {str(e)}") return pd.Series(np.ones(len(data.columns))/len(data.columns)) def ahp(self, matrix): """AHP权重计算""" eigenvalue, eigenvector = np.linalg.eig(matrix) max_idx = np.argmax(eigenvalue) weight = eigenvector[:, max_idx].real return weight / weight.sum(), eigenvalue[max_idx].real return MSETCore() def assess_turbine(self, engine_code, data, mill_type, wind_turbine_name): # 生成缓存键 cache_key = f"assessment:{engine_code}:{data.shape[0]}:{hashlib.sha256(pd.util.hash_pandas_object(data).values.tobytes()).hexdigest()}" try: # 尝试从缓存获取 cached_result = self.cache_client.get(cache_key) if cached_result: return json.loads(cached_result) """评估单个风机""" results = { "engine_code": engine_code, "wind_turbine_name": wind_turbine_name, "mill_type": mill_type, "total_health_score": None, "subsystems": {}, "assessed_subsystems": [] } # 各子系统评估 subsystems_to_assess = [ ('generator', self.subsystem_config['generator'][mill_type], 1), ('nacelle', self.subsystem_config['nacelle'], 1), ('grid', self.subsystem_config['grid'], 1), ('drive_train', self.subsystem_config['drive_train'] if mill_type == 'dfig' else None, 1) ] for subsystem, config, min_features in subsystems_to_assess: if config is None: continue features = self._get_subsystem_features(config, data) # 功能1:无论特征数量是否足够都输出结果 if len(features) >= min_features: assessment = self._assess_subsystem(data[features]) else: assessment = { 'health_score': -1, # 特征不足时输出'-' 'weights': {}, 'message': f'Insufficient features (required {min_features}, got {len(features)})' } print('结果打印',assessment) # 功能3:删除features内容 if 'features' in assessment: del assessment['features'] # 最终清理:确保没有NaN值 for sys, result in results["subsystems"].items(): if isinstance(result['health_score'], float) and np.isnan(result['health_score']): result['health_score'] = -1 result['message'] = (result.get('message') or '') + '; NaN detected' if isinstance(results["total_health_score"], float) and np.isnan(results["total_health_score"]): results["total_health_score"] = -1 results["subsystems"][subsystem] = assessment # 计算整机健康度(使用新字段名) if results["subsystems"]: # 只计算健康值为数字的子系统 valid_subsystems = [ k for k, v in results["subsystems"].items() if isinstance(v['health_score'], (int, float)) and v['health_score'] >= 0 ] if valid_subsystems: weights = self._get_subsystem_weights(valid_subsystems) health_scores = [results["subsystems"][sys]['health_score'] for sys in valid_subsystems] results["total_health_score"] = float(np.dot(health_scores, weights)) results["assessed_subsystems"] = valid_subsystems return results except Exception as e: print('评估出错') def _get_all_possible_features(self, mill_type, available_columns): """ 获取所有可能的特征列(基于实际存在的列) 参数: mill_type: 机型类型 available_columns: 数据库实际存在的列名列表 """ features = [] available_columns_lower = [col.lower() for col in available_columns] # 不区分大小写匹配 # for subsys_name, subsys_config in assessor.subsystem_config.items(): for subsys_name, subsys_config in self.subsystem_config.items(): # 处理子系统配置 if subsys_name == 'generator': config = subsys_config.get(mill_type, {}) elif subsys_name == 'drive_train' and mill_type != 'dfig': continue else: config = subsys_config # 处理固定特征 if 'fixed' in config: for f in config['fixed']: if f in available_columns: features.append(f) # 处理关键词特征 if 'keywords' in config: for rule in config['keywords']: matched = [] include_kws = [kw.lower() for kw in rule['include']] exclude_kws = [ex.lower() for ex in rule.get('exclude', [])] for col in available_columns: col_lower = col.lower() # 检查包含关键词 include_ok = all(kw in col_lower for kw in include_kws) # 检查排除关键词 exclude_ok = not any(ex in col_lower for ex in exclude_kws) if include_ok and exclude_ok: matched.append(col) if len(matched) >= rule.get('min_count', 1): features.extend(matched) return list(set(features)) # 去重 def _get_subsystem_features(self, config: Dict, data: pd.DataFrame) -> List[str]: """最终版特征获取方法""" available_features = [] # 固定特征检查(要求至少10%非空) if 'fixed' in config: for f in config['fixed']: if f in data.columns and data[f].notna().mean() > 0.1: available_features.append(f) print(f"匹配到的固定特征: {available_features}") # 关键词特征检查 if 'keywords' in config: for rule in config['keywords']: matched = [ col for col in data.columns if all(kw.lower() in col.lower() for kw in rule['include']) and not any(ex.lower() in col.lower() for ex in rule.get('exclude', [])) and data[col].notna().mean() > 0.1 # 数据有效性检查 ] if len(matched) >= rule.get('min_count', 1): available_features.extend(matched) print(f"匹配到的关键词特征: {available_features}") return list(set(available_features)) def _assess_subsystem(self, data: pd.DataFrame) -> Dict: """评估子系统(支持单特征)""" # 数据清洗 clean_data = data.dropna() if len(clean_data) < 10: # 降低最小样本量要求(原为20) return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': 'Insufficient data'} try: # 标准化 normalized_data = self.mset.CRITIC_prepare(clean_data) # 计算权重 - 处理单特征情况 if len(normalized_data.columns) == 1: weights = pd.Series([1.0], index=normalized_data.columns) else: weights = self.mset.CRITIC(normalized_data) # MSET评估 health_score = self._run_mset_assessment(normalized_data.values, weights.values) bins = [0, 10, 20, 30, 40, 50, 60, 70, 80] adjust_values = [87, 77, 67, 57, 47, 37, 27, 17, 7] def adjust_score(score): for i in range(len(bins)): if score < bins[i]: return score + adjust_values[i-1] return score # adjusted_score = adjust_score(health_score) # if adjusted_score >= 100: adjusted_score = 92.8 return { 'health_score': float(health_score), 'weights': weights.to_dict(), 'features': list(data.columns) } except Exception as e: return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': str(e)} @lru_cache(maxsize=10) def _get_mset_model(self, train_data: tuple): """缓存MSET模型""" # 注意:由于lru_cache需要可哈希参数,这里使用元组 arr = np.array(train_data) model = self._create_mset_core() model.genDLMatrix(arr) return model def _run_mset_assessment(self, data: np.ndarray, weights: np.ndarray) -> float: """执行MSET评估""" # 检查权重有效性 if np.isnan(weights).any() or np.isinf(weights).any(): weights = np.ones_like(weights) / len(weights) # 重置为等权重 # 分割训练集和测试集 split_idx = len(data) // 2 train_data = data[:split_idx] test_data = data[split_idx:] # 使用缓存模型 try: model = self._get_mset_model(tuple(map(tuple, train_data))) flags = model.calcSPRT(test_data, weights) # 过滤NaN值并计算均值 valid_flags = [x for x in flags if not np.isnan(x)] if not valid_flags: return 50.0 # 默认中性值 return float(np.mean(valid_flags)) except Exception as e: print(f"MSET评估失败: {str(e)}") return 50.0 # 默认中性值 def _get_subsystem_weights(self, subsystems: List[str]) -> np.ndarray: """生成等权重的子系统权重向量""" n = len(subsystems) if n == 0: return np.array([]) # 直接返回等权重向量 return np.ones(n) / n