yaw.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. """
  2. Module 2: 偏航系统异常检测
  3. 算法: IsolationForest
  4. - 替代原 DBSCAN,原生支持 fit/predict 分离
  5. - 日推理数据量小时 DBSCAN 聚类不稳定,IF 更鲁棒
  6. - 自带 anomaly score,支持按严重程度排序
  7. 子检测器:
  8. A. StaticYawDetector - 静态偏航角 (yaw_ang) 异常检测
  9. B. CableTwistDetector - 扭缆角度 (twist_ang) 异常检测
  10. """
  11. import numpy as np
  12. import pandas as pd
  13. from sklearn.ensemble import IsolationForest
  14. from sklearn.preprocessing import StandardScaler
  15. import joblib
  16. from pathlib import Path
  17. from config import (
  18. COL_YAW_ANG, COL_TWIST_ANG,
  19. ISO_CONTAMINATION, ISO_RANDOM_STATE, ISO_N_ESTIMATORS,
  20. )
  21. # ── A. 静态偏航检测器 ──────────────────────────────────────────────────────────
  22. class StaticYawDetector:
  23. """
  24. 特征: yaw_ang、短窗口滚动均值/标准差(2小时)、长窗口滚动均值(12小时)。
  25. 短窗口捕捉瞬时偏离,长窗口捕捉持续慢漂移。
  26. IsolationForest 检测偏航角持续偏离正常分布的异常。
  27. """
  28. WINDOW_SHORT = 12 # 10min采样 × 12 ≈ 2小时
  29. WINDOW_LONG = 72 # 10min采样 × 72 ≈ 12小时
  30. def __init__(self, contamination: float = ISO_CONTAMINATION):
  31. self.scaler = StandardScaler()
  32. self.model = IsolationForest(
  33. n_estimators=ISO_N_ESTIMATORS,
  34. contamination=contamination,
  35. random_state=ISO_RANDOM_STATE,
  36. )
  37. def _features(self, df: pd.DataFrame) -> pd.DataFrame:
  38. s = df[COL_YAW_ANG].copy()
  39. feat = pd.DataFrame({
  40. "yaw_ang": s,
  41. "roll_mean_short": s.rolling(self.WINDOW_SHORT, min_periods=1).mean(),
  42. "roll_std_short": s.rolling(self.WINDOW_SHORT, min_periods=1).std().fillna(0),
  43. "roll_mean_long": s.rolling(self.WINDOW_LONG, min_periods=1).mean(),
  44. }, index=df.index)
  45. return feat.dropna(subset=["yaw_ang"])
  46. def fit(self, df: pd.DataFrame) -> "StaticYawDetector":
  47. feat = self._features(df)
  48. if feat.empty:
  49. raise ValueError("偏航特征为空,检查 yaw_ang 测点")
  50. X = self.scaler.fit_transform(feat)
  51. self.model.fit(X)
  52. return self
  53. def predict(self, df: pd.DataFrame) -> pd.DataFrame:
  54. out = pd.DataFrame({"anomaly": False, "score": np.nan}, index=df.index)
  55. feat = self._features(df)
  56. if feat.empty:
  57. return out
  58. X = self.scaler.transform(feat)
  59. out.loc[feat.index, "anomaly"] = self.model.predict(X) == -1
  60. out.loc[feat.index, "score"] = self.model.score_samples(X)
  61. return out
  62. def save(self, path: Path):
  63. joblib.dump(self, path)
  64. @classmethod
  65. def load(cls, path: Path) -> "StaticYawDetector":
  66. return joblib.load(path)
  67. # ── B. 扭缆角度检测器 ──────────────────────────────────────────────────────────
  68. class CableTwistDetector:
  69. """
  70. 特征: twist_ang、绝对值、变化率。
  71. IsolationForest 检测扭缆角度异常偏离。
  72. """
  73. def __init__(self, contamination: float = ISO_CONTAMINATION):
  74. self.scaler = StandardScaler()
  75. self.model = IsolationForest(
  76. n_estimators=ISO_N_ESTIMATORS,
  77. contamination=contamination,
  78. random_state=ISO_RANDOM_STATE,
  79. )
  80. def _features(self, df: pd.DataFrame) -> pd.DataFrame:
  81. s = df[COL_TWIST_ANG].copy()
  82. feat = pd.DataFrame({
  83. "twist_ang": s,
  84. "abs_twist": s.abs(),
  85. "delta": s.diff().fillna(0),
  86. }, index=df.index)
  87. return feat.dropna(subset=["twist_ang"])
  88. def fit(self, df: pd.DataFrame) -> "CableTwistDetector":
  89. feat = self._features(df)
  90. if feat.empty:
  91. raise ValueError("扭缆特征为空,检查 twist_ang 测点")
  92. X = self.scaler.fit_transform(feat)
  93. self.model.fit(X)
  94. return self
  95. def predict(self, df: pd.DataFrame) -> pd.DataFrame:
  96. out = pd.DataFrame({"anomaly": False, "score": np.nan}, index=df.index)
  97. feat = self._features(df)
  98. if feat.empty:
  99. return out
  100. X = self.scaler.transform(feat)
  101. out.loc[feat.index, "anomaly"] = self.model.predict(X) == -1
  102. out.loc[feat.index, "score"] = self.model.score_samples(X)
  103. return out
  104. def save(self, path: Path):
  105. joblib.dump(self, path)
  106. @classmethod
  107. def load(cls, path: Path) -> "CableTwistDetector":
  108. return joblib.load(path)