"""
Module 3: pitch system anomaly detection.

Algorithm: LocalOutlierFactor (LOF)
- Based on local density; suited to local anomalies where the blades
  disagree with one another.
- novelty=True separates fit from predict (fit on the training set, predict
  on new data at inference time).
- n_neighbors is adaptive: max(5, min(n_neighbors, len(train) // 50)), so a
  small training set does not degrade LOF.

Sub-detectors:
A. PitchRegulationDetector - pitch angle regulation anomalies
   (setpoint vs. actual value, 3 blades)
B. PitchCoordDetector - pitch / rotor speed / power coordination anomalies
C. MinPitchDetector - minimum pitch angle anomalies (keeps IsolationForest,
   which the original author judged a better fit for distribution anomalies)
"""
import pandas as pd
import numpy as np
from sklearn.neighbors import LocalOutlierFactor
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import joblib
from pathlib import Path
from config import (
    COL_PITCH_SET_1, COL_PITCH_SET_2, COL_PITCH_SET_3,
    COL_PITCH_ACT_1, COL_PITCH_ACT_2, COL_PITCH_ACT_3,
    COL_PITCH_SPD_1, COL_PITCH_SPD_2, COL_PITCH_SPD_3,
    COL_ROTOR_SPD, COL_P_ACTIVE,
    ISO_CONTAMINATION, ISO_RANDOM_STATE, ISO_N_ESTIMATORS,
)

# Per-blade column groups; position i in each list refers to blade i + 1.
PITCH_SET_COLS = [COL_PITCH_SET_1, COL_PITCH_SET_2, COL_PITCH_SET_3]
PITCH_ACT_COLS = [COL_PITCH_ACT_1, COL_PITCH_ACT_2, COL_PITCH_ACT_3]
PITCH_SPD_COLS = [COL_PITCH_SPD_1, COL_PITCH_SPD_2, COL_PITCH_SPD_3]
# ── Shared helpers ───────────────────────────────────────────────────────────

