""" Module 2: 偏航系统异常检测 算法: IsolationForest - 替代原 DBSCAN,原生支持 fit/predict 分离 - 日推理数据量小时 DBSCAN 聚类不稳定,IF 更鲁棒 - 自带 anomaly score,支持按严重程度排序 子检测器: A. StaticYawDetector - 静态偏航角 (yaw_ang) 异常检测 B. CableTwistDetector - 扭缆角度 (twist_ang) 异常检测 """ import numpy as np import pandas as pd from sklearn.ensemble import IsolationForest from sklearn.preprocessing import StandardScaler import joblib from pathlib import Path from config import ( COL_YAW_ANG, COL_TWIST_ANG, ISO_CONTAMINATION, ISO_RANDOM_STATE, ISO_N_ESTIMATORS, ) # ── A. 静态偏航检测器 ────────────────────────────────────────────────────────── class StaticYawDetector: """ 特征: yaw_ang、短窗口滚动均值/标准差(2小时)、长窗口滚动均值(12小时)。 短窗口捕捉瞬时偏离,长窗口捕捉持续慢漂移。 IsolationForest 检测偏航角持续偏离正常分布的异常。 """ WINDOW_SHORT = 12 # 10min采样 × 12 ≈ 2小时 WINDOW_LONG = 72 # 10min采样 × 72 ≈ 12小时 def __init__(self, contamination: float = ISO_CONTAMINATION): self.scaler = StandardScaler() self.model = IsolationForest( n_estimators=ISO_N_ESTIMATORS, contamination=contamination, random_state=ISO_RANDOM_STATE, ) def _features(self, df: pd.DataFrame) -> pd.DataFrame: s = df[COL_YAW_ANG].copy() feat = pd.DataFrame({ "yaw_ang": s, "roll_mean_short": s.rolling(self.WINDOW_SHORT, min_periods=1).mean(), "roll_std_short": s.rolling(self.WINDOW_SHORT, min_periods=1).std().fillna(0), "roll_mean_long": s.rolling(self.WINDOW_LONG, min_periods=1).mean(), }, index=df.index) return feat.dropna(subset=["yaw_ang"]) def fit(self, df: pd.DataFrame) -> "StaticYawDetector": feat = self._features(df) if feat.empty: raise ValueError("偏航特征为空,检查 yaw_ang 测点") X = self.scaler.fit_transform(feat) self.model.fit(X) return self def predict(self, df: pd.DataFrame) -> pd.DataFrame: out = pd.DataFrame({"anomaly": False, "score": np.nan}, index=df.index) feat = self._features(df) if feat.empty: return out X = self.scaler.transform(feat) out.loc[feat.index, "anomaly"] = self.model.predict(X) == -1 out.loc[feat.index, "score"] = self.model.score_samples(X) return out def save(self, path: Path): joblib.dump(self, path) @classmethod def load(cls, path: Path) -> "StaticYawDetector": return joblib.load(path) # ── B. 扭缆角度检测器 ────────────────────────────────────────────────────────── class CableTwistDetector: """ 特征: twist_ang、绝对值、变化率。 IsolationForest 检测扭缆角度异常偏离。 """ def __init__(self, contamination: float = ISO_CONTAMINATION): self.scaler = StandardScaler() self.model = IsolationForest( n_estimators=ISO_N_ESTIMATORS, contamination=contamination, random_state=ISO_RANDOM_STATE, ) def _features(self, df: pd.DataFrame) -> pd.DataFrame: s = df[COL_TWIST_ANG].copy() feat = pd.DataFrame({ "twist_ang": s, "abs_twist": s.abs(), "delta": s.diff().fillna(0), }, index=df.index) return feat.dropna(subset=["twist_ang"]) def fit(self, df: pd.DataFrame) -> "CableTwistDetector": feat = self._features(df) if feat.empty: raise ValueError("扭缆特征为空,检查 twist_ang 测点") X = self.scaler.fit_transform(feat) self.model.fit(X) return self def predict(self, df: pd.DataFrame) -> pd.DataFrame: out = pd.DataFrame({"anomaly": False, "score": np.nan}, index=df.index) feat = self._features(df) if feat.empty: return out X = self.scaler.transform(feat) out.loc[feat.index, "anomaly"] = self.model.predict(X) == -1 out.loc[feat.index, "score"] = self.model.score_samples(X) return out def save(self, path: Path): joblib.dump(self, path) @classmethod def load(cls, path: Path) -> "CableTwistDetector": return joblib.load(path)