import math
import os

import joblib
import numpy as np
import pandas as pd
from sqlalchemy import text
from sklearn.neighbors import BallTree

from app.config import dataBase
from app.database import get_engine


class MSET_Temp:
    """
    MSET + SPRT temperature analysis.

    - Offline training: genDLMatrix -> save_model
    - Online inference: load_model -> predict_SPRT
    """

    # Maximum number of nearest memory vectors used to reconstruct a state.
    # It is capped at len(matrixD) at query time, because BallTree.query
    # rejects k larger than the number of indexed points.
    _N_NEIGHBORS = 20
    # Added to neighbour distances before inverting them into weights, so an
    # exact match (distance 0) does not divide by zero.
    _WEIGHT_EPS = 1e-1
    # Characters accepted in a wind-farm code. The code is spliced into a
    # table name, which cannot be passed as a bound SQL parameter.
    _WIND_CODE_CHARS = frozenset(
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"
    )

    def __init__(self, windCode: str, windTurbineNumberList: list[str], startTime: str, endTime: str):
        self.windCode = windCode.strip()
        self.windTurbineNumberList = windTurbineNumberList or []
        self.startTime = startTime
        self.endTime = endTime

        # Assigned by genDLMatrix (offline) or load_model (online).
        self.matrixD = None
        self.healthyResidual = None
        self.normalDataBallTree = None

        # SPRT parameters. feature_weight is not computed anywhere in this
        # class; presumably the training caller assigns it before
        # save_model / predict_SPRT -- TODO confirm.
        self.feature_weight: np.ndarray | None = None
        self.alpha: float = 0.1
        self.beta: float = 0.1

    def _get_data_by_filter(self) -> pd.DataFrame:
        """
        Online inference: fetch minute data for self.windTurbineNumberList
        between self.startTime and self.endTime, ordered by time_stamp.

        If the turbine list is empty, data for the whole wind farm is fetched.

        Raises:
            ValueError: if windCode is empty or contains characters outside
                [A-Za-z0-9_-]; it becomes part of the table name.
        """
        if not self.windCode or not set(self.windCode) <= self._WIND_CODE_CHARS:
            raise ValueError(f"Invalid wind farm code: {self.windCode!r}")

        # Table-name overrides for farms whose table does not follow the
        # "<windCode>_minute" pattern. These names contain '-', so they are
        # wrapped in backticks; the default names are used unquoted.
        special_wind_farms = {
            "WOF093400005": f"`{self.windCode}-WOB000001_minute`"
        }
        table = special_wind_farms.get(self.windCode, f"{self.windCode}_minute")

        params = {"start": self.startTime, "end": self.endTime}
        cond = "time_stamp BETWEEN :start AND :end"
        if self.windTurbineNumberList:
            # Bind each turbine id as its own parameter instead of splicing
            # quoted values into the SQL text (prevents SQL injection).
            placeholders = []
            for i, turbine in enumerate(self.windTurbineNumberList):
                key = f"turbine_{i}"
                params[key] = str(turbine)
                placeholders.append(f":{key}")
            cond = f"wind_turbine_number IN ({', '.join(placeholders)}) AND {cond}"

        sql = text(f"""
            SELECT *
            FROM {table}
            WHERE {cond}
            ORDER BY time_stamp ASC
        """)
        engine = get_engine(dataBase.DATA_DB)
        return pd.read_sql(sql, engine, params=params)

    def calcSimilarity(self, x: np.ndarray, y: np.ndarray, m: str = 'euc') -> float:
        """
        Similarity in (0, 1] between two state vectors.

        Args:
            x, y: state vectors.
            m: 'cbd' -> mean over features of 1 / (1 + |x_i - y_i|);
               any other value -> 1 / (1 + Euclidean distance).

        Returns:
            The similarity, or 0.0 when x and y differ in length.
        """
        if len(x) != len(y):
            return 0.0
        if m == 'cbd':
            return float(np.mean([1.0 / (1.0 + abs(p - q)) for p, q in zip(x, y)]))
        diffsq = np.sum((x - y) ** 2)
        return float(1.0 / (1.0 + math.sqrt(diffsq)))

    @staticmethod
    def _row_dissimilarity(rows: np.ndarray, ref: np.ndarray) -> np.ndarray:
        """Vectorized 1 - calcSimilarity(row, ref) ('euc' form) for every row."""
        diff = rows - ref
        return 1.0 - 1.0 / (1.0 + np.sqrt(np.sum(diff * diff, axis=1)))

    def _build_ball_tree(self) -> BallTree:
        """Index matrixD under the dissimilarity 1 - calcSimilarity (Euclidean form)."""
        return BallTree(
            self.matrixD, leaf_size=4,
            metric=lambda a, b: 1.0 - self.calcSimilarity(a, b)
        )

    def _estimate(self, states: np.ndarray) -> np.ndarray:
        """
        Reconstruct each row of `states` as the inverse-distance weighted mean
        of its nearest rows in matrixD (at most _N_NEIGHBORS of them).

        Requires matrixD and normalDataBallTree to be set.
        """
        states = np.asarray(states)
        if len(states) == 0:
            return np.empty((0, self.matrixD.shape[1]))
        k = min(self._N_NEIGHBORS, len(self.matrixD))
        dist, idxs = self.normalDataBallTree.query(states, k=k, return_distance=True)
        weights = 1.0 / (dist + self._WEIGHT_EPS)
        weights /= weights.sum(axis=1, keepdims=True)
        # Row by row to avoid materialising an (N, k, n_features) array.
        return np.array([w @ self.matrixD[ix] for w, ix in zip(weights, idxs)])

    def genDLMatrix(self, trainDataset: np.ndarray, dataSize4D=100, dataSize4L=50) -> int:
        """
        Offline training: build matrixD, the BallTree over it, and healthyResidual.

        Args:
            trainDataset: (m, n) array of healthy states, one row per sample.
            dataSize4D: target number of memory vectors (rows) in matrixD.
                More rows are kept if the per-feature extremes of Step 1
                already exceed it.
            dataSize4L: only used in the minimum-size check; no matrixL is built.

        Returns:
            0 on success; -1 if trainDataset has fewer than
            dataSize4D + dataSize4L rows (nothing is modified in that case).
        """
        trainDataset = np.asarray(trainDataset)
        m, n = trainDataset.shape
        if m < dataSize4D + dataSize4L:
            return -1

        # Step 1: the rows holding each feature's minimum and maximum go into D.
        # A row can be the extreme of several features. It is kept only once,
        # so duplicates neither inflate matrixD nor count toward dataSize4D.
        D_idx: list[int] = []
        for col in trainDataset.T:
            for idx in (int(np.argmin(col)), int(np.argmax(col))):
                if idx not in D_idx:
                    D_idx.append(idx)

        # Step 2: greedily add the row that is on average most dissimilar to
        # the rows already in D, until D holds dataSize4D rows.
        # dissim_sum[i] is the running sum of row i's dissimilarities to D.
        # Every candidate is divided by the same len(D), so maximising the
        # sum picks the same row as maximising the mean.
        dissim_sum = np.zeros(m)
        for idx in D_idx:
            dissim_sum += self._row_dissimilarity(trainDataset, trainDataset[idx])
        chosen = np.zeros(m, dtype=bool)
        chosen[D_idx] = True
        while len(D_idx) < dataSize4D and not chosen.all():
            scores = np.where(chosen, -np.inf, dissim_sum)
            # argmax over the reversed array: ties go to the highest row index.
            pick = m - 1 - int(np.argmax(scores[::-1]))
            D_idx.append(pick)
            chosen[pick] = True
            dissim_sum += self._row_dissimilarity(trainDataset, trainDataset[pick])

        self.matrixD = trainDataset[D_idx]
        self.normalDataBallTree = self._build_ball_tree()
        # Residual = MSET estimate - observed, on the healthy training data.
        self.healthyResidual = self._estimate(trainDataset) - trainDataset
        return 0

    def calcSPRT(self, newsStates: np.ndarray, feature_weight: np.ndarray, alpha: float = 0.1,
                 beta: float = 0.1, decisionGroup: int = 5) -> list[float]:
        """
        Wald SPRT scores for sliding windows of new states.

        Each residual (MSET estimate - state) is projected onto feature_weight.
        The projected healthy residuals give the reference mean mu0 and
        standard deviation sigma0. For every window of `decisionGroup`
        consecutive states, the log-likelihood ratio of a mean shift to the
        window mean mu1 is computed. It is clipped to
        [log(beta / (1 - alpha)), log((1 - beta) / alpha)], and divided by the
        upper bound if positive, otherwise by the lower bound.

        NOTE(review): with the default alpha/beta the lower bound is negative,
        so both directions map to non-negative scores. sigma0 == 0 (constant
        healthy residual) yields nan scores and is not guarded.

        Returns:
            len(newsStates) - decisionGroup + 1 scores; empty when there are
            fewer states than decisionGroup.
        """
        states = np.asarray(newsStates)
        if len(states) < decisionGroup:
            return []

        resN = self._estimate(states) - states
        wN = np.dot(resN, feature_weight)
        wH = np.dot(self.healthyResidual, feature_weight)
        mu0, sigma0 = np.mean(wH), np.std(wH)
        low = math.log(beta / (1 - alpha))
        high = math.log((1 - beta) / alpha)

        flags = []
        for start in range(len(wN) - decisionGroup + 1):
            seg = wN[start:start + decisionGroup]
            mu1 = np.mean(seg)
            si = (np.sum(seg) * (mu1 - mu0) / sigma0 ** 2
                  - decisionGroup * ((mu1 ** 2 - mu0 ** 2) / (2 * sigma0 ** 2)))
            si = max(min(si, high), low)
            flags.append(si / high if si > 0 else si / low)
        return flags

    def predict_SPRT(self, newsStates: np.ndarray, decisionGroup: int = 5) -> list[float]:
        """
        Online inference: run calcSPRT with the stored model parameters
        (matrixD / healthyResidual / feature_weight / alpha / beta).
        """
        return self.calcSPRT(
            newsStates, self.feature_weight,
            alpha=self.alpha, beta=self.beta,
            decisionGroup=decisionGroup
        )

    def save_model(self, path: str):
        """
        Persist matrixD, healthyResidual, feature_weight, alpha and beta to
        `path` with joblib. The BallTree is not saved; load_model rebuilds it.

        Missing parent directories are created. A bare file name is written
        to the current working directory.
        """
        directory = os.path.dirname(path)
        if directory:  # os.makedirs('') would raise FileNotFoundError
            os.makedirs(directory, exist_ok=True)
        joblib.dump({
            'matrixD': self.matrixD,
            'healthyResidual': self.healthyResidual,
            'feature_weight': self.feature_weight,
            'alpha': self.alpha,
            'beta': self.beta,
        }, path)

    @classmethod
    def load_model(cls, path: str) -> 'MSET_Temp':
        """
        Load a model written by save_model and rebuild its BallTree.

        The returned instance has an empty windCode, turbine list and time
        range; only the model fields are populated.

        Security: joblib.load unpickles the file, which can execute arbitrary
        code. Only load model files from trusted locations.
        """
        data = joblib.load(path)
        inst = cls('', [], '', '')
        inst.matrixD = data['matrixD']
        inst.healthyResidual = data['healthyResidual']
        inst.feature_weight = data['feature_weight']
        inst.alpha = data['alpha']
        inst.beta = data['beta']
        inst.normalDataBallTree = inst._build_ball_tree()
        return inst