| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- """
- Module 2: 偏航系统异常检测
- 算法: IsolationForest
- - 替代原 DBSCAN,原生支持 fit/predict 分离
- - 日推理数据量小时 DBSCAN 聚类不稳定,IF 更鲁棒
- - 自带 anomaly score,支持按严重程度排序
- 子检测器:
- A. StaticYawDetector - 静态偏航角 (yaw_ang) 异常检测
- B. CableTwistDetector - 扭缆角度 (twist_ang) 异常检测
- """
- import numpy as np
- import pandas as pd
- from sklearn.ensemble import IsolationForest
- from sklearn.preprocessing import StandardScaler
- import joblib
- from pathlib import Path
- from config import (
- COL_YAW_ANG, COL_TWIST_ANG,
- ISO_CONTAMINATION, ISO_RANDOM_STATE, ISO_N_ESTIMATORS,
- )
- # ── A. 静态偏航检测器 ──────────────────────────────────────────────────────────
class StaticYawDetector:
    """Static yaw-angle anomaly detector built on IsolationForest.

    All features are derived from the ``COL_YAW_ANG`` column: the raw yaw
    angle, a short-window rolling mean and standard deviation, and a
    long-window rolling mean. Per the original design notes, the short window
    is meant to catch momentary deviations and the long window sustained slow
    drift.
    """

    # Window lengths are counted in samples. The original notes assume
    # 10-minute sampling, i.e. roughly 2 hours and 12 hours respectively.
    WINDOW_SHORT = 12
    WINDOW_LONG = 72

    def __init__(self, contamination: float = ISO_CONTAMINATION):
        """Create an unfitted scaler and IsolationForest.

        Args:
            contamination: Forwarded to ``IsolationForest(contamination=...)``.
        """
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            n_estimators=ISO_N_ESTIMATORS,
            contamination=contamination,
            random_state=ISO_RANDOM_STATE,
        )

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame from ``df[COL_YAW_ANG]``.

        Rows whose yaw angle is NaN are dropped, so the result may have fewer
        rows than ``df``.
        """
        s = df[COL_YAW_ANG].copy()
        # min_periods=1 makes the rolling statistics available from the first
        # row; the std of a single sample is NaN and is replaced with 0.
        feat = pd.DataFrame({
            "yaw_ang": s,
            "roll_mean_short": s.rolling(self.WINDOW_SHORT, min_periods=1).mean(),
            "roll_std_short": s.rolling(self.WINDOW_SHORT, min_periods=1).std().fillna(0),
            "roll_mean_long": s.rolling(self.WINDOW_LONG, min_periods=1).mean(),
        }, index=df.index)
        return feat.dropna(subset=["yaw_ang"])

    def fit(self, df: pd.DataFrame) -> "StaticYawDetector":
        """Fit the scaler and the IsolationForest on ``df``.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If no usable feature rows remain.
        """
        feat = self._features(df)
        if feat.empty:
            raise ValueError("偏航特征为空,检查 yaw_ang 测点")
        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of ``df``.

        Returns:
            A frame indexed like ``df`` with two columns: ``anomaly`` (True
            where the model predicts -1, i.e. an outlier) and ``score`` (the
            model's ``score_samples`` value; lower means more abnormal). Rows
            with a missing yaw angle keep ``anomaly=False`` and ``score=NaN``.
        """
        out = pd.DataFrame({"anomaly": False, "score": np.nan}, index=df.index)
        feat = self._features(df)
        if feat.empty:
            return out
        # Positional mask of the rows _features() keeps. Writing through a
        # boolean mask instead of index labels keeps results aligned even when
        # df.index contains duplicate labels (e.g. repeated timestamps).
        valid = df[COL_YAW_ANG].notna().to_numpy()
        X = self.scaler.transform(feat)
        out.loc[valid, "anomaly"] = self.model.predict(X) == -1
        out.loc[valid, "score"] = self.model.score_samples(X)
        return out

    def save(self, path: Path) -> None:
        """Persist the whole detector (scaler and model) with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "StaticYawDetector":
        """Load a detector previously written by :meth:`save`."""
        # NOTE(security): joblib.load unpickles the file and can execute
        # arbitrary code; only load model files from trusted locations.
        return joblib.load(path)
- # ── B. 扭缆角度检测器 ──────────────────────────────────────────────────────────
class CableTwistDetector:
    """Cable-twist-angle anomaly detector built on IsolationForest.

    All features are derived from the ``COL_TWIST_ANG`` column: the raw twist
    angle, its absolute value, and its sample-to-sample change.
    """

    def __init__(self, contamination: float = ISO_CONTAMINATION):
        """Create an unfitted scaler and IsolationForest.

        Args:
            contamination: Forwarded to ``IsolationForest(contamination=...)``.
        """
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            n_estimators=ISO_N_ESTIMATORS,
            contamination=contamination,
            random_state=ISO_RANDOM_STATE,
        )

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame from ``df[COL_TWIST_ANG]``.

        Rows whose twist angle is NaN are dropped, so the result may have
        fewer rows than ``df``.
        """
        s = df[COL_TWIST_ANG].copy()
        feat = pd.DataFrame({
            "twist_ang": s,
            "abs_twist": s.abs(),
            # The first row, and any row following a gap, has no defined
            # difference; those NaN deltas are replaced with 0.
            "delta": s.diff().fillna(0),
        }, index=df.index)
        return feat.dropna(subset=["twist_ang"])

    def fit(self, df: pd.DataFrame) -> "CableTwistDetector":
        """Fit the scaler and the IsolationForest on ``df``.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If no usable feature rows remain.
        """
        feat = self._features(df)
        if feat.empty:
            raise ValueError("扭缆特征为空,检查 twist_ang 测点")
        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of ``df``.

        Returns:
            A frame indexed like ``df`` with two columns: ``anomaly`` (True
            where the model predicts -1, i.e. an outlier) and ``score`` (the
            model's ``score_samples`` value; lower means more abnormal). Rows
            with a missing twist angle keep ``anomaly=False`` and
            ``score=NaN``.
        """
        out = pd.DataFrame({"anomaly": False, "score": np.nan}, index=df.index)
        feat = self._features(df)
        if feat.empty:
            return out
        # Positional mask of the rows _features() keeps. Writing through a
        # boolean mask instead of index labels keeps results aligned even when
        # df.index contains duplicate labels (e.g. repeated timestamps).
        valid = df[COL_TWIST_ANG].notna().to_numpy()
        X = self.scaler.transform(feat)
        out.loc[valid, "anomaly"] = self.model.predict(X) == -1
        out.loc[valid, "score"] = self.model.score_samples(X)
        return out

    def save(self, path: Path) -> None:
        """Persist the whole detector (scaler and model) with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "CableTwistDetector":
        """Load a detector previously written by :meth:`save`."""
        # NOTE(security): joblib.load unpickles the file and can execute
        # arbitrary code; only load model files from trusted locations.
        return joblib.load(path)
|