import math
import os
from typing import Dict

import joblib
import numpy as np
import pandas as pd
from sqlalchemy import text
from sklearn.neighbors import BallTree

from app.config import dataBase
from app.database import get_engine


class MSET_Temp:
    """MSET + SPRT temperature-analysis model.

    Workflow:
    - offline training: ``genDLMatrix`` -> ``save_model``
    - online inference: ``load_model`` -> ``predict_SPRT``
    """

    # Number of nearest memory-matrix rows used to reconstruct one state.
    _N_NEIGHBORS = 20
    # Offset added to kNN distances before inverting them into weights, so an
    # exact match (distance 0) does not divide by zero.
    _DIST_OFFSET = 1e-1

    def __init__(self, windCode: str, windTurbineNumberList: list[str], startTime: str, endTime: str):
        self.windCode = windCode.strip()
        self.windTurbineNumberList = windTurbineNumberList or []
        self.startTime = startTime
        self.endTime = endTime

        # Populated by offline training (genDLMatrix) or by load_model.
        self.matrixD = None
        self.healthyResidual = None
        self.normalDataBallTree = None

        # SPRT parameters (set during offline training / restored by load_model).
        self.feature_weight: np.ndarray | None = None
        self.alpha: float = 0.1
        self.beta: float = 0.1

    # ------------------------------------------------------------------ #
    # SQL helpers
    # ------------------------------------------------------------------ #
    def _table_name(self) -> str:
        """Return the quoted ``<windCode>_minute`` table identifier.

        Table names cannot be bound as SQL parameters, so the identifier is
        backtick-quoted (embedded backticks doubled) to stop a crafted
        ``windCode`` from injecting SQL. This assumes a MySQL-compatible
        backend, as the DATE_SUB/INTERVAL syntax in query_surrounding_data
        already does.
        """
        return "`" + f"{self.windCode}_minute".replace("`", "``") + "`"

    def _turbine_filter(self) -> tuple[str, dict]:
        """Build a parameterized ``wind_turbine_number IN (...)`` condition.

        Returns:
            ``(condition, params)``. ``condition`` is ``""`` when
            ``windTurbineNumberList`` is empty (meaning: do not filter by
            turbine). Otherwise it references one bound parameter per
            turbine, and the values are in ``params``.
        """
        params = {f"turbine_{i}": t for i, t in enumerate(self.windTurbineNumberList)}
        if not params:
            return "", {}
        placeholders = ", ".join(f":{key}" for key in params)
        return f"wind_turbine_number IN ({placeholders})", params

    def _get_data_by_filter(self) -> pd.DataFrame:
        """Fetch minute data for the configured turbines and time range (online inference).

        Rows are filtered by ``windTurbineNumberList`` and by
        ``startTime``..``endTime`` (inclusive). If the turbine list is empty,
        data for the whole wind farm is returned. Rows are ordered by
        ``time_stamp`` ascending.
        """
        turbine_cond, params = self._turbine_filter()
        conditions = [turbine_cond] if turbine_cond else []
        conditions.append("time_stamp BETWEEN :start AND :end")
        params.update(start=self.startTime, end=self.endTime)
        where = " AND ".join(conditions)

        engine = get_engine(dataBase.DATA_DB)
        sql = text(f"""
            SELECT * FROM {self._table_name()}
            WHERE {where}
            ORDER BY time_stamp ASC
        """)
        return pd.read_sql(sql, engine, params=params)

    # ------------------------------------------------------------------ #
    # Model core
    # ------------------------------------------------------------------ #
    def calcSimilarity(self, x: np.ndarray, y: np.ndarray, m: str = 'euc') -> float:
        """Return a similarity in (0, 1] between two equal-length vectors.

        Args:
            x, y: feature vectors.
            m: ``'cbd'`` gives the mean of per-feature ``1 / (1 + |x_i - y_i|)``.
               Any other value gives ``1 / (1 + euclidean_distance)``.

        Returns:
            The similarity. Vectors of different length score 0.0, which is
            the maximum distance when the BallTree metric uses ``1 - similarity``.
        """
        if len(x) != len(y):
            return 0.0
        if m == 'cbd':
            return float(np.mean([1.0 / (1.0 + abs(p - q)) for p, q in zip(x, y)]))
        diffsq = np.sum((x - y) ** 2)
        return float(1.0 / (1.0 + math.sqrt(diffsq)))

    def _build_ball_tree(self) -> BallTree:
        """Index ``matrixD`` with the ``1 - calcSimilarity`` distance."""
        return BallTree(
            self.matrixD, leaf_size=4,
            metric=lambda a, b: 1.0 - self.calcSimilarity(a, b)
        )

    def _estimate(self, states: np.ndarray) -> np.ndarray:
        """Reconstruct each row of ``states`` from its nearest ``matrixD`` rows.

        Each row's estimate is the inverse-distance-weighted average of its
        ``_N_NEIGHBORS`` nearest memory-matrix rows. ``k`` is clamped to the
        memory-matrix size, so small matrices do not make the query raise.

        Returns:
            An array with one estimate per input row.
        """
        states = np.asarray(states)
        if len(states) == 0:
            return np.empty((0, np.asarray(self.matrixD).shape[1]))
        k = min(self._N_NEIGHBORS, len(self.matrixD))
        # One batched query; BallTree answers each row independently, so
        # the result matches querying the rows one at a time.
        dists, idxs = self.normalDataBallTree.query(states, k=k, return_distance=True)
        ests = []
        for dist, idx in zip(dists, idxs):
            w = 1.0 / (dist + self._DIST_OFFSET)
            w /= w.sum()
            ests.append(np.sum([wi * self.matrixD[j] for wi, j in zip(w, idx)], axis=0))
        return np.array(ests)

    def genDLMatrix(self, trainDataset: np.ndarray, dataSize4D=100, dataSize4L=50) -> int:
        """Offline training: build ``matrixD``, the BallTree and ``healthyResidual``.

        Args:
            trainDataset: 2-D array, rows = samples, columns = features.
            dataSize4D: target number of rows in the memory matrix D.
            dataSize4L: extra rows required beyond ``dataSize4D``. Only used
                in the minimum-size check below.

        Returns:
            0 on success, or -1 if ``trainDataset`` has fewer than
            ``dataSize4D + dataSize4L`` rows (nothing is modified in that case).
        """
        m, n = trainDataset.shape
        if m < dataSize4D + dataSize4L:
            return -1

        # Step 1: seed D with the rows holding each feature's min and max.
        D_idx, D = [], []
        for i in range(n):
            col = trainDataset[:, i]
            for idx in (np.argmin(col), np.argmax(col)):
                D.append(trainDataset[idx].tolist())
                D_idx.append(idx)

        # Step 2: greedily add the row with the highest mean dissimilarity to
        # the current D until D has dataSize4D rows. Ties go to the larger
        # index, which max() over (score, index) tuples gives regardless of
        # set iteration order.
        free = set(range(m)) - set(D_idx)
        while len(D_idx) < dataSize4D:
            scores = [(np.mean([1 - self.calcSimilarity(trainDataset[i], d) for d in D]), i)
                      for i in free]
            _, pick = max(scores)
            D.append(trainDataset[pick].tolist())
            D_idx.append(pick)
            free.discard(pick)
        self.matrixD = np.array(D)

        self.normalDataBallTree = self._build_ball_tree()
        # Residuals of the training data itself: the "healthy" reference
        # distribution used by the SPRT.
        self.healthyResidual = self._estimate(trainDataset) - trainDataset
        return 0

    def calcSPRT(self, newsStates: np.ndarray, feature_weight: np.ndarray,
                 alpha: float = 0.1, beta: float = 0.1, decisionGroup: int = 5) -> list[float]:
        """Compute Wald SPRT scores over sliding windows of new states.

        Args:
            newsStates: 2-D array of states to score (same features as training).
            feature_weight: per-feature weights used to collapse residual vectors.
            alpha, beta: SPRT error rates that define the decision thresholds.
            decisionGroup: sliding-window length.

        Returns:
            One score per window (``len(newsStates) - decisionGroup + 1``
            entries, or an empty list if there are fewer states than that).
        """
        resN = self._estimate(newsStates) - newsStates

        # Weighted scalar residuals for new and healthy data.
        wN = [np.dot(r, feature_weight) for r in resN]
        wH = [np.dot(r, feature_weight) for r in self.healthyResidual]
        mu0, sigma0 = np.mean(wH), np.std(wH)
        # Floor the variance so constant healthy residuals do not produce
        # NaN/inf scores through division by zero.
        var0 = max(sigma0 ** 2, np.finfo(float).eps)

        low = math.log(beta / (1 - alpha))
        high = math.log((1 - beta) / alpha)

        flags = []
        for i in range(len(wN) - decisionGroup + 1):
            seg = wN[i:i + decisionGroup]
            mu1 = np.mean(seg)
            si = (sum(seg) * (mu1 - mu0) / var0
                  - decisionGroup * ((mu1 ** 2 - mu0 ** 2) / (2 * var0)))
            si = max(min(si, high), low)
            # NOTE(review): low < 0, so si / low is non-negative. Scores
            # saturated toward either threshold both map to 1.0, and the
            # sign (healthy vs anomalous) is lost. Kept as-is because
            # downstream thresholds may rely on it; confirm intent.
            flags.append(si / high if si > 0 else si / low)
        return flags

    def predict_SPRT(self, newsStates: np.ndarray, decisionGroup: int = 5) -> list[float]:
        """Online inference: score ``newsStates`` with the stored SPRT parameters.

        Uses ``matrixD``, ``healthyResidual``, ``feature_weight``, ``alpha``
        and ``beta`` from training or from ``load_model``.
        """
        return self.calcSPRT(
            newsStates,
            self.feature_weight,
            alpha=self.alpha,
            beta=self.beta,
            decisionGroup=decisionGroup
        )

    # ------------------------------------------------------------------ #
    # Persistence
    # ------------------------------------------------------------------ #
    def save_model(self, path: str):
        """Save matrixD, healthyResidual, feature_weight, alpha and beta to ``path``.

        The parent directory is created if needed. A bare filename (no
        directory part) is written to the current directory.
        """
        directory = os.path.dirname(path)
        if directory:  # os.makedirs('') raises, so skip it for bare filenames
            os.makedirs(directory, exist_ok=True)
        joblib.dump({
            'matrixD': self.matrixD,
            'healthyResidual': self.healthyResidual,
            'feature_weight': self.feature_weight,
            'alpha': self.alpha,
            'beta': self.beta,
        }, path)

    @classmethod
    def load_model(cls, path: str) -> 'MSET_Temp':
        """Load a model saved by ``save_model`` and rebuild its BallTree.

        Security: ``joblib.load`` unpickles the file, which can execute
        arbitrary code. Only load model files from trusted locations.
        """
        data = joblib.load(path)
        inst = cls('', [], '', '')
        inst.matrixD = data['matrixD']
        inst.healthyResidual = data['healthyResidual']
        inst.feature_weight = data['feature_weight']
        inst.alpha = data['alpha']
        inst.beta = data['beta']
        inst.normalDataBallTree = inst._build_ball_tree()
        return inst

    # ------------------------------------------------------------------ #
    # Raw-data lookup
    # ------------------------------------------------------------------ #
    def query_surrounding_data(self, timestamp: str, minutes_around: int = 250) -> Dict:
        """Fetch minute records within ``minutes_around`` minutes of ``timestamp``.

        Records are restricted to ``windTurbineNumberList``. As in
        ``_get_data_by_filter``, an empty list means no turbine filter.

        Args:
            timestamp: centre time, formatted 'yyyy-mm-dd HH:MM:SS'.
            minutes_around: window half-width in minutes (before and after).

        Returns:
            {
                'record_count': int,         # rows returned by the query
                'records': List[Dict],       # rows keyed by Chinese column labels
                'columns_mapping': Dict[str, str]  # English -> Chinese names of returned mapped columns
            }
        """
        # English column name -> Chinese display label.
        cn_map = {
            'wind_turbine_name': '风机名称',
            'time_stamp': '时间',
            'active_power': '有功功率(kW)',
            'rotor_speed': '风轮转速(rpm)',
            'generator_speed': '发电机转速(rpm)',
            'wind_velocity': '风速(m/s)',
            'pitch_angle_blade_1': '桨距角1(°)',
            'pitch_angle_blade_2': '桨距角2(°)',
            'pitch_angle_blade_3': '桨距角3(°)',
            'cabin_position': '机舱位置(°)',
            'true_wind_direction': '绝对风向(°)',
            'yaw_error1': '对风角度(°)',
            'set_value_of_active_power': '有功功率设定值(kW)',
            'gearbox_oil_temperature': '齿轮箱油温(℃)',
            'generatordrive_end_bearing_temperature': '发电机驱动端轴承温度(℃)',
            'generatornon_drive_end_bearing_temperature': '发电机非驱动端轴承温度(℃)',
            'cabin_temperature': '机舱内温度(℃)',
            'twisted_cable_angle': '扭缆角度(°)',
            'outside_cabin_temperature': '环境温度(℃)',
            'main_bearing_temperature': '主轴承轴承温度(℃)',
            'main_bearing_temperature_2': '主轴承轴承温度2(℃)',
            'gearbox_high_speed_shaft_bearing_temperature': '齿轮箱高速轴轴承温度(℃)',
            'gearboxmedium_speed_shaftbearing_temperature': '齿轮箱中速轴轴承温度(℃)',
            'gearbox_low_speed_shaft_bearing_temperature': '齿轮箱低速轴轴承温度(℃)',
            'generator_winding1_temperature': '发电机绕组1温度(℃)',
            'generator_winding2_temperature': '发电机绕组2温度(℃)',
            'generator_winding3_temperature': '发电机绕组3温度(℃)',
            'grid_a_phase_current': '电网A相电流(A)',
            'grid_b_phase_current': '电网B相电流(A)',
            'grid_c_phase_current': '电网C相电流(A)'
        }

        turbine_cond, params = self._turbine_filter()
        conditions = [turbine_cond] if turbine_cond else []
        conditions.append(
            "time_stamp BETWEEN DATE_SUB(:timestamp, INTERVAL :minutes MINUTE) "
            "AND DATE_ADD(:timestamp, INTERVAL :minutes MINUTE)"
        )
        params.update(timestamp=timestamp, minutes=minutes_around)
        where = " AND ".join(conditions)

        engine = get_engine(dataBase.DATA_DB)
        sql = text(f"""
            SELECT * FROM {self._table_name()}
            WHERE {where}
            ORDER BY time_stamp ASC
        """)
        df = pd.read_sql(sql, engine, params=params)

        record_count = len(df)
        print(f"查询到 {record_count} 条数据")

        if df.empty:
            return {
                'record_count': 0,
                'records': [],
                'columns_mapping': {}
            }

        # Drop bookkeeping/unneeded columns (only those present), then
        # columns that are entirely empty.
        cols_to_drop = [
            'wind_turbine_number', 'reactive_power', 'lab', 'year', 'month', 'day', 'year_month',
            'front_back_vibration_of_the_cabin', 'side_to_side_vibration_of_the_cabin',
            'actual_torque', 'given_torque', 'clockwise_yaw_count', 'counterclockwise_yaw_count',
            'unusable', 'power_curve_available', 'required_gearbox_speed',
            'inverter_speed_master_control', 'wind_turbine_status', 'wind_turbine_status2',
            'turbulence_intensity'
        ]
        df = df.drop(columns=[col for col in cols_to_drop if col in df.columns])
        df = df.dropna(axis=1, how='all')

        # time_stamp may have been dropped above if it was entirely null.
        if 'time_stamp' in df.columns:
            df['time_stamp'] = df['time_stamp'].astype(str)

        columns_mapping = {col: cn_map[col] for col in df.columns if col in cn_map}
        records = df.rename(columns=cn_map).to_dict('records')
        return {
            'record_count': record_count,
            'records': records,
            'columns_mapping': columns_mapping
        }