Temp_Diag.PY 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. # temp_diag.py
  2. import numpy as np
  3. import pandas as pd
  4. from sklearn.neighbors import BallTree
  5. from sqlalchemy import create_engine, text
  6. import math
  7. class MSET_Temp:
  8. """
  9. 基于 MSET + SPRT 的温度趋势/阈值分析类。
  10. 查询条件由 wind_turbine_number 列和 time_stamp 范围决定,
  11. SPRT 阈值固定为 0.99,calcSPRT 输出在 [-1,1]。
  12. """
  13. def __init__(self, windCode: str, windTurbineNumberList: list[str], startTime: str, endTime: str):
  14. """
  15. :param windCode: 风机类型或机组代码,用于拼表名。例如 "WOG01312" → 表名 "WOG01312_minute"
  16. :param windTurbineNumberList: 要查询的 wind_turbine_number(风机编号)列表
  17. :param startTime: 起始时间(字符串),格式 "YYYY-MM-DD HH:MM"
  18. :param endTime: 结束时间(字符串),格式 "YYYY-MM-DD HH:MM"
  19. """
  20. self.windCode = windCode.strip()
  21. self.windTurbineNumberList = windTurbineNumberList
  22. # 强制保留到秒
  23. self.startTime = startTime
  24. self.endTime = endTime
  25. # D/L 矩阵相关
  26. self.matrixD = None
  27. self.matrixL = None
  28. self.healthyResidual = None
  29. self.normalDataBallTree = None
  30. # def _truncate_to_seconds(self, dt_str: str) -> str:
  31. # """
  32. # 将用户可能传进来的 ISO 时间字符串或包含毫秒的字符串
  33. # 截断到秒,返回 "YYYY-MM-DD HH:MM:SS" 格式。
  34. # 例如: "2025-06-01T12:34:56.789Z" → "2025-06-01 12:34:56"
  35. # """
  36. # # 先将 'T' 替换成空格,去掉尾部可能的 "Z"
  37. # s = dt_str.replace("T", " ").rstrip("Z")
  38. # # 如果含有小数秒,截断
  39. # if "." in s:
  40. # s = s.split(".")[0]
  41. # # 如果还有 "+xx:xx" 时区后缀,也截断
  42. # if "+" in s:
  43. # s = s.split("+")[0]
  44. # return s.strip()
  45. def _get_data_by_filter(self) -> pd.DataFrame:
  46. """
  47. 按 wind_turbine_number 列和 time_stamp 时间范围批量查询,
  48. 返回一个完整的 DataFrame(已按 time_stamp 升序排序)。
  49. """
  50. table_name = f"{self.windCode}_minute"
  51. engine = create_engine(
  52. # "mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod"
  53. "mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod"
  54. )
  55. # 准备 wind_turbine_number 列表的 SQL 片段:('WT1','WT2',...)
  56. turbines = ",".join(f"'{wt.strip()}'" for wt in self.windTurbineNumberList)
  57. sql = text(f"""
  58. SELECT *
  59. FROM {table_name}
  60. WHERE wind_turbine_number IN ({turbines})
  61. AND time_stamp BETWEEN :start AND :end
  62. ORDER BY time_stamp ASC
  63. """)
  64. df = pd.read_sql(sql, engine, params={"start": self.startTime, "end": self.endTime})
  65. return df
  66. def calcSimilarity(self, x: np.ndarray, y: np.ndarray, m: str = 'euc') -> float:
  67. """
  68. 计算向量 x 与 y 的相似度,(0,1] 区间:
  69. - m='euc' → 欧氏距离
  70. - m='cbd' → 城市街区距离
  71. """
  72. if len(x) != len(y):
  73. return 0.0
  74. if m == 'cbd':
  75. arr = [1.0 / (1.0 + abs(p - q)) for p, q in zip(x, y)]
  76. return float(np.sum(arr) / len(arr))
  77. else:
  78. diffsq = [(p - q) ** 2 for p, q in zip(x, y)]
  79. return float(1.0 / (1.0 + math.sqrt(np.sum(diffsq))))
  80. def genDLMatrix(self, trainDataset: np.ndarray, dataSize4D=100, dataSize4L=50) -> int:
  81. """
  82. 根据训练集 trainDataset 生成 D/L 矩阵:
  83. - 若样本数 < dataSize4D + dataSize4L,返回 -1
  84. - 否则构造 matrixD、matrixL,并用局部加权回归获得 healthyResidual,返回 0
  85. """
  86. m, n = trainDataset.shape
  87. if m < dataSize4D + dataSize4L:
  88. return -1
  89. # Step1:每个特征的最小/最大样本加入 matrixD
  90. self.matrixD = []
  91. selectIndex4D = []
  92. for i in range(n):
  93. col_i = trainDataset[:, i]
  94. idx_min = np.argmin(col_i)
  95. idx_max = np.argmax(col_i)
  96. self.matrixD.append(trainDataset[idx_min, :].tolist())
  97. selectIndex4D.append(idx_min)
  98. self.matrixD.append(trainDataset[idx_max, :].tolist())
  99. selectIndex4D.append(idx_max)
  100. # Step2:对剩余样本逐步选出“与 matrixD 平均距离最大”的样本,直至 matrixD 行数 = dataSize4D
  101. while len(selectIndex4D) < dataSize4D:
  102. freeList = list(set(range(len(trainDataset))) - set(selectIndex4D))
  103. distAvg = []
  104. for idx in freeList:
  105. tmp = trainDataset[idx, :]
  106. dlist = [1.0 - self.calcSimilarity(x, tmp) for x in self.matrixD]
  107. distAvg.append(np.mean(dlist))
  108. select_id = freeList[int(np.argmax(distAvg))]
  109. self.matrixD.append(trainDataset[select_id, :].tolist())
  110. selectIndex4D.append(select_id)
  111. self.matrixD = np.array(self.matrixD)
  112. # 用 matrixD 建 BallTree,用于局部加权回归
  113. self.normalDataBallTree = BallTree(
  114. self.matrixD,
  115. leaf_size=4,
  116. metric=lambda a, b: 1.0 - self.calcSimilarity(a, b)
  117. )
  118. # Step3:把所有训练样本都作为 matrixL
  119. self.matrixL = trainDataset.copy()
  120. # Step4:用局部加权回归算出健康残差
  121. self.healthyResidual = self.calcResidualByLocallyWeightedLR(self.matrixL)
  122. return 0
  123. def calcResidualByLocallyWeightedLR(self, newStates: np.ndarray) -> np.ndarray:
  124. """
  125. 对 newStates 中每个样本,使用 matrixD 的前 20 个最近邻做局部加权回归,计算残差。
  126. 返回形状 [len(newStates), 特征数] 的残差矩阵。
  127. """
  128. est_list = []
  129. for x in newStates:
  130. dist, idxs = self.normalDataBallTree.query([x], k=20, return_distance=True)
  131. w = 1.0 / (dist[0] + 1e-1)
  132. w = w / np.sum(w)
  133. est = np.sum([w_i * self.matrixD[j] for w_i, j in zip(w, idxs[0])], axis=0)
  134. est_list.append(est)
  135. est_arr = np.reshape(np.array(est_list), (len(est_list), -1))
  136. return est_arr - newStates
  137. def calcSPRT(
  138. self,
  139. newsStates: np.ndarray,
  140. feature_weight: np.ndarray,
  141. alpha: float = 0.1,
  142. beta: float = 0.1,
  143. decisionGroup: int = 5
  144. ) -> list[float]:
  145. """
  146. 对 newsStates 运行 Wald-SPRT,返回得分列表,长度 = len(newsStates) - decisionGroup + 1,
  147. 分数在 [-1, 1]:
  148. - 越接近 1 → 越“异常(危险)”
  149. - 越接近 -1 → 越“正常”
  150. """
  151. # 1) 计算残差并做特征加权
  152. stateRes = self.calcResidualByLocallyWeightedLR(newsStates)
  153. weightedStateResidual = [np.dot(x, feature_weight) for x in stateRes]
  154. weightedHealthyResidual = [np.dot(x, feature_weight) for x in self.healthyResidual]
  155. # 2) 健康残差的分布统计
  156. mu0 = float(np.mean(weightedHealthyResidual))
  157. sigma0 = float(np.std(weightedHealthyResidual))
  158. # 3) 计算 SPRT 的上下阈值
  159. lowThres = np.log(beta / (1.0 - alpha)) # < 0
  160. highThres = np.log((1.0 - beta) / alpha) # > 0
  161. flags: list[float] = []
  162. length = len(weightedStateResidual)
  163. for i in range(0, length - decisionGroup + 1):
  164. segment = weightedStateResidual[i : i + decisionGroup]
  165. mu1 = float(np.mean(segment))
  166. si = (
  167. np.sum(segment) * (mu1 - mu0) / (sigma0**2)
  168. - decisionGroup * ((mu1**2) - (mu0**2)) / (2.0 * (sigma0**2))
  169. )
  170. # 限制 si 在 [lowThres, highThres] 之内
  171. si = max(min(si, highThres), lowThres)
  172. # 正负归一化
  173. if si > 0:
  174. norm_si = float(si / highThres)
  175. else:
  176. norm_si = float(si / lowThres)
  177. flags.append(norm_si)
  178. return flags
  179. def check_threshold(self) -> pd.DataFrame:
  180. """
  181. 阈值分析(阈值固定 0.99)。返回长格式 DataFrame,列:
  182. ["time_stamp", "temp_channel", "SPRT_score", "status"]
  183. status = "危险" if SPRT_score > 0.99 else "正常"。
  184. """
  185. THRESHOLD = 0.99
  186. # 1) 按风机编号 + 时间范围查询原始数据
  187. df_concat = self._get_data_by_filter()
  188. if df_concat.empty:
  189. return pd.DataFrame(columns=["time_stamp", "temp_channel", "SPRT_score", "status"])
  190. # 2) 筛选存在的温度列
  191. temp_cols_all = [
  192. 'main_bearing_temperature',
  193. 'gearbox_oil_temperature',
  194. 'generatordrive_end_bearing_temperature',
  195. 'generatornon_drive_end_bearing_temperature'
  196. ]
  197. temp_cols = [c for c in temp_cols_all if c in df_concat.columns]
  198. if not temp_cols:
  199. return pd.DataFrame(columns=["time_stamp", "temp_channel", "SPRT_score", "status"])
  200. # 3) 转数值 & 删除 NaN
  201. df_concat[temp_cols] = df_concat[temp_cols].apply(pd.to_numeric, errors='coerce')
  202. df_concat = df_concat.dropna(subset=temp_cols + ['time_stamp'])
  203. if df_concat.empty:
  204. return pd.DataFrame(columns=["time_stamp", "temp_channel", "SPRT_score", "status"])
  205. # 4) time_stamp 转 datetime
  206. df_concat['time_stamp'] = pd.to_datetime(df_concat['time_stamp'])
  207. x_date = df_concat['time_stamp']
  208. # 5) 抽取温度列到 NumPy 数组
  209. arr = df_concat[temp_cols].values # shape = [总记录数, 通道数]
  210. m, n = arr.shape
  211. half = m // 2
  212. all_flags: list[list[float]] = []
  213. for i in range(n):
  214. channel = arr[:, i]
  215. train = channel[:half].reshape(-1, 1)
  216. test = channel[half:].reshape(-1, 1)
  217. # 用训练集构造 D/L 矩阵
  218. if self.genDLMatrix(train, dataSize4D=60, dataSize4L=5) != 0:
  219. # 如果训练集样本不足,直接返回空表
  220. return pd.DataFrame(columns=["time_stamp", "temp_channel", "SPRT_score", "status"])
  221. feature_w = np.array([1.0])
  222. flags = self.calcSPRT(test, feature_w, decisionGroup=1)
  223. all_flags.append(flags)
  224. # 6) 合并为宽表,再 melt 成长表
  225. flags_arr = np.array(all_flags) # shape = [通道数, 测试样本数]
  226. num_test = flags_arr.shape[1]
  227. ts = x_date.iloc[half : half + num_test].reset_index(drop=True)
  228. wide = pd.DataFrame({"time_stamp": ts})
  229. for idx, col in enumerate(temp_cols):
  230. wide[col] = flags_arr[idx, :]
  231. df_long = wide.melt(
  232. id_vars=["time_stamp"],
  233. value_vars=temp_cols,
  234. var_name="temp_channel",
  235. value_name="SPRT_score"
  236. )
  237. # 把 time_stamp 从 datetime 转成字符串,格式 "YYYY-MM-DD HH:MM:SS"
  238. df_long['time_stamp'] = pd.to_datetime(df_long['time_stamp']).dt.strftime("%Y-%m-%d %H:%M:%S")
  239. # 7) 添加状态列:SPRT_score > 0.99 → “危险”,否则 “正常”
  240. df_long['status'] = df_long['SPRT_score'].apply(
  241. lambda x: "危险" if x > THRESHOLD else "正常"
  242. )
  243. # 8) 将 temp_channel 列的英文名称改为中文
  244. temp_channel_mapping = {
  245. 'main_bearing_temperature': '主轴承温度',
  246. 'gearbox_oil_temperature': '齿轮箱油温',
  247. 'generatordrive_end_bearing_temperature': '发电机驱动端轴承温度',
  248. 'generatornon_drive_end_bearing_temperature': '发电机非驱动端轴承温度'
  249. }
  250. df_long['temp_channel'] = df_long['temp_channel'].map(temp_channel_mapping)
  251. return df_long
  252. def get_trend(self) -> dict:
  253. """
  254. 趋势分析
  255. 获取温度趋势:将温度数据按时间返回。
  256. 返回格式:{
  257. "timestamps": [ISO8601 字符串列表],
  258. "channels": [
  259. {"temp_channel": "main_bearing_temperature", "values": [浮点列表]},
  260. {"temp_channel": "gearbox_oil_temperature", "values": [...]},
  261. ...
  262. ],
  263. "unit": "°C"
  264. }
  265. """
  266. df = self._get_data_by_filter()
  267. if df.empty:
  268. return {"timestamps": [], "channels": [], "unit": "°C"}
  269. # 定义所有需要检查的温度列
  270. temp_cols_all = [
  271. 'main_bearing_temperature',
  272. 'gearbox_oil_temperature',
  273. 'generatordrive_end_bearing_temperature',
  274. 'generatornon_drive_end_bearing_temperature'
  275. ]
  276. # 选择实际存在的列
  277. temp_cols = [c for c in temp_cols_all if c in df.columns]
  278. # 如果没有温度数据列,返回空数据
  279. if not temp_cols:
  280. return {"timestamps": [], "channels": [], "unit": "°C"}
  281. # 转数值,并删除 NaN
  282. df[temp_cols] = df[temp_cols].apply(pd.to_numeric, errors='coerce')
  283. df = df.dropna(subset=temp_cols + ['time_stamp'])
  284. # 转时间戳为 `YYYY-MM-DD HH:MM:SS` 格式
  285. df['time_stamp'] = pd.to_datetime(df['time_stamp']).dt.strftime("%Y-%m-%d %H:%M:%S")
  286. df = df.sort_values('time_stamp').reset_index(drop=True)
  287. # 时间戳格式化为 ISO 8601 字符串
  288. timestamps = df['time_stamp'].tolist()
  289. # 对每个通道,收集它在相应行的数值
  290. channels_data = []
  291. for col in temp_cols:
  292. channels_data.append({
  293. "temp_channel": col,
  294. "values": df[col].tolist()
  295. })
  296. # 将 temp_channel 列的英文名称改为中文
  297. temp_channel_mapping = {
  298. 'main_bearing_temperature': '主轴承温度',
  299. 'gearbox_oil_temperature': '齿轮箱油温',
  300. 'generatordrive_end_bearing_temperature': '发电机驱动端轴承温度',
  301. 'generatornon_drive_end_bearing_temperature': '发电机非驱动端轴承温度'
  302. }
  303. for channel in channels_data:
  304. channel['temp_channel'] = temp_channel_mapping.get(channel['temp_channel'], channel['temp_channel'])
  305. return {
  306. "timestamps": timestamps,
  307. "channels": channels_data,
  308. "unit": "°C"
  309. }