123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- # temp_diag.py
- import numpy as np
- import pandas as pd
- from sklearn.neighbors import BallTree
- from sqlalchemy import create_engine, text
- import math
- class MSET_Temp:
- """
- 基于 MSET + SPRT 的温度趋势/阈值分析类。
- 查询条件由 wind_turbine_number 列和 time_stamp 范围决定,
- SPRT 阈值固定为 0.99,calcSPRT 输出在 [-1,1]。
- """
- def __init__(self, windCode: str, windTurbineNumberList: list[str], startTime: str, endTime: str):
- """
- :param windCode: 风机类型或机组代码,用于拼表名。例如 "WOG01312" → 表名 "WOG01312_minute"
- :param windTurbineNumberList: 要查询的 wind_turbine_number(风机编号)列表
- :param startTime: 起始时间(字符串),格式 "YYYY-MM-DD HH:MM"
- :param endTime: 结束时间(字符串),格式 "YYYY-MM-DD HH:MM"
- """
- self.windCode = windCode.strip()
- self.windTurbineNumberList = windTurbineNumberList
- # 强制保留到秒
- self.startTime = startTime
- self.endTime = endTime
- # D/L 矩阵相关
- self.matrixD = None
- self.matrixL = None
- self.healthyResidual = None
- self.normalDataBallTree = None
- # def _truncate_to_seconds(self, dt_str: str) -> str:
- # """
- # 将用户可能传进来的 ISO 时间字符串或包含毫秒的字符串
- # 截断到秒,返回 "YYYY-MM-DD HH:MM:SS" 格式。
- # 例如: "2025-06-01T12:34:56.789Z" → "2025-06-01 12:34:56"
- # """
- # # 先将 'T' 替换成空格,去掉尾部可能的 "Z"
- # s = dt_str.replace("T", " ").rstrip("Z")
- # # 如果含有小数秒,截断
- # if "." in s:
- # s = s.split(".")[0]
- # # 如果还有 "+xx:xx" 时区后缀,也截断
- # if "+" in s:
- # s = s.split("+")[0]
- # return s.strip()
- def _get_data_by_filter(self) -> pd.DataFrame:
- """
- 按 wind_turbine_number 列和 time_stamp 时间范围批量查询,
- 返回一个完整的 DataFrame(已按 time_stamp 升序排序)。
- """
- table_name = f"{self.windCode}_minute"
- engine = create_engine(
- "mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod"
- # "mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod"
- )
- # 准备 wind_turbine_number 列表的 SQL 片段:('WT1','WT2',...)
- turbines = ",".join(f"'{wt.strip()}'" for wt in self.windTurbineNumberList)
- sql = text(f"""
- SELECT *
- FROM {table_name}
- WHERE wind_turbine_number IN ({turbines})
- AND time_stamp BETWEEN :start AND :end
- ORDER BY time_stamp ASC
- """)
- df = pd.read_sql(sql, engine, params={"start": self.startTime, "end": self.endTime})
- return df
- def calcSimilarity(self, x: np.ndarray, y: np.ndarray, m: str = 'euc') -> float:
- """
- 计算向量 x 与 y 的相似度,(0,1] 区间:
- - m='euc' → 欧氏距离
- - m='cbd' → 城市街区距离
- """
- if len(x) != len(y):
- return 0.0
- if m == 'cbd':
- arr = [1.0 / (1.0 + abs(p - q)) for p, q in zip(x, y)]
- return float(np.sum(arr) / len(arr))
- else:
- diffsq = [(p - q) ** 2 for p, q in zip(x, y)]
- return float(1.0 / (1.0 + math.sqrt(np.sum(diffsq))))
- def genDLMatrix(self, trainDataset: np.ndarray, dataSize4D=100, dataSize4L=50) -> int:
- """
- 根据训练集 trainDataset 生成 D/L 矩阵:
- - 若样本数 < dataSize4D + dataSize4L,返回 -1
- - 否则构造 matrixD、matrixL,并用局部加权回归获得 healthyResidual,返回 0
- """
- m, n = trainDataset.shape
- if m < dataSize4D + dataSize4L:
- return -1
- # Step1:每个特征的最小/最大样本加入 matrixD
- self.matrixD = []
- selectIndex4D = []
- for i in range(n):
- col_i = trainDataset[:, i]
- idx_min = np.argmin(col_i)
- idx_max = np.argmax(col_i)
- self.matrixD.append(trainDataset[idx_min, :].tolist())
- selectIndex4D.append(idx_min)
- self.matrixD.append(trainDataset[idx_max, :].tolist())
- selectIndex4D.append(idx_max)
- # Step2:对剩余样本逐步选出“与 matrixD 平均距离最大”的样本,直至 matrixD 行数 = dataSize4D
- while len(selectIndex4D) < dataSize4D:
- freeList = list(set(range(len(trainDataset))) - set(selectIndex4D))
- distAvg = []
- for idx in freeList:
- tmp = trainDataset[idx, :]
- dlist = [1.0 - self.calcSimilarity(x, tmp) for x in self.matrixD]
- distAvg.append(np.mean(dlist))
- select_id = freeList[int(np.argmax(distAvg))]
- self.matrixD.append(trainDataset[select_id, :].tolist())
- selectIndex4D.append(select_id)
- self.matrixD = np.array(self.matrixD)
- # 用 matrixD 建 BallTree,用于局部加权回归
- self.normalDataBallTree = BallTree(
- self.matrixD,
- leaf_size=4,
- metric=lambda a, b: 1.0 - self.calcSimilarity(a, b)
- )
- # Step3:把所有训练样本都作为 matrixL
- self.matrixL = trainDataset.copy()
- # Step4:用局部加权回归算出健康残差
- self.healthyResidual = self.calcResidualByLocallyWeightedLR(self.matrixL)
- return 0
- def calcResidualByLocallyWeightedLR(self, newStates: np.ndarray) -> np.ndarray:
- """
- 对 newStates 中每个样本,使用 matrixD 的前 20 个最近邻做局部加权回归,计算残差。
- 返回形状 [len(newStates), 特征数] 的残差矩阵。
- """
- est_list = []
- for x in newStates:
- dist, idxs = self.normalDataBallTree.query([x], k=20, return_distance=True)
- w = 1.0 / (dist[0] + 1e-1)
- w = w / np.sum(w)
- est = np.sum([w_i * self.matrixD[j] for w_i, j in zip(w, idxs[0])], axis=0)
- est_list.append(est)
- est_arr = np.reshape(np.array(est_list), (len(est_list), -1))
- return est_arr - newStates
- def calcSPRT(
- self,
- newsStates: np.ndarray,
- feature_weight: np.ndarray,
- alpha: float = 0.1,
- beta: float = 0.1,
- decisionGroup: int = 5
- ) -> list[float]:
- """
- 对 newsStates 运行 Wald-SPRT,返回得分列表,长度 = len(newsStates) - decisionGroup + 1,
- 分数在 [-1, 1]:
- - 越接近 1 → 越“异常(危险)”
- - 越接近 -1 → 越“正常”
- """
- # 1) 计算残差并做特征加权
- stateRes = self.calcResidualByLocallyWeightedLR(newsStates)
- weightedStateResidual = [np.dot(x, feature_weight) for x in stateRes]
- weightedHealthyResidual = [np.dot(x, feature_weight) for x in self.healthyResidual]
- # 2) 健康残差的分布统计
- mu0 = float(np.mean(weightedHealthyResidual))
- sigma0 = float(np.std(weightedHealthyResidual))
- # 3) 计算 SPRT 的上下阈值
- lowThres = np.log(beta / (1.0 - alpha)) # < 0
- highThres = np.log((1.0 - beta) / alpha) # > 0
- flags: list[float] = []
- length = len(weightedStateResidual)
- for i in range(0, length - decisionGroup + 1):
- segment = weightedStateResidual[i : i + decisionGroup]
- mu1 = float(np.mean(segment))
- si = (
- np.sum(segment) * (mu1 - mu0) / (sigma0**2)
- - decisionGroup * ((mu1**2) - (mu0**2)) / (2.0 * (sigma0**2))
- )
- # 限制 si 在 [lowThres, highThres] 之内
- si = max(min(si, highThres), lowThres)
- # 正负归一化
- if si > 0:
- norm_si = float(si / highThres)
- else:
- norm_si = float(si / lowThres)
- flags.append(norm_si)
- return flags
- def check_threshold(self) -> pd.DataFrame:
- """
- 阈值分析(阈值 0.99),长格式:
- 返回所有存在通道的数据,缺失的通道自动跳过。
- """
- THRESHOLD = 0.99
- df = self._get_data_by_filter()
- if df.empty:
- return pd.DataFrame(columns=["time_stamp", "temp_channel", "SPRT_score", "status"])
- # 四个通道英文名
- temp_cols_all = [
- 'main_bearing_temperature',
- 'gearbox_oil_temperature',
- 'generatordrive_end_bearing_temperature',
- 'generatornon_drive_end_bearing_temperature'
- ]
- # 只保留存在的列
- temp_cols = [c for c in temp_cols_all if c in df.columns]
- if not temp_cols:
- return pd.DataFrame(columns=["time_stamp", "temp_channel", "SPRT_score", "status"])
- # 转数值 & 时间
- df[temp_cols] = df[temp_cols].apply(pd.to_numeric, errors='coerce')
- df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
- records = []
- # 英文→中文映射
- cn_map = {
- 'main_bearing_temperature': '主轴承温度',
- 'gearbox_oil_temperature': '齿轮箱油温',
- 'generatordrive_end_bearing_temperature': '发电机驱动端轴承温度',
- 'generatornon_drive_end_bearing_temperature': '发电机非驱动端轴承温度'
- }
- for col in temp_cols:
- sub = df[['time_stamp', col]].dropna()
- if sub.empty:
- continue
- arr = sub[col].values.reshape(-1,1)
- ts = sub['time_stamp'].dt.strftime("%Y-%m-%d %H:%M:%S").tolist()
- half = len(arr) // 2
- train = arr[:half]
- test = arr[half:]
- # 不足则跳过该通道
- if self.genDLMatrix(train, dataSize4D=60, dataSize4L=5) != 0:
- continue
- flags = self.calcSPRT(test, np.array([1.0]), decisionGroup=1)
- for i, score in enumerate(flags):
- records.append({
- "time_stamp": ts[half + i],
- "temp_channel": cn_map[col],
- "SPRT_score": score,
- "status": "危险" if score > THRESHOLD else "正常"
- })
- return pd.DataFrame(records, columns=["time_stamp", "temp_channel", "SPRT_score", "status"])
- def get_trend(self) -> dict:
- """
- 趋势分析:对每个通道单独计算,缺失或训练不足时输出空结构。
- """
- df = self._get_data_by_filter()
- # 英文→输出字段名
- key_map = {
- 'main_bearing_temperature': 'main_bearing',
- 'gearbox_oil_temperature': 'gearbox_oil',
- 'generatordrive_end_bearing_temperature': 'generator_drive_end',
- 'generatornon_drive_end_bearing_temperature': 'generator_nondrive_end'
- }
- # 预置结果
- result = {v: {} for v in key_map.values()}
- if df.empty:
- return {"data": result, "code": 200, "message": "success"}
- df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
- for col, out_key in key_map.items():
- if col not in df.columns:
- continue
- sub = df[['time_stamp', col]].dropna()
- if sub.empty:
- continue
- vals = pd.to_numeric(sub[col], errors='coerce').values
- ts_list = sub['time_stamp'].dt.strftime("%Y-%m-%d %H:%M:%S").tolist()
- half = len(vals) // 2
- train = vals[:half].reshape(-1,1)
- test = vals[half:].reshape(-1,1)
- # 训练不足时输出空列表
- if self.genDLMatrix(train, dataSize4D=60, dataSize4L=5) != 0:
- flags = []
- else:
- flags = self.calcSPRT(test, np.array([1.0]), decisionGroup=1)
- result[out_key] = {
- "timestamps": ts_list[half:],
- "values": flags
- }
- return {"data": result, "code": 200, "message": "success"}
- # def get_trend(self) -> dict:
- # """
- # 趋势分析
- # 获取温度趋势:将温度数据按时间返回。
- # 返回格式:{
- # "timestamps": [ISO8601 字符串列表],
- # "channels": [
- # {"temp_channel": "main_bearing_temperature", "values": [浮点列表]},
- # {"temp_channel": "gearbox_oil_temperature", "values": [...]},
- # ...
- # ],
- # "unit": "°C"
- # }
- # """
- # df = self._get_data_by_filter()
- # if df.empty:
- # return {"timestamps": [], "channels": [], "unit": "°C"}
- # # 定义所有需要检查的温度列
- # temp_cols_all = [
- # 'main_bearing_temperature',
- # 'gearbox_oil_temperature',
- # 'generatordrive_end_bearing_temperature',
- # 'generatornon_drive_end_bearing_temperature'
- # ]
- # # 选择实际存在的列
- # temp_cols = [c for c in temp_cols_all if c in df.columns]
-
- # # 如果没有温度数据列,返回空数据
- # if not temp_cols:
- # return {"timestamps": [], "channels": [], "unit": "°C"}
- # # 转数值,并删除 NaN
- # df[temp_cols] = df[temp_cols].apply(pd.to_numeric, errors='coerce')
- # df = df.dropna(subset=temp_cols + ['time_stamp'])
- # # 转时间戳为 `YYYY-MM-DD HH:MM:SS` 格式
- # df['time_stamp'] = pd.to_datetime(df['time_stamp']).dt.strftime("%Y-%m-%d %H:%M:%S")
- # df = df.sort_values('time_stamp').reset_index(drop=True)
- # # 时间戳格式化为 ISO 8601 字符串
- # timestamps = df['time_stamp'].tolist()
- # # 对每个通道,收集它在相应行的数值
- # channels_data = []
- # for col in temp_cols:
- # channels_data.append({
- # "temp_channel": col,
- # "values": df[col].tolist()
- # })
-
- # # 将 temp_channel 列的英文名称改为中文
- # temp_channel_mapping = {
- # 'main_bearing_temperature': '主轴承温度',
- # 'gearbox_oil_temperature': '齿轮箱油温',
- # 'generatordrive_end_bearing_temperature': '发电机驱动端轴承温度',
- # 'generatornon_drive_end_bearing_temperature': '发电机非驱动端轴承温度'
- # }
- # for channel in channels_data:
- # channel['temp_channel'] = temp_channel_mapping.get(channel['temp_channel'], channel['temp_channel'])
- # return {
- # "timestamps": timestamps,
- # "channels": channels_data,
- # "unit": "°C"
- # }
|