# Temp_Diag.py
import math
import os

import joblib
import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree
from sqlalchemy import create_engine, text


class MSET_Temp:
    """
    MSET + SPRT temperature trend / threshold analysis.

    - Offline: train genDLMatrix on farm-wide data, then persist with save_model.
    - Online: fetch data for the windTurbineNumberList & time range supplied by the
      front end via _get_data_by_filter, then score it with predict_SPRT.
    """

    def __init__(self, windCode: str, windTurbineNumberList: list[str],
                 startTime: str, endTime: str):
        self.windCode = windCode.strip()
        self.windTurbineNumberList = windTurbineNumberList or []
        self.startTime = startTime
        self.endTime = endTime

        # Populated by offline training or by load_model
        self.matrixD = None
        self.matrixL = None
        self.healthyResidual = None
        self.normalDataBallTree = None

        # SPRT parameters (set during offline training)
        self.feature_weight: np.ndarray | None = None
        self.alpha: float = 0.1
        self.beta: float = 0.1

    def _get_data_by_filter(self) -> pd.DataFrame:
        """
        Online inference: pull data for the turbine list & time range given by the
        front end; if the turbine list is empty, pull farm-wide data by time only.
        """
        table = f"{self.windCode}_minute"
        engine = create_engine(
            "mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod"
        )

        if self.windTurbineNumberList:
            turbines = ",".join(f"'{wt.strip()}'" for wt in self.windTurbineNumberList)
            where = f"wind_turbine_number IN ({turbines}) AND time_stamp BETWEEN :start AND :end"
        else:
            where = "time_stamp BETWEEN :start AND :end"

        sql = text(f"""
            SELECT *
            FROM {table}
            WHERE {where}
            ORDER BY time_stamp ASC
        """)
        df = pd.read_sql(sql, engine, params={"start": self.startTime, "end": self.endTime})
        return df

    def calcSimilarity(self, x: np.ndarray, y: np.ndarray, m: str = 'euc') -> float:
        """Similarity in (0, 1]: m='cbd' uses an element-wise absolute-difference form,
        otherwise a Euclidean-distance form."""
        if len(x) != len(y):
            return 0.0
        if m == 'cbd':
            return float(np.mean([1.0 / (1.0 + abs(p - q)) for p, q in zip(x, y)]))
        diffsq = np.sum((x - y) ** 2)
        return float(1.0 / (1.0 + math.sqrt(diffsq)))

    def genDLMatrix(self, trainDataset: np.ndarray, dataSize4D=100, dataSize4L=50) -> int:
        """
        Offline training: build matrixD, matrixL, the healthy residuals and the BallTree.
        Returns 0 on success, -1 if the training set is too small.
        """
        m, n = trainDataset.shape
        if m < dataSize4D + dataSize4L:
            return -1

        # Step 1: put the min/max sample of every dimension into D
        D_idx = []
        D = []
        for i in range(n):
            col = trainDataset[:, i]
            imin, imax = np.argmin(col), np.argmax(col)
            for idx in (imin, imax):
                if idx not in D_idx:  # skip indices already selected by another dimension
                    D.append(trainDataset[idx].tolist())
                    D_idx.append(idx)

        # Step 2: iteratively add the most dissimilar remaining samples until D has dataSize4D rows
        while len(D_idx) < dataSize4D:
            free = list(set(range(m)) - set(D_idx))
            scores = []
            for idx in free:
                dists = [1 - self.calcSimilarity(trainDataset[idx], d) for d in D]
                scores.append((np.mean(dists), idx))
            _, pick = max(scores)
            D.append(trainDataset[pick].tolist())
            D_idx.append(pick)
        self.matrixD = np.array(D)

        # BallTree over D (distance = 1 - similarity) + matrixL + healthyResidual
        self.normalDataBallTree = BallTree(
            self.matrixD,
            leaf_size=4,
            metric=lambda a, b: 1.0 - self.calcSimilarity(a, b)
        )
        self.matrixL = trainDataset.copy()
        self.healthyResidual = self._calcResidual(self.matrixL)
        return 0

    def _calcResidual(self, states: np.ndarray) -> np.ndarray:
        """MSET estimate minus observation for every input state."""
        ests = []
        k = min(20, len(self.matrixD))  # guard against a memory matrix smaller than 20 rows
        for x in states:
            dist, idxs = self.normalDataBallTree.query([x], k=k, return_distance=True)
            # inverse-distance weights; the offset keeps them finite on exact matches
            w = 1.0 / (dist[0] + 1e-1)
            w = w / w.sum()
            ests.append(np.sum([wi * self.matrixD[j] for wi, j in zip(w, idxs[0])], axis=0))
        est = np.array(ests).reshape(len(ests), -1)
        return est - states

    def calcSPRT(self, newsStates: np.ndarray, feature_weight: np.ndarray,
                 alpha: float = 0.1, beta: float = 0.1,
                 decisionGroup: int = 5) -> list[float]:
        """SPRT on the weighted residuals; returns one flag in [-1, 1] per sliding window."""
        # 1) residuals, projected onto the feature weights
        resN = self._calcResidual(newsStates)
        wN = [np.dot(r, feature_weight) for r in resN]
        wH = [np.dot(r, feature_weight) for r in self.healthyResidual]

        mu0, sigma0 = np.mean(wH), np.std(wH)
        low = math.log(beta / (1 - alpha))
        high = math.log((1 - beta) / alpha)

        flags = []
        for i in range(len(wN) - decisionGroup + 1):
            seg = wN[i:i + decisionGroup]
            mu1 = np.mean(seg)
            # Gaussian mean-shift log-likelihood ratio over the window
            si = (sum(seg) * (mu1 - mu0) / sigma0**2
                  - decisionGroup * (mu1**2 - mu0**2) / (2 * sigma0**2))
            si = max(min(si, high), low)
            # normalise to [-1, 1]: +1 reaches the alarm threshold, -1 the healthy threshold
            flags.append(si / high if si > 0 else si / low)
        return flags

    def predict_SPRT(self, newsStates: np.ndarray, decisionGroup: int = 5) -> list[float]:
        """Online inference: uses the loaded matrixD, healthyResidual, feature_weight, alpha, beta."""
        return self.calcSPRT(
            newsStates,
            self.feature_weight,
            alpha=self.alpha,
            beta=self.beta,
            decisionGroup=decisionGroup
        )

    def save_model(self, path: str):
        """Persist after offline training: matrixD, healthyResidual, feature_weight, alpha, beta."""
        os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
        joblib.dump({
            'matrixD': self.matrixD,
            'healthyResidual': self.healthyResidual,
            'feature_weight': self.feature_weight,
            'alpha': self.alpha,
            'beta': self.beta,
        }, path)

    @classmethod
    def load_model(cls, path: str) -> 'MSET_Temp':
        """Deserialize at service start-up and rebuild the BallTree."""
        data = joblib.load(path)
        inst = cls('', [], '', '')
        inst.matrixD = data['matrixD']
        inst.healthyResidual = data['healthyResidual']
        inst.feature_weight = data['feature_weight']
        inst.alpha = data['alpha']
        inst.beta = data['beta']
        inst.normalDataBallTree = BallTree(
            inst.matrixD,
            leaf_size=4,
            metric=lambda a, b: 1.0 - inst.calcSimilarity(a, b)
        )
        return inst
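

# ---------------------------------------------------------------------------
# Usage sketch (not part of the class): the offline-train / online-score flow
# described in the class docstring. The synthetic training array, the equal
# feature_weight and the model path are illustrative assumptions only; the
# real pipeline supplies farm-wide sensor data and its own weights.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Offline: train on (assumed) healthy temperature data and persist the model
    rng = np.random.default_rng(0)
    train = rng.normal(60.0, 2.0, size=(200, 4))          # stand-in for healthy minute data
    model = MSET_Temp('WF001', [], '2024-01-01 00:00:00', '2024-01-31 23:59:59')
    if model.genDLMatrix(train, dataSize4D=60, dataSize4L=40) == 0:
        model.feature_weight = np.ones(train.shape[1]) / train.shape[1]  # equal weighting, assumed
        model.save_model('models/WF001_temp.pkl')

        # Online: reload the model and score a new window of observations
        online = MSET_Temp.load_model('models/WF001_temp.pkl')
        new_states = train[:50] + rng.normal(0.0, 0.5, size=(50, 4))
        flags = online.predict_SPRT(new_states, decisionGroup=5)
        print(flags[:5])  # values near +1 indicate an abnormal temperature trend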