| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- """
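- Module 3: Pitch system anomaly detection
- Algorithm: LocalOutlierFactor (LOF)
- - Density-based; well suited to local anomalies where the blades disagree with each other
- - novelty=True separates fit from predict (fit on the training set, predict on new data at inference time)
- - n_neighbors is adaptive: max(5, min(n_neighbors, len(train) // 50)), with n_neighbors defaulting to 20, so it shrinks on small samples but never drops below 5
- Sub-detectors:
- A. PitchRegulationDetector - pitch-angle regulation anomalies (set-point vs. actual value, 3 blades)
- B. PitchCoordDetector - pitch / rotor-speed / power coordination anomalies
- C. MinPitchDetector - minimum pitch-angle anomalies (keeps IsolationForest, which suits distribution-level anomalies better)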
- """
- import pandas as pd
- import numpy as np
- from sklearn.neighbors import LocalOutlierFactor
- from sklearn.ensemble import IsolationForest
- from sklearn.preprocessing import StandardScaler
- import joblib
- from pathlib import Path
- from config import (
- COL_PITCH_SET_1, COL_PITCH_SET_2, COL_PITCH_SET_3,
- COL_PITCH_ACT_1, COL_PITCH_ACT_2, COL_PITCH_ACT_3,
- COL_PITCH_SPD_1, COL_PITCH_SPD_2, COL_PITCH_SPD_3,
- COL_ROTOR_SPD, COL_P_ACTIVE,
- ISO_CONTAMINATION, ISO_RANDOM_STATE, ISO_N_ESTIMATORS,
- )
# Per-blade column names grouped by signal kind; each list is ordered
# blade 1, blade 2, blade 3.

# Pitch-angle set-point columns.
PITCH_SET_COLS = [
    COL_PITCH_SET_1,
    COL_PITCH_SET_2,
    COL_PITCH_SET_3,
]

# Actual (measured) pitch-angle columns.
PITCH_ACT_COLS = [
    COL_PITCH_ACT_1,
    COL_PITCH_ACT_2,
    COL_PITCH_ACT_3,
]

# Pitch-speed columns.
PITCH_SPD_COLS = [
    COL_PITCH_SPD_1,
    COL_PITCH_SPD_2,
    COL_PITCH_SPD_3,
]
# ── A. Pitch-angle regulation detector (LOF) ─────────────────────────────────
class PitchRegulationDetector:
    """Detect pitch-regulation anomalies with a LocalOutlierFactor model.

    Features (built from whichever columns are present in the input frame):
      - per-blade error: pitch set-point minus actual pitch angle
      - row-wise std of the actual pitch angles (blade inconsistency)
      - row-wise mean and std of the pitch speeds (only when at least two
        pitch-speed columns exist)

    LOF scores local density deviations, which suits cases where the blades
    fall out of sync with each other.
    """

    def __init__(self, n_neighbors: int = 20, contamination: float = ISO_CONTAMINATION):
        """Store the hyper-parameters and create an unfitted scaler and model.

        Args:
            n_neighbors: Upper bound for the LOF neighbour count; ``fit`` may
                pick a smaller value for small training sets.
            contamination: Expected share of outliers, passed to LOF.
        """
        self.n_neighbors = n_neighbors
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.model = self._build_model(n_neighbors)

    def _build_model(self, k: int) -> LocalOutlierFactor:
        """Return a fresh, unfitted novelty-mode LOF with ``k`` neighbours."""
        return LocalOutlierFactor(
            n_neighbors=k,
            contamination=self.contamination,
            novelty=True,
        )

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame; rows containing any NaN are dropped.

        Returns:
            A frame indexed by the surviving rows of ``df``. It is empty when
            no feature could be built or when every row contains a NaN.
        """
        feat = {}
        for i, (s, a) in enumerate(zip(PITCH_SET_COLS, PITCH_ACT_COLS), 1):
            if s in df.columns and a in df.columns:
                feat[f"err_{i}"] = df[s] - df[a]
            elif a in df.columns:
                # NOTE: this placeholder is all-NaN, so the final dropna()
                # removes every row. A blade that has an actual-angle column
                # but no set-point column therefore yields an empty frame.
                feat[f"err_{i}"] = pd.Series(np.nan, index=df.index)

        act_cols = [c for c in PITCH_ACT_COLS if c in df.columns]
        if len(act_cols) >= 2:
            feat["act_std"] = df[act_cols].std(axis=1)

        # Pitch-speed features are optional.
        spd_cols = [c for c in PITCH_SPD_COLS if c in df.columns]
        if len(spd_cols) >= 2:
            spd_df = df[spd_cols]
            feat["spd_mean"] = spd_df.mean(axis=1)
            feat["spd_std"] = spd_df.std(axis=1)

        return pd.DataFrame(feat, index=df.index).dropna()

    def fit(self, df: pd.DataFrame) -> "PitchRegulationDetector":
        """Fit the scaler and the LOF model on ``df``.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If no usable feature rows can be built from ``df``.
        """
        feat = self._features(df)
        if feat.empty:
            raise ValueError("变桨调节特征为空,检查测点是否存在")

        # Adaptive neighbour count: one neighbour per 50 training rows, capped
        # at the configured n_neighbors and floored at 5 (so a configured
        # value below 5 is raised to 5).
        adaptive_k = max(5, min(self.n_neighbors, len(feat) // 50))

        # Always rebuild the estimator. Replacing it only when adaptive_k
        # differs from n_neighbors would let a refit on a larger data set
        # reuse the reduced-k model left behind by an earlier small fit.
        self.model = self._build_model(adaptive_k)

        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score ``df`` with the fitted model.

        Returns:
            A frame indexed like ``df`` with two columns: ``anomaly`` (True
            where LOF predicts -1) and ``score`` (``score_samples`` output,
            lower means more abnormal). Rows for which no features could be
            built keep ``anomaly=False`` and ``score=NaN``.
        """
        out = pd.DataFrame({"anomaly": False, "score": np.nan}, index=df.index)
        feat = self._features(df)
        if feat.empty:
            return out
        X = self.scaler.transform(feat)
        out.loc[feat.index, "anomaly"] = self.model.predict(X) == -1
        out.loc[feat.index, "score"] = self.model.score_samples(X)
        return out

    def save(self, path: Path):
        """Serialize the whole detector (scaler and model) to ``path``."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "PitchRegulationDetector":
        """Load a detector previously written by ``save``.

        SECURITY: joblib uses pickle, which can execute arbitrary code while
        loading. Only load files from trusted sources.
        """
        return joblib.load(path)
# ── B. Pitch / rotor-speed / power coordination detector (LOF) ───────────────
class PitchCoordDetector:
    """Detect anomalies in how pitch, rotor speed and active power relate.

    Features: rotor speed, active power, a pitch summary and two derived
    terms (power per unit rotor speed, pitch times rotor speed). When at
    least two actual-pitch columns exist, the pitch summary is the row-wise
    mean plus the row-wise std across blades; otherwise the blade-1 angle
    alone is used as the mean.

    LOF flags rows whose combination of these values deviates locally.
    """

    # Not referenced by the methods of this class in the visible code.
    REQUIRED = [COL_PITCH_ACT_1, COL_ROTOR_SPD, COL_P_ACTIVE]

    # Rows whose rotor speed is not strictly above this value are dropped,
    # because p_per_spd strongly amplifies noise at low rotor speed. It is a
    # class attribute so that subclasses can tune it.
    MIN_ROTOR_SPD = 5.0

    def __init__(self, n_neighbors: int = 20, contamination: float = ISO_CONTAMINATION):
        """Store the hyper-parameters and create an unfitted scaler and model.

        Args:
            n_neighbors: Upper bound for the LOF neighbour count; ``fit`` may
                pick a smaller value for small training sets.
            contamination: Expected share of outliers, passed to LOF.
        """
        self.n_neighbors = n_neighbors
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.model = self._build_model(n_neighbors)

    def _build_model(self, k: int) -> LocalOutlierFactor:
        """Return a fresh, unfitted novelty-mode LOF with ``k`` neighbours."""
        return LocalOutlierFactor(
            n_neighbors=k,
            contamination=self.contamination,
            novelty=True,
        )

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame for rows above the rotor-speed threshold.

        Returns:
            A frame indexed by the surviving rows of ``df``. It is an empty
            frame when the rotor-speed or power column is missing, when no
            usable pitch column exists, or when no row survives filtering.
        """
        if COL_ROTOR_SPD not in df.columns or COL_P_ACTIVE not in df.columns:
            return pd.DataFrame()

        d = df[[COL_ROTOR_SPD, COL_P_ACTIVE]].copy()
        # Discard low-speed rows before computing the ratio feature.
        d = d[d[COL_ROTOR_SPD] > self.MIN_ROTOR_SPD]

        # Prefer the multi-blade summary when at least two blades are present.
        act_cols = [c for c in PITCH_ACT_COLS if c in df.columns]
        if len(act_cols) >= 2:
            pitch_df = df[act_cols].loc[d.index]
            d["pitch_mean"] = pitch_df.mean(axis=1)
            d["pitch_std"] = pitch_df.std(axis=1)
        elif COL_PITCH_ACT_1 in df.columns:
            d["pitch_mean"] = df[COL_PITCH_ACT_1].loc[d.index]
        else:
            return pd.DataFrame()

        d = d.dropna()
        d["p_per_spd"] = d[COL_P_ACTIVE] / d[COL_ROTOR_SPD]
        d["pitch_x_spd"] = d["pitch_mean"] * d[COL_ROTOR_SPD]
        return d.dropna()

    def fit(self, df: pd.DataFrame) -> "PitchCoordDetector":
        """Fit the scaler and the LOF model on ``df``.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If no usable feature rows can be built from ``df``.
        """
        feat = self._features(df)
        if feat.empty:
            raise ValueError("变桨协调特征为空,检查测点是否存在")

        # Adaptive neighbour count: one neighbour per 50 training rows, capped
        # at the configured n_neighbors and floored at 5.
        adaptive_k = max(5, min(self.n_neighbors, len(feat) // 50))

        # Always rebuild the estimator so that a refit never reuses a
        # reduced-k model left behind by an earlier fit on a smaller set.
        self.model = self._build_model(adaptive_k)

        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score ``df`` with the fitted model.

        Returns:
            A frame indexed like ``df`` with two columns: ``anomaly`` (True
            where LOF predicts -1) and ``score`` (``score_samples`` output,
            lower means more abnormal). Rows that were filtered out or lack
            data keep ``anomaly=False`` and ``score=NaN``.
        """
        out = pd.DataFrame({"anomaly": False, "score": np.nan}, index=df.index)
        feat = self._features(df)
        if feat.empty:
            return out
        X = self.scaler.transform(feat)
        out.loc[feat.index, "anomaly"] = self.model.predict(X) == -1
        out.loc[feat.index, "score"] = self.model.score_samples(X)
        return out

    def save(self, path: Path):
        """Serialize the whole detector (scaler and model) to ``path``."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "PitchCoordDetector":
        """Load a detector previously written by ``save``.

        SECURITY: joblib uses pickle, which can execute arbitrary code while
        loading. Only load files from trusted sources.
        """
        return joblib.load(path)
# ── C. Minimum pitch-angle detector (IsolationForest) ────────────────────────
class MinPitchDetector:
    """Detect abnormal minimum pitch angles with an IsolationForest.

    Features: row-wise minimum, mean and range (max minus min) of the actual
    pitch angles of the blades that are present. IsolationForest is kept
    here because this is treated as a global distribution anomaly.
    """

    def __init__(self, contamination: float = ISO_CONTAMINATION):
        """Create an unfitted scaler and forest.

        Args:
            contamination: Expected share of outliers, passed to the forest.
        """
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=ISO_N_ESTIMATORS,
            random_state=ISO_RANDOM_STATE,
        )

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return min/mean/range pitch features for rows without NaN.

        An empty frame is returned when no actual-pitch column is present.
        """
        present = [col for col in PITCH_ACT_COLS if col in df.columns]
        if len(present) == 0:
            return pd.DataFrame()

        angles = df[present].dropna()
        lowest = angles.min(axis=1)
        highest = angles.max(axis=1)
        columns = {
            "min_pitch": lowest,
            "mean_pitch": angles.mean(axis=1),
            "range_pitch": highest - lowest,
        }
        return pd.DataFrame(columns, index=angles.index)

    def fit(self, df: pd.DataFrame) -> "MinPitchDetector":
        """Fit the scaler and the forest on ``df`` and return ``self``.

        Raises:
            ValueError: If no usable feature rows can be built from ``df``.
        """
        features = self._features(df)
        if features.empty:
            raise ValueError("最小桨距角特征为空,检查测点是否存在")
        scaled = self.scaler.fit_transform(features)
        self.model.fit(scaled)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score ``df`` with the fitted model.

        Returns:
            A frame indexed like ``df`` with two columns: ``anomaly`` (True
            where the forest predicts -1) and ``score`` (``score_samples``
            output, lower means more abnormal). Rows without features keep
            ``anomaly=False`` and ``score=NaN``.
        """
        result = pd.DataFrame({"anomaly": False, "score": np.nan}, index=df.index)
        features = self._features(df)
        if not features.empty:
            scaled = self.scaler.transform(features)
            rows = features.index
            result.loc[rows, "anomaly"] = self.model.predict(scaled) == -1
            result.loc[rows, "score"] = self.model.score_samples(scaled)
        return result

    def save(self, path: Path):
        """Serialize the whole detector (scaler and model) to ``path``."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "MinPitchDetector":
        """Load a detector previously written by ``save``."""
        return joblib.load(path)
|