def _adaptive_n_neighbors(configured: int, n_samples: int) -> int:
    """Return the LOF neighbourhood size to use for ``n_samples`` training rows.

    The configured value is capped at ``n_samples // 50`` so that a small
    training set does not get an oversized neighbourhood, and the result is
    floored at 5 (so a configured value below 5 is raised to 5).
    """
    return max(5, min(configured, n_samples // 50))


def _new_lof(n_neighbors: int, contamination: float) -> "LocalOutlierFactor":
    """Build an unfitted LOF estimator in novelty mode (fit/predict separated)."""
    return LocalOutlierFactor(
        n_neighbors=n_neighbors,
        contamination=contamination,
        novelty=True,
    )


def _label_and_score(scaler, model, feat: pd.DataFrame, index: pd.Index) -> pd.DataFrame:
    """Score ``feat`` with a fitted scaler/model and align the result to ``index``.

    Args:
        scaler: Fitted scaler exposing ``transform``.
        model: Fitted outlier model exposing ``score_samples`` and ``offset_``.
        feat: Feature rows to score; its index must be a subset of ``index``.
        index: Index of the caller's input frame.

    Returns:
        A frame indexed by ``index`` with columns ``anomaly`` (bool) and
        ``score`` (float). Rows absent from ``feat`` keep ``anomaly=False``
        and ``score=NaN``.
    """
    out = pd.DataFrame({"anomaly": False, "score": np.nan}, index=index)
    if feat.empty:
        return out
    X = scaler.transform(feat)
    # Score once and derive the label from it. For both LOF (novelty mode)
    # and IsolationForest, predict() == -1 is defined as
    # score_samples(X) - offset_ < 0, so this is the same labelling without
    # running the (expensive) neighbour/tree query a second time.
    scores = model.score_samples(X)
    out.loc[feat.index, "anomaly"] = scores < model.offset_
    out.loc[feat.index, "score"] = scores
    return out


# ── A. Pitch regulation detector (LOF) ───────────────────────────────────────

class PitchRegulationDetector:
    """Detect pitch regulation anomalies with LocalOutlierFactor.

    Features (each only when the needed columns are present):
      - per-blade error: setpoint minus actual pitch angle
      - row-wise std of the actual pitch angles across blades (>= 2 blades)
      - row-wise mean and std of the blade pitch speeds (>= 2 speed columns)
    """

    def __init__(self, n_neighbors: int = 20, contamination: float = ISO_CONTAMINATION):
        self.n_neighbors = n_neighbors
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.model = _new_lof(n_neighbors, contamination)

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame; rows containing any NaN are dropped."""
        feat = {}
        for i, (s, a) in enumerate(zip(PITCH_SET_COLS, PITCH_ACT_COLS), 1):
            if s in df.columns and a in df.columns:
                feat[f"err_{i}"] = df[s] - df[a]
            elif a in df.columns:
                # NOTE(review): an all-NaN error column means the dropna()
                # below removes every row, so a blade with an actual-value
                # column but no setpoint column yields an empty feature
                # frame. Confirm this fail-closed behaviour is intended.
                feat[f"err_{i}"] = pd.Series(np.nan, index=df.index)

        act_cols = [c for c in PITCH_ACT_COLS if c in df.columns]
        if len(act_cols) >= 2:
            feat["act_std"] = df[act_cols].std(axis=1)

        # Pitch speed features are optional.
        spd_cols = [c for c in PITCH_SPD_COLS if c in df.columns]
        if len(spd_cols) >= 2:
            spd_df = df[spd_cols]
            feat["spd_mean"] = spd_df.mean(axis=1)
            feat["spd_std"] = spd_df.std(axis=1)

        return pd.DataFrame(feat, index=df.index).dropna()

    def fit(self, df: pd.DataFrame) -> "PitchRegulationDetector":
        """Fit the scaler and LOF model on ``df``.

        Raises:
            ValueError: If no usable feature rows can be built from ``df``.
        """
        feat = self._features(df)
        if feat.empty:
            raise ValueError("变桨调节特征为空,检查测点是否存在")
        # Always start from a fresh estimator sized for *this* training set.
        # Rebuilding only when k differs from the configured value would let
        # a k shrunk by an earlier small-sample fit survive a later refit.
        k = _adaptive_n_neighbors(self.n_neighbors, len(feat))
        self.model = _new_lof(k, self.contamination)
        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``anomaly``/``score`` columns aligned to ``df.index``.

        Rows for which no features could be built stay ``False``/``NaN``.
        """
        return _label_and_score(self.scaler, self.model, self._features(df), df.index)

    def save(self, path: Path) -> None:
        """Serialize this detector to ``path`` with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "PitchRegulationDetector":
        """Load a detector previously written by :meth:`save`.

        SECURITY: joblib.load unpickles the file and can execute arbitrary
        code; only load files from a trusted source.
        """
        return joblib.load(path)


# ── B. Pitch / rotor speed / power coordination detector (LOF) ───────────────

class PitchCoordDetector:
    """Detect pitch / rotor-speed / power coordination anomalies with LOF.

    Features: rotor speed, active power, the row-wise mean of the actual
    pitch angles (plus their std when at least two blades are present,
    otherwise blade 1 alone), power per unit rotor speed, and
    pitch mean times rotor speed.
    """

    REQUIRED = [COL_PITCH_ACT_1, COL_ROTOR_SPD, COL_P_ACTIVE]

    # Rows at or below this rotor speed are excluded: dividing power by a
    # small rotor speed amplifies noise in the p_per_spd feature.
    MIN_ROTOR_SPD = 5.0

    def __init__(self, n_neighbors: int = 20, contamination: float = ISO_CONTAMINATION):
        self.n_neighbors = n_neighbors
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.model = _new_lof(n_neighbors, contamination)

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame; returns an empty frame if columns are missing."""
        if COL_ROTOR_SPD not in df.columns or COL_P_ACTIVE not in df.columns:
            return pd.DataFrame()

        act_cols = [c for c in PITCH_ACT_COLS if c in df.columns]
        use_multi_blade = len(act_cols) >= 2
        if not use_multi_blade and COL_PITCH_ACT_1 not in df.columns:
            return pd.DataFrame()

        # Take an explicit copy of the filtered rows so that the column
        # assignments below never write into a view of the caller's frame
        # (and never trigger SettingWithCopyWarning).
        running = df[COL_ROTOR_SPD] > self.MIN_ROTOR_SPD
        d = df.loc[running, [COL_ROTOR_SPD, COL_P_ACTIVE]].copy()

        if use_multi_blade:
            # Prefer the blade-consistency features when several blades exist.
            pitch_df = df.loc[running, act_cols]
            d["pitch_mean"] = pitch_df.mean(axis=1)
            d["pitch_std"] = pitch_df.std(axis=1)
        else:
            d["pitch_mean"] = df.loc[running, COL_PITCH_ACT_1]

        d = d.dropna()
        # assign() returns a new frame and appends the columns in this order,
        # keeping the feature column order stable for the fitted scaler.
        d = d.assign(
            p_per_spd=d[COL_P_ACTIVE] / d[COL_ROTOR_SPD],
            pitch_x_spd=d["pitch_mean"] * d[COL_ROTOR_SPD],
        )
        return d.dropna()

    def fit(self, df: pd.DataFrame) -> "PitchCoordDetector":
        """Fit the scaler and LOF model on ``df``.

        Raises:
            ValueError: If no usable feature rows can be built from ``df``.
        """
        feat = self._features(df)
        if feat.empty:
            raise ValueError("变桨协调特征为空,检查测点是否存在")
        # Fresh estimator on every fit so a previously shrunk k cannot persist.
        k = _adaptive_n_neighbors(self.n_neighbors, len(feat))
        self.model = _new_lof(k, self.contamination)
        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``anomaly``/``score`` columns aligned to ``df.index``.

        Rows for which no features could be built (including rows filtered
        out by the rotor-speed threshold) stay ``False``/``NaN``.
        """
        return _label_and_score(self.scaler, self.model, self._features(df), df.index)

    def save(self, path: Path) -> None:
        """Serialize this detector to ``path`` with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "PitchCoordDetector":
        """Load a detector previously written by :meth:`save`.

        SECURITY: joblib.load unpickles the file and can execute arbitrary
        code; only load files from a trusted source.
        """
        return joblib.load(path)


# ── C. Minimum pitch angle detector (IsolationForest) ────────────────────────

class MinPitchDetector:
    """Detect minimum pitch angle anomalies with IsolationForest.

    Features: row-wise minimum, mean and range (max - min) of the actual
    pitch angles over the blade columns that are present.
    """

    def __init__(self, contamination: float = ISO_CONTAMINATION):
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            n_estimators=ISO_N_ESTIMATORS,
            contamination=contamination,
            random_state=ISO_RANDOM_STATE,
        )

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame; rows with any NaN pitch value are dropped."""
        act_cols = [c for c in PITCH_ACT_COLS if c in df.columns]
        if not act_cols:
            return pd.DataFrame()
        d = df[act_cols].dropna()
        return pd.DataFrame({
            "min_pitch": d.min(axis=1),
            "mean_pitch": d.mean(axis=1),
            "range_pitch": d.max(axis=1) - d.min(axis=1),
        }, index=d.index)

    def fit(self, df: pd.DataFrame) -> "MinPitchDetector":
        """Fit the scaler and IsolationForest model on ``df``.

        Raises:
            ValueError: If no usable feature rows can be built from ``df``.
        """
        feat = self._features(df)
        if feat.empty:
            raise ValueError("最小桨距角特征为空,检查测点是否存在")
        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``anomaly``/``score`` columns aligned to ``df.index``.

        Rows for which no features could be built stay ``False``/``NaN``.
        """
        return _label_and_score(self.scaler, self.model, self._features(df), df.index)

    def save(self, path: Path) -> None:
        """Serialize this detector to ``path`` with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "MinPitchDetector":
        """Load a detector previously written by :meth:`save`.

        SECURITY: joblib.load unpickles the file and can execute arbitrary
        code; only load files from a trusted source.
        """
        return joblib.load(path)