from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree
- class MSETService:
- def __init__(self):
- self.matrixD = None
- self.matrixL = None
- self.healthyResidual = None
- self.normalDataBallTree = None
- self.DDSimilarity = None
- self.project_root = Path(__file__).resolve().parent.parent.parent
- @staticmethod
- def generate_test_data(num_samples: int = 1000) -> Dict[str, pd.DataFrame]:
- """生成测试数据"""
- np.random.seed(42)
- gen_data = np.random.normal(80, 5, (num_samples, 4))
- nacelle_data = np.random.normal(0.5, 0.1, (num_samples, 4))
- converter_data = np.column_stack([
- np.random.normal(40, 5, num_samples),
- np.random.normal(2000, 300, num_samples)
- ])
- grid_data = np.column_stack([
- np.random.normal(500, 100, num_samples),
- np.random.normal(2000, 300, num_samples),
- np.random.normal(1000, 200, (num_samples, 3))
- ])
- return {
- 'generator': pd.DataFrame(gen_data, columns=['U_temp', 'V_temp', 'W_temp', 'bearing_temp']),
- 'nacelle': pd.DataFrame(nacelle_data, columns=['vibration_front', 'vibration_side', 'position', 'temperature']),
- 'converter': pd.DataFrame(converter_data, columns=['coolant_temp', 'active_power']),
- 'grid': pd.DataFrame(grid_data, columns=['reactive_power', 'active_power', 'current_A', 'current_B', 'current_C'])
- }
- def calc_similarity(self, x: np.ndarray, y: np.ndarray, method: str = 'euc') -> float:
- if len(x) != len(y):
- return 0.0
- if method == 'cbd':
- return np.mean([1 / (1 + np.abs(p - q)) for p, q in zip(x, y)])
- else:
- return 1 / (1 + np.sqrt(np.sum((p - q)**2 for p, q in zip(x, y))))
- def train_model(self, train_data: np.ndarray, dataSize4D: int = 100, dataSize4L: int = 50) -> int:
- m, n = train_data.shape
- if m < dataSize4D + dataSize4L:
- print('训练数据集太小,无法生成矩阵D和L')
- return -1
- self.matrixD = []
- selectIndex4D = []
- for i in range(n):
- feature_i = train_data[:, i]
- minIndex = np.argmin(feature_i)
- maxIndex = np.argmax(feature_i)
- self.matrixD.extend([train_data[minIndex], train_data[maxIndex]])
- selectIndex4D.extend([minIndex, maxIndex])
- while len(selectIndex4D) < dataSize4D:
- freeStateList = list(set(range(len(train_data))) - set(selectIndex4D))
- if not freeStateList: break
- distList = [np.mean([1 - self.calc_similarity(train_data[i], x) for x in self.matrixD])
- for i in freeStateList]
- selectId = freeStateList[np.argmax(distList)]
- self.matrixD.append(train_data[selectId])
- selectIndex4D.append(selectId)
- self.matrixD = np.array(self.matrixD[:dataSize4D])
- self.matrixL = train_data
- self.normalDataBallTree = BallTree(
- self.matrixD, leaf_size=4,
- metric=lambda i,j: 1 - self.calc_similarity(i,j)
- )
- lamdaRatio = 1e-3
- m_d = len(self.matrixD)
- self.DDSimilarity = np.array([
- [1 - self.calc_similarity(x, y) for x in self.matrixD] for y in self.matrixD
- ]) + lamdaRatio * np.eye(m_d)
- self.DDSimilarity = np.linalg.inv(self.DDSimilarity)
- self.healthyResidual = self._calc_residuals(self.matrixL)
- return 0
- def _calc_residuals(self, states: np.ndarray) -> np.ndarray:
- m, n = states.shape
- est_X = []
- for x in states:
- dist, iList = self.normalDataBallTree.query([x], 20, return_distance=True)
- weight = 1 / (dist[0] + 1e-1)
- weight = weight / np.sum(weight)
- eState = np.sum([w * self.matrixD[i] for w, i in zip(weight, iList[0])], axis=0)
- est_X.append(eState)
- return np.array(est_X) - states
- def _critic_prepare(self, data: pd.DataFrame, flag: int = 1) -> pd.DataFrame:
- data_columns = data.columns.values
- maxnum = np.max(data, axis=0)
- minnum = np.min(data, axis=0)
- Y = (data - minnum) / (maxnum - minnum) if flag == 0 else (maxnum - data) / (maxnum - minnum)
- Y0 = Y.values # 转换为NumPy数组
- Y0[np.where(Y0 == 0)] = 0.00001
- return pd.DataFrame(Y0, columns=data_columns)
- def _critic(self, data: pd.DataFrame) -> np.ndarray:
- """确保返回NumPy数组"""
- n, m = data.shape
- s = np.std(data, axis=0)
- r = np.corrcoef(data, rowvar=False)
- a = np.sum(1 - r, axis=1)
- c = s * a
- weights = c / np.sum(c)
- return weights # 直接返回NumPy数组
- def evaluate_subsystem_health(self, data: pd.DataFrame) -> Tuple[np.ndarray, float]:
- data = data.apply(pd.to_numeric, errors='coerce').dropna()
- if data.empty: return np.array([]), 0.0
-
- W_prepare_data = self._critic_prepare(data)
- weights = self._critic(W_prepare_data) # 获取权重
-
- # 关键修改:显式转换为NumPy数组
- weights = np.array(weights)
-
- data_values = data.values
- m, n = data_values.shape
- flag_Spart_data = []
-
- for i in range(n):
- data_i = data_values[:, i].reshape(-1, 1)
- train_data = data_i[:len(data_i)//2]
- test_data = data_i[len(data_i)//2+1:]
-
- if len(train_data) < 10 or len(test_data) < 5:
- continue
-
- self.train_model(train_data, 60, 5)
- feature_weight = np.array([1.0])
- flag_data = self._sprt(test_data, feature_weight, decisionGroup=5)
- flag_Spart_data.append(flag_data)
-
- if not flag_Spart_data: return weights, 0.0
-
- flag_Spart_data = np.array(flag_Spart_data).T
- weights = weights.reshape(-1, 1) # 现在可以安全调用reshape
- score = np.dot(flag_Spart_data, weights).mean()
- return weights.flatten(), float(score)
-
- def _sprt(self, newStates: np.ndarray, feature_weight: np.ndarray,
- alpha: float = 0.1, beta: float = 0.1, decisionGroup: int = 5) -> List[float]:
- feature_weight = feature_weight.flatten()
- stateResidual = self._calc_residuals(newStates)
-
- if stateResidual.shape[1] != len(feature_weight):
- feature_weight = np.array([1.0])
-
- weightedStateResidual = [np.dot(x, feature_weight) for x in stateResidual]
- weightedHealthyResidual = [np.dot(x, feature_weight) for x in self.healthyResidual]
-
- mu0 = np.mean(weightedHealthyResidual)
- sigma0 = np.std(weightedHealthyResidual)
- if sigma0 < 1e-10: sigma0 = 1e-10
-
- lowThres = np.log(beta / (1 - alpha))
- highThres = np.log((1 - beta) / alpha)
- flag = []
-
- for i in range(len(newStates) - decisionGroup + 1):
- mu1 = np.mean(weightedStateResidual[i:i+decisionGroup])
- si = np.sum(weightedStateResidual[i:i+decisionGroup]) * (mu1 - mu0) / sigma0**2 - \
- decisionGroup * (mu1**2 - mu0**2) / (2 * sigma0**2)
-
- si = np.clip(si, lowThres, highThres)
- si = 100 - (si / highThres if si > 0 else si / lowThres) * 100
- flag.append(si)
-
- return flag
- def evaluate_turbine_health(self, subsystems_data: Dict[str, pd.DataFrame]) -> Tuple[float, Dict[str, float]]:
- subsystems = {
- 'generator': [10, 11, 12, 17],
- 'nacelle': [23, 24, 41, 42],
- 'converter': [18, 19],
- 'grid': [32, 38, 64, 65, 66]
- }
- matrix_subsys = np.array([
- [1, 2, 3, 4],
- [1/2, 1, 2, 3],
- [1/3, 1/2, 1, 2],
- [1/4, 1/3, 1/2, 1],
- ])
-
- health_scores = {}
- for name, cols in subsystems.items():
- if name in subsystems_data:
- data = subsystems_data[name]
- w, score = self.evaluate_subsystem_health(data)
- health_scores[name] = score
-
- subsystem_names = list(health_scores.keys())
- if not subsystem_names: return 0.0, {}
-
- m = len(subsystem_names)
- ahp_matrix = matrix_subsys[:m, :m]
- eigenvalue, eigenvector = np.linalg.eig(ahp_matrix)
- max_idx = np.argmax(eigenvalue)
- subsystem_weights = eigenvector[:, max_idx].real
- subsystem_weights = subsystem_weights / np.sum(subsystem_weights)
-
- overall_score = np.sum([
- health_scores[name] * subsystem_weights[i]
- for i, name in enumerate(subsystem_names)
- ])
-
- return float(overall_score), health_scores