import math
import os

import joblib
import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree
from sqlalchemy import bindparam, create_engine, text

# Number of nearest memory vectors used to reconstruct a state (clamped to
# the size of matrixD).
_KNN_K = 20
# Additive term in the inverse-distance weights; keeps weights finite when a
# state coincides exactly with a memory vector.
_WEIGHT_EPS = 1e-1
# Fallback connection string, used when MSET_DB_URL is not set.
# NOTE(review): credentials committed to source — move to config/secret store.
_DEFAULT_DB_URL = "mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod"


class MSET_Temp:
    """MSET + SPRT temperature analysis.

    Workflow:
      - offline training: genDLMatrix -> save_model
      - online inference: load_model -> predict_SPRT
    """

    def __init__(self, windCode: str, windTurbineNumberList: list[str], startTime: str, endTime: str):
        self.windCode = windCode.strip()
        # Copy so later mutation of the caller's list cannot change our queries.
        self.windTurbineNumberList = list(windTurbineNumberList or [])
        self.startTime = startTime
        self.endTime = endTime
        # Populated by genDLMatrix() (offline) or load_model() (online).
        self.matrixD = None
        self.healthyResidual = None
        self.normalDataBallTree = None
        # SPRT parameters (set during offline training, restored by load_model).
        self.feature_weight: np.ndarray | None = None
        self.alpha: float = 0.1
        self.beta: float = 0.1

    def _get_data_by_filter(self) -> pd.DataFrame:
        """Fetch rows of ``{windCode}_minute`` for online inference.

        Selects rows whose ``time_stamp`` lies between ``startTime`` and
        ``endTime`` (SQL BETWEEN, inclusive), ordered by ``time_stamp``.
        When ``windTurbineNumberList`` is non-empty only those turbines are
        returned; otherwise data for the whole farm is returned.

        The database URL is read from the ``MSET_DB_URL`` environment variable,
        falling back to the historical hard-coded URL.
        """
        engine = create_engine(os.environ.get("MSET_DB_URL", _DEFAULT_DB_URL))
        try:
            # Identifiers cannot be bound as parameters, so quote/escape the
            # table name with the dialect's own rules instead of raw f-string.
            table = engine.dialect.identifier_preparer.quote_identifier(f"{self.windCode}_minute")
            params = {"start": self.startTime, "end": self.endTime}
            cond = "time_stamp BETWEEN :start AND :end"
            if self.windTurbineNumberList:
                # Expanding bind parameter: each turbine number is sent as a
                # bound value rather than concatenated into the SQL text.
                cond = "wind_turbine_number IN :turbines AND " + cond
                params["turbines"] = list(self.windTurbineNumberList)
            sql = text(f"SELECT * FROM {table} WHERE {cond} ORDER BY time_stamp ASC")
            if "turbines" in params:
                sql = sql.bindparams(bindparam("turbines", expanding=True))
            with engine.connect() as conn:
                return pd.read_sql(sql, conn, params=params)
        finally:
            # The engine is created per call; release its pool every time.
            engine.dispose()

    def calcSimilarity(self, x: np.ndarray, y: np.ndarray, m: str = 'euc') -> float:
        """Similarity in (0, 1] between two state vectors (1.0 means identical).

        Args:
            x, y: 1-D state vectors.
            m: 'cbd' -> mean over dimensions of 1 / (1 + |x_i - y_i|);
               anything else -> 1 / (1 + ||x - y||_2).

        Returns:
            The similarity, or 0.0 when x and y have different lengths.
        """
        if len(x) != len(y):
            return 0.0
        diff = np.asarray(x, dtype=float) - np.asarray(y, dtype=float)
        if m == 'cbd':
            return float(np.mean(1.0 / (1.0 + np.abs(diff))))
        return float(1.0 / (1.0 + math.sqrt(np.sum(diff ** 2))))

    @staticmethod
    def _dissimilarity(dist: np.ndarray) -> np.ndarray:
        """Convert Euclidean distance d to 1 - calcSimilarity (euc) = d / (1 + d)."""
        return dist / (1.0 + dist)

    @staticmethod
    def _build_ball_tree(matrixD: np.ndarray) -> BallTree:
        """Index the memory matrix for nearest-neighbour queries.

        The tree uses the compiled Euclidean metric. The distance used by MSET,
        1 - calcSimilarity = d / (1 + d), is a strictly increasing function of
        Euclidean distance, so neighbour ordering is identical; callers convert
        the returned distances with `_dissimilarity`.
        """
        return BallTree(np.asarray(matrixD, dtype=float), leaf_size=4, metric='euclidean')

    def _estimate(self, states: np.ndarray) -> np.ndarray:
        """Reconstruct each row of 2-D `states` from its nearest memory vectors.

        Each estimate is an inverse-distance-weighted average of the
        k = min(20, len(matrixD)) nearest rows of matrixD, with distance
        measured as 1 - calcSimilarity.
        """
        memory = np.asarray(self.matrixD, dtype=float)
        k = min(_KNN_K, len(memory))
        dist, idx = self.normalDataBallTree.query(states, k=k, return_distance=True)
        weights = 1.0 / (self._dissimilarity(dist) + _WEIGHT_EPS)
        weights /= weights.sum(axis=1, keepdims=True)
        # (rows, k) weights x (rows, k, features) neighbours -> (rows, features)
        return np.einsum('ik,ikj->ij', weights, memory[idx])

    def genDLMatrix(self, trainDataset: np.ndarray, dataSize4D=100, dataSize4L=50) -> int:
        """Offline training: build matrixD, the BallTree index and healthyResidual.

        Args:
            trainDataset: 2-D array of training states (rows x features).
            dataSize4D: target number of memory vectors in matrixD. More rows
                are kept if the per-feature min/max seeds already exceed it.
            dataSize4L: only used in the minimum-size check below; no matrixL
                is built here.

        Returns:
            0 on success; -1 when trainDataset has fewer than
            dataSize4D + dataSize4L rows (nothing is modified in that case).
        """
        data = np.asarray(trainDataset, dtype=float)
        m, _ = data.shape
        if m < dataSize4D + dataSize4L:
            return -1

        # Step 1: seed D with the rows holding each feature's min and max.
        # dict.fromkeys de-duplicates while keeping first-seen order, so a row
        # that is extreme in several features is stored only once.
        D_idx = list(dict.fromkeys(
            int(i) for col in data.T for i in (np.argmin(col), np.argmax(col))
        ))

        def dissim_to(row: int) -> np.ndarray:
            # 1 - calcSimilarity(data[r], data[row]) for every row r.
            return self._dissimilarity(np.linalg.norm(data - data[row], axis=1))

        # Step 2: greedily add the row with the largest mean dissimilarity to
        # D. The mean shares the divisor len(D) across candidates, so ranking
        # by the running sum is equivalent and can be updated incrementally.
        dissim_sum = np.zeros(m)
        for j in D_idx:
            dissim_sum += dissim_to(j)
        available = np.ones(m, dtype=bool)
        available[D_idx] = False
        while len(D_idx) < dataSize4D and available.any():
            cand = np.flatnonzero(available)
            scores = dissim_sum[cand]
            # argmax over the reversed scores selects the LAST maximal
            # candidate, i.e. ties go to the largest row index.
            best = int(cand[len(scores) - 1 - int(np.argmax(scores[::-1]))])
            D_idx.append(best)
            available[best] = False
            dissim_sum += dissim_to(best)

        self.matrixD = data[D_idx]
        self.normalDataBallTree = self._build_ball_tree(self.matrixD)
        # Residual convention: estimate - actual (calcSPRT uses the same).
        self.healthyResidual = self._estimate(data) - data
        return 0

    def calcSPRT(self, newsStates: np.ndarray, feature_weight: np.ndarray, alpha: float = 0.1, beta: float = 0.1, decisionGroup: int = 5) -> list[float]:
        """Wald SPRT score for each sliding window of new states.

        Args:
            newsStates: 2-D array of new states (rows x features).
            feature_weight: per-feature weights used to collapse each residual
                vector to a scalar.
            alpha, beta: SPRT error rates; thresholds are
                low = log(beta / (1 - alpha)), high = log((1 - beta) / alpha).
            decisionGroup: window length (>= 1).

        Returns:
            One score per window (len(newsStates) - decisionGroup + 1 values;
            [] when there are fewer states than decisionGroup). The
            log-likelihood ratio si is clipped to [low, high], then divided by
            high if si > 0, else by low.

        Raises:
            ValueError: feature_weight is None, decisionGroup < 1, or
                newsStates is not 2-D.
        """
        if feature_weight is None:
            raise ValueError("feature_weight is not set; train or load a model first")
        if decisionGroup < 1:
            raise ValueError("decisionGroup must be >= 1")
        states = np.asarray(newsStates, dtype=float)
        if len(states) < decisionGroup:
            return []
        if states.ndim != 2:
            raise ValueError("newsStates must be a 2-D array (rows x features)")

        weight = np.asarray(feature_weight, dtype=float)
        wN = (self._estimate(states) - states) @ weight
        wH = np.asarray(self.healthyResidual, dtype=float) @ weight
        mu0, sigma0 = float(np.mean(wH)), float(np.std(wH))
        # A constant healthy residual would divide by zero (NaN scores); use a
        # tiny variance so any deviation saturates and no deviation scores 0.
        var0 = sigma0 ** 2 if sigma0 > 0 else np.finfo(float).eps

        low = math.log(beta / (1 - alpha))
        high = math.log((1 - beta) / alpha)

        # Windowed sums of wN ('valid' yields exactly one value per window).
        seg_sum = np.convolve(wN, np.ones(decisionGroup), mode='valid')
        mu1 = seg_sum / decisionGroup
        # Gaussian mean-shift LLR. Since seg_sum == decisionGroup * mu1, this
        # equals decisionGroup * (mu1 - mu0)**2 / (2 * var0), i.e. it is >= 0
        # up to rounding.
        si = seg_sum * (mu1 - mu0) / var0 - decisionGroup * ((mu1 ** 2 - mu0 ** 2) / (2 * var0))
        si = np.clip(si, low, high)
        return np.where(si > 0, si / high, si / low).tolist()

    def predict_SPRT(self, newsStates: np.ndarray, decisionGroup: int = 5) -> list[float]:
        """Online inference using the stored matrixD/healthyResidual/feature_weight/alpha/beta."""
        return self.calcSPRT(
            newsStates, self.feature_weight,
            alpha=self.alpha, beta=self.beta,
            decisionGroup=decisionGroup,
        )

    def save_model(self, path: str):
        """Save matrixD, healthyResidual, feature_weight, alpha and beta to `path`.

        The parent directory is created if missing. The BallTree is not
        saved; load_model rebuilds it.
        """
        directory = os.path.dirname(path)
        # A bare filename has no directory part; os.makedirs('') would raise.
        if directory:
            os.makedirs(directory, exist_ok=True)
        joblib.dump({
            'matrixD': self.matrixD,
            'healthyResidual': self.healthyResidual,
            'feature_weight': self.feature_weight,
            'alpha': self.alpha,
            'beta': self.beta,
        }, path)

    @classmethod
    def load_model(cls, path: str) -> 'MSET_Temp':
        """Load a model written by save_model and rebuild its BallTree.

        The returned instance has empty windCode/turbine list/time range.
        """
        # NOTE(security): joblib.load unpickles arbitrary objects — only load
        # model files from trusted locations.
        data = joblib.load(path)
        inst = cls('', [], '', '')
        inst.matrixD = data['matrixD']
        inst.healthyResidual = data['healthyResidual']
        inst.feature_weight = data['feature_weight']
        inst.alpha = data['alpha']
        inst.beta = data['beta']
        inst.normalDataBallTree = cls._build_ball_tree(inst.matrixD)
        return inst