import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree
from pathlib import Path
from typing import Tuple, Dict, List


class MSETService:
    """MSET-based health evaluation for wind-turbine subsystems."""

    def __init__(self):
        self.matrixD = None              # memory matrix D (representative normal states)
        self.matrixL = None              # remaining normal states L
        self.healthyResidual = None      # residuals of the healthy training data
        self.normalDataBallTree = None   # BallTree over matrix D for nearest-neighbour lookup
        self.DDSimilarity = None         # regularised inverse of the D-D dissimilarity matrix
        self.project_root = Path(__file__).resolve().parent.parent.parent

    @staticmethod
    def generate_test_data(num_samples: int = 1000) -> Dict[str, pd.DataFrame]:
        """Generate synthetic test data for the four subsystems."""
        np.random.seed(42)
        gen_data = np.random.normal(80, 5, (num_samples, 4))
        nacelle_data = np.random.normal(0.5, 0.1, (num_samples, 4))
        converter_data = np.column_stack([
            np.random.normal(40, 5, num_samples),
            np.random.normal(2000, 300, num_samples)
        ])
        grid_data = np.column_stack([
            np.random.normal(500, 100, num_samples),
            np.random.normal(2000, 300, num_samples),
            np.random.normal(1000, 200, (num_samples, 3))
        ])
        return {
            'generator': pd.DataFrame(gen_data, columns=['U_temp', 'V_temp', 'W_temp', 'bearing_temp']),
            'nacelle': pd.DataFrame(nacelle_data, columns=['vibration_front', 'vibration_side', 'position', 'temperature']),
            'converter': pd.DataFrame(converter_data, columns=['coolant_temp', 'active_power']),
            'grid': pd.DataFrame(grid_data, columns=['reactive_power', 'active_power', 'current_A', 'current_B', 'current_C'])
        }

    def calc_similarity(self, x: np.ndarray, y: np.ndarray, method: str = 'euc') -> float:
        """Similarity in (0, 1]: 'cbd' uses the city-block form, anything else the Euclidean form."""
        x, y = np.asarray(x, dtype=float), np.asarray(y, dtype=float)
        if len(x) != len(y):
            return 0.0
        if method == 'cbd':
            return float(np.mean(1 / (1 + np.abs(x - y))))
        return float(1 / (1 + np.sqrt(np.sum((x - y) ** 2))))

    def train_model(self, train_data: np.ndarray, dataSize4D: int = 100, dataSize4L: int = 50) -> int:
        """Build the memory matrix D, the BallTree index and the healthy residuals.

        Returns 0 on success, -1 if the training set is too small.
        """
        m, n = train_data.shape
        if m < dataSize4D + dataSize4L:
            print('Training dataset is too small to build matrices D and L')
            return -1

        # Seed D with the min/max state of every feature.
        self.matrixD = []
        selectIndex4D = []
        for i in range(n):
            feature_i = train_data[:, i]
            minIndex = np.argmin(feature_i)
            maxIndex = np.argmax(feature_i)
            self.matrixD.extend([train_data[minIndex], train_data[maxIndex]])
            selectIndex4D.extend([minIndex, maxIndex])

        # Greedily add the states that are least similar to the current D.
        while len(selectIndex4D) < dataSize4D:
            freeStateList = list(set(range(len(train_data))) - set(selectIndex4D))
            if not freeStateList:
                break
            distList = [
                np.mean([1 - self.calc_similarity(train_data[i], x) for x in self.matrixD])
                for i in freeStateList
            ]
            selectId = freeStateList[int(np.argmax(distList))]
            self.matrixD.append(train_data[selectId])
            selectIndex4D.append(selectId)

        self.matrixD = np.array(self.matrixD[:dataSize4D])
        self.matrixL = train_data

        # Nearest-neighbour index over D, using (1 - similarity) as the distance.
        self.normalDataBallTree = BallTree(
            self.matrixD,
            leaf_size=4,
            metric=lambda i, j: 1 - self.calc_similarity(i, j)
        )

        # Regularised inverse of the D-D dissimilarity matrix
        # (computed for the classic MSET estimate; not used by the BallTree estimator below).
        lamdaRatio = 1e-3
        m_d = len(self.matrixD)
        self.DDSimilarity = np.array([
            [1 - self.calc_similarity(x, y) for x in self.matrixD]
            for y in self.matrixD
        ]) + lamdaRatio * np.eye(m_d)
        self.DDSimilarity = np.linalg.inv(self.DDSimilarity)

        self.healthyResidual = self._calc_residuals(self.matrixL)
        return 0

    def _calc_residuals(self, states: np.ndarray) -> np.ndarray:
        """Estimate every state from its nearest neighbours in D and return (estimate - observation)."""
        est_X = []
        k = min(20, len(self.matrixD))
        for x in states:
            dist, iList = self.normalDataBallTree.query([x], k, return_distance=True)
            weight = 1 / (dist[0] + 1e-1)
            weight = weight / np.sum(weight)
            eState = np.sum([w * self.matrixD[i] for w, i in zip(weight, iList[0])], axis=0)
            est_X.append(eState)
        return np.array(est_X) - states

    def _critic_prepare(self, data: pd.DataFrame, flag: int = 1) -> pd.DataFrame:
        """Min-max normalise the data for CRITIC weighting; flag selects the normalisation direction."""
        data_columns = data.columns.values
        maxnum = np.max(data, axis=0)
        minnum = np.min(data, axis=0)
        Y = (data - minnum) / (maxnum - minnum) if flag == 0 else (maxnum - data) / (maxnum - minnum)
        Y0 = Y.values  # convert to a NumPy array
        Y0[np.where(Y0 == 0)] = 0.00001
        return pd.DataFrame(Y0, columns=data_columns)

    def _critic(self, data: pd.DataFrame) -> np.ndarray:
        """CRITIC objective weighting; returns a NumPy array of feature weights."""
        s = np.std(data, axis=0)             # contrast intensity of each feature
        r = np.corrcoef(data, rowvar=False)  # correlation between features
        a = np.sum(1 - r, axis=1)            # conflict with the other features
        c = s * a
        weights = c / np.sum(c)
        return np.asarray(weights)           # return a plain NumPy array, not a pandas Series

    def evaluate_subsystem_health(self, data: pd.DataFrame) -> Tuple[np.ndarray, float]:
        """Return (feature weights, health score) for one subsystem."""
        data = data.apply(pd.to_numeric, errors='coerce').dropna()
        if data.empty:
            return np.array([]), 0.0

        W_prepare_data = self._critic_prepare(data)
        weights = np.asarray(self._critic(W_prepare_data))  # ensure the weights are a NumPy array

        data_values = data.values
        _, n = data_values.shape
        flag_Spart_data = []
        kept_idx = []  # features for which a model could actually be trained
        for i in range(n):
            # Each feature is modelled independently: the first half trains, the rest is tested.
            data_i = data_values[:, i].reshape(-1, 1)
            train_data = data_i[:len(data_i) // 2]
            test_data = data_i[len(data_i) // 2 + 1:]
            if len(train_data) < 10 or len(test_data) < 5:
                continue
            if self.train_model(train_data, 60, 5) != 0:
                continue
            feature_weight = np.array([1.0])  # single-feature model
            flag_data = self._sprt(test_data, feature_weight, decisionGroup=5)
            flag_Spart_data.append(flag_data)
            kept_idx.append(i)

        if not flag_Spart_data:
            return weights, 0.0

        # Weighted average of the per-feature SPRT scores, restricted to the trained features.
        flag_Spart_data = np.array(flag_Spart_data).T
        used_weights = weights[kept_idx].reshape(-1, 1)
        used_weights = used_weights / np.sum(used_weights)
        score = np.dot(flag_Spart_data, used_weights).mean()
        return weights.flatten(), float(score)

    def _sprt(self, newStates: np.ndarray, feature_weight: np.ndarray,
              alpha: float = 0.1, beta: float = 0.1, decisionGroup: int = 5) -> List[float]:
        """Wald SPRT over the weighted residuals; returns a 0-100 score per sliding window."""
        feature_weight = feature_weight.flatten()
        stateResidual = self._calc_residuals(newStates)
        if stateResidual.shape[1] != len(feature_weight):
            feature_weight = np.array([1.0])
        weightedStateResidual = [np.dot(x, feature_weight) for x in stateResidual]
        weightedHealthyResidual = [np.dot(x, feature_weight) for x in self.healthyResidual]

        mu0 = np.mean(weightedHealthyResidual)
        sigma0 = np.std(weightedHealthyResidual)
        if sigma0 < 1e-10:
            sigma0 = 1e-10

        lowThres = np.log(beta / (1 - alpha))    # lower SPRT decision boundary
        highThres = np.log((1 - beta) / alpha)   # upper SPRT decision boundary

        flag = []
        for i in range(len(newStates) - decisionGroup + 1):
            mu1 = np.mean(weightedStateResidual[i:i + decisionGroup])
            si = np.sum(weightedStateResidual[i:i + decisionGroup]) * (mu1 - mu0) / sigma0**2 - \
                decisionGroup * (mu1**2 - mu0**2) / (2 * sigma0**2)
            si = np.clip(si, lowThres, highThres)
            # Map the clipped log-likelihood ratio onto a 0-100 health score.
            si = 100 - (si / highThres if si > 0 else si / lowThres) * 100
            flag.append(si)
        return flag

    def evaluate_turbine_health(self, subsystems_data: Dict[str, pd.DataFrame]) -> Tuple[float, Dict[str, float]]:
        """Combine the subsystem scores into a single turbine score using AHP weights."""
        # SCADA column indices of each subsystem (informational; the DataFrames are passed in directly).
        subsystems = {
            'generator': [10, 11, 12, 17],
            'nacelle': [23, 24, 41, 42],
            'converter': [18, 19],
            'grid': [32, 38, 64, 65, 66]
        }
        # AHP pairwise-comparison matrix for the four subsystems.
        matrix_subsys = np.array([
            [1,   2,   3,   4],
            [1/2, 1,   2,   3],
            [1/3, 1/2, 1,   2],
            [1/4, 1/3, 1/2, 1],
        ])

        health_scores = {}
        for name in subsystems:
            if name in subsystems_data:
                data = subsystems_data[name]
                _, score = self.evaluate_subsystem_health(data)
                health_scores[name] = score

        subsystem_names = list(health_scores.keys())
        if not subsystem_names:
            return 0.0, {}

        # Subsystem weights: normalised principal eigenvector of the AHP matrix.
        m = len(subsystem_names)
        ahp_matrix = matrix_subsys[:m, :m]
        eigenvalue, eigenvector = np.linalg.eig(ahp_matrix)
        max_idx = int(np.argmax(eigenvalue.real))
        subsystem_weights = eigenvector[:, max_idx].real
        subsystem_weights = subsystem_weights / np.sum(subsystem_weights)

        overall_score = np.sum([
            health_scores[name] * subsystem_weights[i]
            for i, name in enumerate(subsystem_names)
        ])
        return float(overall_score), health_scores
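

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original service code):
# runs the full evaluation on the synthetic data from generate_test_data().
# The sample count of 200 is an arbitrary choice for a quick smoke test.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    service = MSETService()
    demo_data = MSETService.generate_test_data(num_samples=200)
    overall, per_subsystem = service.evaluate_turbine_health(demo_data)
    for name, score in per_subsystem.items():
        print(f"{name:>10s}: {score:6.2f}")
    print(f"overall turbine health: {overall:.2f}")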