# temp_diag.py import numpy as np import pandas as pd from sklearn.neighbors import BallTree from sqlalchemy import create_engine, text import math class MSET_Temp: """ 基于 MSET + SPRT 的温度趋势/阈值分析类。 查询条件由 wind_turbine_number 列和 time_stamp 范围决定, SPRT 阈值固定为 0.99,calcSPRT 输出在 [-1,1]。 """ def __init__(self, windCode: str, windTurbineNumberList: list[str], startTime: str, endTime: str): """ :param windCode: 风机类型或机组代码,用于拼表名。例如 "WOG01312" → 表名 "WOG01312_minute" :param windTurbineNumberList: 要查询的 wind_turbine_number(风机编号)列表 :param startTime: 起始时间(字符串),格式 "YYYY-MM-DD HH:MM" :param endTime: 结束时间(字符串),格式 "YYYY-MM-DD HH:MM" """ self.windCode = windCode.strip() self.windTurbineNumberList = windTurbineNumberList # 强制保留到秒 self.startTime = startTime self.endTime = endTime # D/L 矩阵相关 self.matrixD = None self.matrixL = None self.healthyResidual = None self.normalDataBallTree = None # def _truncate_to_seconds(self, dt_str: str) -> str: # """ # 将用户可能传进来的 ISO 时间字符串或包含毫秒的字符串 # 截断到秒,返回 "YYYY-MM-DD HH:MM:SS" 格式。 # 例如: "2025-06-01T12:34:56.789Z" → "2025-06-01 12:34:56" # """ # # 先将 'T' 替换成空格,去掉尾部可能的 "Z" # s = dt_str.replace("T", " ").rstrip("Z") # # 如果含有小数秒,截断 # if "." in s: # s = s.split(".")[0] # # 如果还有 "+xx:xx" 时区后缀,也截断 # if "+" in s: # s = s.split("+")[0] # return s.strip() def _get_data_by_filter(self) -> pd.DataFrame: """ 按 wind_turbine_number 列和 time_stamp 时间范围批量查询, 返回一个完整的 DataFrame(已按 time_stamp 升序排序)。 """ table_name = f"{self.windCode}_minute" engine = create_engine( "mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod" # "mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod" ) # 准备 wind_turbine_number 列表的 SQL 片段:('WT1','WT2',...) turbines = ",".join(f"'{wt.strip()}'" for wt in self.windTurbineNumberList) sql = text(f""" SELECT * FROM {table_name} WHERE wind_turbine_number IN ({turbines}) AND time_stamp BETWEEN :start AND :end ORDER BY time_stamp ASC """) df = pd.read_sql(sql, engine, params={"start": self.startTime, "end": self.endTime}) return df def calcSimilarity(self, x: np.ndarray, y: np.ndarray, m: str = 'euc') -> float: """ 计算向量 x 与 y 的相似度,(0,1] 区间: - m='euc' → 欧氏距离 - m='cbd' → 城市街区距离 """ if len(x) != len(y): return 0.0 if m == 'cbd': arr = [1.0 / (1.0 + abs(p - q)) for p, q in zip(x, y)] return float(np.sum(arr) / len(arr)) else: diffsq = [(p - q) ** 2 for p, q in zip(x, y)] return float(1.0 / (1.0 + math.sqrt(np.sum(diffsq)))) def genDLMatrix(self, trainDataset: np.ndarray, dataSize4D=100, dataSize4L=50) -> int: """ 根据训练集 trainDataset 生成 D/L 矩阵: - 若样本数 < dataSize4D + dataSize4L,返回 -1 - 否则构造 matrixD、matrixL,并用局部加权回归获得 healthyResidual,返回 0 """ m, n = trainDataset.shape if m < dataSize4D + dataSize4L: return -1 # Step1:每个特征的最小/最大样本加入 matrixD self.matrixD = [] selectIndex4D = [] for i in range(n): col_i = trainDataset[:, i] idx_min = np.argmin(col_i) idx_max = np.argmax(col_i) self.matrixD.append(trainDataset[idx_min, :].tolist()) selectIndex4D.append(idx_min) self.matrixD.append(trainDataset[idx_max, :].tolist()) selectIndex4D.append(idx_max) # Step2:对剩余样本逐步选出“与 matrixD 平均距离最大”的样本,直至 matrixD 行数 = dataSize4D while len(selectIndex4D) < dataSize4D: freeList = list(set(range(len(trainDataset))) - set(selectIndex4D)) distAvg = [] for idx in freeList: tmp = trainDataset[idx, :] dlist = [1.0 - self.calcSimilarity(x, tmp) for x in self.matrixD] distAvg.append(np.mean(dlist)) select_id = freeList[int(np.argmax(distAvg))] self.matrixD.append(trainDataset[select_id, :].tolist()) selectIndex4D.append(select_id) self.matrixD = np.array(self.matrixD) # 用 matrixD 建 BallTree,用于局部加权回归 self.normalDataBallTree = BallTree( self.matrixD, leaf_size=4, metric=lambda a, b: 1.0 - self.calcSimilarity(a, b) ) # Step3:把所有训练样本都作为 matrixL self.matrixL = trainDataset.copy() # Step4:用局部加权回归算出健康残差 self.healthyResidual = self.calcResidualByLocallyWeightedLR(self.matrixL) return 0 def calcResidualByLocallyWeightedLR(self, newStates: np.ndarray) -> np.ndarray: """ 对 newStates 中每个样本,使用 matrixD 的前 20 个最近邻做局部加权回归,计算残差。 返回形状 [len(newStates), 特征数] 的残差矩阵。 """ est_list = [] for x in newStates: dist, idxs = self.normalDataBallTree.query([x], k=20, return_distance=True) w = 1.0 / (dist[0] + 1e-1) w = w / np.sum(w) est = np.sum([w_i * self.matrixD[j] for w_i, j in zip(w, idxs[0])], axis=0) est_list.append(est) est_arr = np.reshape(np.array(est_list), (len(est_list), -1)) return est_arr - newStates def calcSPRT( self, newsStates: np.ndarray, feature_weight: np.ndarray, alpha: float = 0.1, beta: float = 0.1, decisionGroup: int = 5 ) -> list[float]: """ 对 newsStates 运行 Wald-SPRT,返回得分列表,长度 = len(newsStates) - decisionGroup + 1, 分数在 [-1, 1]: - 越接近 1 → 越“异常(危险)” - 越接近 -1 → 越“正常” """ # 1) 计算残差并做特征加权 stateRes = self.calcResidualByLocallyWeightedLR(newsStates) weightedStateResidual = [np.dot(x, feature_weight) for x in stateRes] weightedHealthyResidual = [np.dot(x, feature_weight) for x in self.healthyResidual] # 2) 健康残差的分布统计 mu0 = float(np.mean(weightedHealthyResidual)) sigma0 = float(np.std(weightedHealthyResidual)) # 3) 计算 SPRT 的上下阈值 lowThres = np.log(beta / (1.0 - alpha)) # < 0 highThres = np.log((1.0 - beta) / alpha) # > 0 flags: list[float] = [] length = len(weightedStateResidual) for i in range(0, length - decisionGroup + 1): segment = weightedStateResidual[i : i + decisionGroup] mu1 = float(np.mean(segment)) si = ( np.sum(segment) * (mu1 - mu0) / (sigma0**2) - decisionGroup * ((mu1**2) - (mu0**2)) / (2.0 * (sigma0**2)) ) # 限制 si 在 [lowThres, highThres] 之内 si = max(min(si, highThres), lowThres) # 正负归一化 if si > 0: norm_si = float(si / highThres) else: norm_si = float(si / lowThres) flags.append(norm_si) return flags def check_threshold(self) -> pd.DataFrame: """ 阈值分析(阈值 0.99),长格式: 返回所有存在通道的数据,缺失的通道自动跳过。 """ THRESHOLD = 0.99 df = self._get_data_by_filter() if df.empty: return pd.DataFrame(columns=["time_stamp", "temp_channel", "SPRT_score", "status"]) # 四个通道英文名 temp_cols_all = [ 'main_bearing_temperature', 'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature', 'generatornon_drive_end_bearing_temperature' ] # 只保留存在的列 temp_cols = [c for c in temp_cols_all if c in df.columns] if not temp_cols: return pd.DataFrame(columns=["time_stamp", "temp_channel", "SPRT_score", "status"]) # 转数值 & 时间 df[temp_cols] = df[temp_cols].apply(pd.to_numeric, errors='coerce') df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce') records = [] # 英文→中文映射 cn_map = { 'main_bearing_temperature': '主轴承温度', 'gearbox_oil_temperature': '齿轮箱油温', 'generatordrive_end_bearing_temperature': '发电机驱动端轴承温度', 'generatornon_drive_end_bearing_temperature': '发电机非驱动端轴承温度' } for col in temp_cols: sub = df[['time_stamp', col]].dropna() if sub.empty: continue arr = sub[col].values.reshape(-1,1) ts = sub['time_stamp'].dt.strftime("%Y-%m-%d %H:%M:%S").tolist() half = len(arr) // 2 train = arr[:half] test = arr[half:] # 不足则跳过该通道 if self.genDLMatrix(train, dataSize4D=60, dataSize4L=5) != 0: continue flags = self.calcSPRT(test, np.array([1.0]), decisionGroup=1) for i, score in enumerate(flags): records.append({ "time_stamp": ts[half + i], "temp_channel": cn_map[col], "SPRT_score": score, "status": "危险" if score > THRESHOLD else "正常" }) return pd.DataFrame(records, columns=["time_stamp", "temp_channel", "SPRT_score", "status"]) def get_trend(self) -> dict: """ 趋势分析:对每个通道单独计算,缺失或训练不足时输出空结构。 """ df = self._get_data_by_filter() # 英文→输出字段名 key_map = { 'main_bearing_temperature': 'main_bearing', 'gearbox_oil_temperature': 'gearbox_oil', 'generatordrive_end_bearing_temperature': 'generator_drive_end', 'generatornon_drive_end_bearing_temperature': 'generator_nondrive_end' } # 预置结果 result = {v: {} for v in key_map.values()} if df.empty: return {"data": result, "code": 200, "message": "success"} df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce') for col, out_key in key_map.items(): if col not in df.columns: continue sub = df[['time_stamp', col]].dropna() if sub.empty: continue vals = pd.to_numeric(sub[col], errors='coerce').values ts_list = sub['time_stamp'].dt.strftime("%Y-%m-%d %H:%M:%S").tolist() half = len(vals) // 2 train = vals[:half].reshape(-1,1) test = vals[half:].reshape(-1,1) # 训练不足时输出空列表 if self.genDLMatrix(train, dataSize4D=60, dataSize4L=5) != 0: flags = [] else: flags = self.calcSPRT(test, np.array([1.0]), decisionGroup=1) result[out_key] = { "timestamps": ts_list[half:], "values": flags } return {"data": result, "code": 200, "message": "success"} # def get_trend(self) -> dict: # """ # 趋势分析 # 获取温度趋势:将温度数据按时间返回。 # 返回格式:{ # "timestamps": [ISO8601 字符串列表], # "channels": [ # {"temp_channel": "main_bearing_temperature", "values": [浮点列表]}, # {"temp_channel": "gearbox_oil_temperature", "values": [...]}, # ... # ], # "unit": "°C" # } # """ # df = self._get_data_by_filter() # if df.empty: # return {"timestamps": [], "channels": [], "unit": "°C"} # # 定义所有需要检查的温度列 # temp_cols_all = [ # 'main_bearing_temperature', # 'gearbox_oil_temperature', # 'generatordrive_end_bearing_temperature', # 'generatornon_drive_end_bearing_temperature' # ] # # 选择实际存在的列 # temp_cols = [c for c in temp_cols_all if c in df.columns] # # 如果没有温度数据列,返回空数据 # if not temp_cols: # return {"timestamps": [], "channels": [], "unit": "°C"} # # 转数值,并删除 NaN # df[temp_cols] = df[temp_cols].apply(pd.to_numeric, errors='coerce') # df = df.dropna(subset=temp_cols + ['time_stamp']) # # 转时间戳为 `YYYY-MM-DD HH:MM:SS` 格式 # df['time_stamp'] = pd.to_datetime(df['time_stamp']).dt.strftime("%Y-%m-%d %H:%M:%S") # df = df.sort_values('time_stamp').reset_index(drop=True) # # 时间戳格式化为 ISO 8601 字符串 # timestamps = df['time_stamp'].tolist() # # 对每个通道,收集它在相应行的数值 # channels_data = [] # for col in temp_cols: # channels_data.append({ # "temp_channel": col, # "values": df[col].tolist() # }) # # 将 temp_channel 列的英文名称改为中文 # temp_channel_mapping = { # 'main_bearing_temperature': '主轴承温度', # 'gearbox_oil_temperature': '齿轮箱油温', # 'generatordrive_end_bearing_temperature': '发电机驱动端轴承温度', # 'generatornon_drive_end_bearing_temperature': '发电机非驱动端轴承温度' # } # for channel in channels_data: # channel['temp_channel'] = temp_channel_mapping.get(channel['temp_channel'], channel['temp_channel']) # return { # "timestamps": timestamps, # "channels": channels_data, # "unit": "°C" # }