# Temp_Diag.py
import math
import os
import re

import joblib
import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree
from sqlalchemy import create_engine, text
class MSET_Temp:
    """MSET + SPRT temperature trend / threshold analysis.

    - Offline: train on farm-wide data with ``genDLMatrix`` and persist the
      result with ``save_model``.
    - Online: fetch rows for the turbines and time range supplied by the
      front end via ``_get_data_by_filter`` and score them with
      ``predict_SPRT``.
    """

    # How many memory-matrix neighbours are blended into one state estimate.
    _NEIGHBOUR_COUNT = 20
    # Added to every neighbour distance so an exact match (distance 0) does
    # not cause a division by zero when computing inverse-distance weights.
    _DISTANCE_OFFSET = 1e-1
    # Variance used when the healthy residual scores have no spread at all.
    _MIN_VARIANCE = 1e-12
    # Optional environment variable that overrides the built-in database URL.
    _DB_URL_ENV = "MSET_TEMP_DB_URL"
    # NOTE(review): credentials are embedded in source; prefer supplying the
    # URL through the environment variable above and rotating this password.
    _DEFAULT_DB_URL = (
        "mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod"
    )

    def __init__(self,
                 windCode: str,
                 windTurbineNumberList: list[str],
                 startTime: str,
                 endTime: str):
        """Store the query filter and initialise an empty (untrained) model.

        Args:
            windCode: Wind-farm code; surrounding whitespace is stripped. It
                is used as the prefix of the ``<windCode>_minute`` table.
            windTurbineNumberList: Turbine identifiers to filter on. ``None``
                or an empty list means "no turbine filter".
            startTime: Lower bound for ``time_stamp`` (inclusive, SQL BETWEEN).
            endTime: Upper bound for ``time_stamp`` (inclusive, SQL BETWEEN).
        """
        self.windCode = windCode.strip()
        self.windTurbineNumberList = windTurbineNumberList or []
        self.startTime = startTime
        self.endTime = endTime
        # Populated by genDLMatrix (offline) or load_model (online).
        self.matrixD = None
        self.matrixL = None
        self.healthyResidual = None
        self.normalDataBallTree = None
        # SPRT parameters; feature_weight must be assigned before predicting.
        self.feature_weight: np.ndarray | None = None
        self.alpha: float = 0.1
        self.beta: float = 0.1

    def _get_data_by_filter(self) -> pd.DataFrame:
        """Load rows of ``<windCode>_minute`` for the configured filter.

        When the turbine list is empty only the time range is applied, i.e.
        data for the whole farm is returned. Rows are ordered by
        ``time_stamp`` ascending.

        Returns:
            The query result as a DataFrame.

        Raises:
            ValueError: If ``windCode`` would not form a plain SQL identifier.
        """
        table = f"{self.windCode}_minute"
        # Identifiers cannot be sent as bound parameters, so the table name
        # is validated before it is interpolated into the statement.
        if not re.fullmatch(r"[\w$]+", table):
            raise ValueError(
                f"windCode {self.windCode!r} does not form a valid table name"
            )

        params = {"start": self.startTime, "end": self.endTime}
        conditions = []
        if self.windTurbineNumberList:
            # One bound parameter per turbine instead of quoting values by
            # hand, so turbine names can never alter the statement.
            placeholders = []
            for pos, turbine in enumerate(self.windTurbineNumberList):
                key = f"wt{pos}"
                params[key] = turbine.strip()
                placeholders.append(f":{key}")
            conditions.append(
                f"wind_turbine_number IN ({', '.join(placeholders)})"
            )
        conditions.append("time_stamp BETWEEN :start AND :end")
        where = " AND ".join(conditions)

        sql = text(f"""
            SELECT *
            FROM {table}
            WHERE {where}
            ORDER BY time_stamp ASC
        """)
        engine = create_engine(
            os.environ.get(self._DB_URL_ENV, self._DEFAULT_DB_URL)
        )
        try:
            return pd.read_sql(sql, engine, params=params)
        finally:
            # The engine is created per call; release its connection pool.
            engine.dispose()

    def calcSimilarity(self, x: np.ndarray, y: np.ndarray, m: str = 'euc') -> float:
        """Return a similarity score for two equally long vectors.

        Args:
            x: First vector (array or sequence of numbers).
            y: Second vector (array or sequence of numbers).
            m: ``'cbd'`` averages ``1 / (1 + |x_i - y_i|)`` over the
                components; any other value uses ``1 / (1 + ||x - y||_2)``.

        Returns:
            The similarity, or ``0.0`` when the lengths differ.
        """
        if len(x) != len(y):
            return 0.0
        delta = np.asarray(x, dtype=float) - np.asarray(y, dtype=float)
        if m == 'cbd':
            return float(np.mean(1.0 / (1.0 + np.abs(delta))))
        return float(1.0 / (1.0 + math.sqrt(np.sum(delta ** 2))))

    @staticmethod
    def _select_memory_rows(data: np.ndarray, dataSize4D: int) -> list[int]:
        """Pick the row indices of ``data`` that form the memory matrix D.

        Step 1 takes, for every column, the rows holding its minimum and
        maximum (each row at most once). Step 2 keeps adding the unused row
        with the largest summed distance ``1 - 1 / (1 + euclidean)`` to the
        rows already chosen until ``dataSize4D`` rows are selected.
        """
        def distance_to(row: int) -> np.ndarray:
            gap = np.sqrt(np.sum((data - data[row]) ** 2, axis=1))
            return 1.0 - 1.0 / (1.0 + gap)

        chosen: list[int] = []
        taken: set[int] = set()
        for col in range(data.shape[1]):
            column = data[:, col]
            for row in (int(np.argmin(column)), int(np.argmax(column))):
                # A row can be extreme in several columns (or be both the
                # min and max of a constant column); keep it only once so D
                # contains no duplicate states.
                if row not in taken:
                    taken.add(row)
                    chosen.append(row)

        # Running sum of distances from every row to the chosen rows. All
        # candidates share the same divisor, so ranking by the sum equals
        # ranking by the mean distance.
        score = np.zeros(data.shape[0])
        for row in chosen:
            score += distance_to(row)
        score[chosen] = -np.inf
        while len(chosen) < dataSize4D:
            # Ties go to the highest row index.
            pick = int(np.flatnonzero(score == score.max())[-1])
            chosen.append(pick)
            score += distance_to(pick)
            score[pick] = -np.inf
        return chosen

    def _build_ball_tree(self) -> None:
        """(Re)build the neighbour index over ``matrixD``.

        The tree uses ``1 - calcSimilarity(a, b)`` as its distance.
        """
        self.normalDataBallTree = BallTree(
            self.matrixD,
            leaf_size=4,
            metric=lambda a, b: 1.0 - self.calcSimilarity(a, b),
        )

    def genDLMatrix(self, trainDataset: np.ndarray,
                    dataSize4D=100, dataSize4L=50) -> int:
        """Offline training: build matrixD, matrixL, healthy residuals, BallTree.

        Args:
            trainDataset: 2-D array, one state per row.
            dataSize4D: Target number of rows in the memory matrix D. Step 1
                may select more rows than this when there are many columns.
            dataSize4L: Only used for the minimum-size check; ``matrixL`` is
                a copy of the whole training set.

        Returns:
            ``0`` on success, ``-1`` when the data set has fewer than
            ``dataSize4D + dataSize4L`` rows.

        Raises:
            ValueError: If ``trainDataset`` is not 2-D or contains
                non-finite values.
        """
        data = np.asarray(trainDataset, dtype=float)
        if data.ndim != 2:
            raise ValueError("trainDataset must be a 2-D array")
        if data.shape[0] < dataSize4D + dataSize4L:
            return -1
        if not np.isfinite(data).all():
            raise ValueError("trainDataset contains NaN or infinite values")

        self.matrixD = data[self._select_memory_rows(data, dataSize4D)]
        self._build_ball_tree()
        self.matrixL = data.copy()
        self.healthyResidual = self._calcResidual(self.matrixL)
        return 0

    def _calcResidual(self, states: np.ndarray) -> np.ndarray:
        """Return ``estimate - state`` for every row of ``states``.

        Each estimate is the inverse-distance weighted average of the
        nearest rows of ``matrixD`` (up to 20 of them).

        Raises:
            RuntimeError: If no model has been trained or loaded.
            ValueError: If ``states`` is not 2-D.
        """
        if self.normalDataBallTree is None or self.matrixD is None:
            raise RuntimeError(
                "model is not available; call genDLMatrix or load_model first"
            )
        states = np.asarray(states, dtype=float)
        if states.ndim != 2:
            raise ValueError("states must be a 2-D array")
        if states.shape[0] == 0:
            return np.empty((0, self.matrixD.shape[1]))

        # Never ask the tree for more neighbours than D has rows.
        k = min(self._NEIGHBOUR_COUNT, len(self.matrixD))
        dist, idx = self.normalDataBallTree.query(
            states, k=k, return_distance=True
        )
        weights = 1.0 / (dist + self._DISTANCE_OFFSET)
        weights /= weights.sum(axis=1, keepdims=True)
        # Accumulate neighbour by neighbour to avoid materialising a
        # (rows, k, features) intermediate for large inputs.
        estimate = np.zeros_like(states)
        for j in range(k):
            estimate += weights[:, j, None] * self.matrixD[idx[:, j]]
        return estimate - states

    def calcSPRT(self,
                 newsStates: np.ndarray,
                 feature_weight: np.ndarray,
                 alpha: float = 0.1,
                 beta: float = 0.1,
                 decisionGroup: int = 5) -> list[float]:
        """Run a sliding-window SPRT over the weighted residuals.

        Args:
            newsStates: 2-D array of new states, one per row.
            feature_weight: Weight per feature used to collapse each
                residual vector into one score.
            alpha: Used with ``beta`` to derive the two decision thresholds
                ``log(beta / (1 - alpha))`` and ``log((1 - beta) / alpha)``.
            beta: See ``alpha``.
            decisionGroup: Window length; one value is produced per window.

        Returns:
            One normalised statistic per window: the log-likelihood ratio is
            clipped to the two thresholds, then divided by the upper
            threshold when positive and by the lower threshold otherwise.
            Empty when there are fewer rows than ``decisionGroup``.

        Raises:
            ValueError: If ``decisionGroup < 1`` or ``feature_weight`` is None.
            RuntimeError: If no model has been trained or loaded.
        """
        if decisionGroup < 1:
            raise ValueError("decisionGroup must be at least 1")
        if feature_weight is None:
            raise ValueError("feature_weight must be set before running SPRT")
        if self.healthyResidual is None:
            raise RuntimeError(
                "model is not available; call genDLMatrix or load_model first"
            )

        # 1) Residuals collapsed to one weighted score per state.
        weights = np.asarray(feature_weight, dtype=float)
        new_scores = self._calcResidual(newsStates) @ weights
        healthy_scores = np.asarray(self.healthyResidual, dtype=float) @ weights

        mu0 = float(np.mean(healthy_scores))
        var0 = float(np.std(healthy_scores)) ** 2
        if not var0 > 0.0:
            # Healthy scores without spread would otherwise yield NaN flags.
            var0 = self._MIN_VARIANCE

        low = math.log(beta / (1 - alpha))
        high = math.log((1 - beta) / alpha)

        flags = []
        for start in range(len(new_scores) - decisionGroup + 1):
            window = new_scores[start:start + decisionGroup]
            mu1 = float(np.mean(window))
            # Because mu1 is the mean of this very window, the expression
            # equals decisionGroup * (mu1 - mu0)**2 / (2 * var0), i.e. it is
            # non-negative apart from rounding.
            si = (float(np.sum(window)) * (mu1 - mu0) / var0
                  - decisionGroup * ((mu1 ** 2 - mu0 ** 2) / (2 * var0)))
            si = max(min(si, high), low)
            flags.append(si / high if si > 0 else si / low)
        return flags

    def predict_SPRT(self,
                     newsStates: np.ndarray,
                     decisionGroup: int = 5) -> list[float]:
        """Online inference using the loaded matrixD, healthyResidual,
        feature_weight, alpha and beta."""
        return self.calcSPRT(
            newsStates,
            self.feature_weight,
            alpha=self.alpha,
            beta=self.beta,
            decisionGroup=decisionGroup,
        )

    def save_model(self, path: str):
        """Persist matrixD, healthyResidual, feature_weight, alpha and beta.

        The parent directory of ``path`` is created when it does not exist.
        """
        folder = os.path.dirname(path)
        # A bare file name has no directory part; os.makedirs('') would raise.
        if folder:
            os.makedirs(folder, exist_ok=True)
        joblib.dump({
            'matrixD': self.matrixD,
            'healthyResidual': self.healthyResidual,
            'feature_weight': self.feature_weight,
            'alpha': self.alpha,
            'beta': self.beta,
        }, path)

    @classmethod
    def load_model(cls, path: str) -> 'MSET_Temp':
        """Deserialise a saved model and rebuild its BallTree.

        The returned instance has empty query-filter fields.
        """
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code; only load model files from trusted locations.
        data = joblib.load(path)
        inst = cls('', [], '', '')
        inst.matrixD = data['matrixD']
        inst.healthyResidual = data['healthyResidual']
        inst.feature_weight = data['feature_weight']
        inst.alpha = data['alpha']
        inst.beta = data['beta']
        inst._build_ball_tree()
        return inst
|