"""
Module 4: operating-parameter anomaly detection.

Algorithm: IsolationForest
  - More stable than OneClassSVM on high-dimensional sparse features and
    faster to train.
  - Both detectors use fairly high-dimensional features (many electrical
    measuring points), so IsolationForest is the better fit.

Sub-detectors:
  A. PowerQualityDetector - power-quality anomalies
     Signals: p_active, theory_p_active, p_reactive, grid_freq,
              grid_ia/ib/ic, grid_ua/ub/uc
     Detects: three-phase imbalance, low power factor, frequency deviation,
              theoretical/actual power deviation
  B. OperationStateDetector - operating-state anomalies
     Signals: p_active, gen_spd, pitch_ang_act_1/2/3, twist_ang
     Detects: the overall speed / power / pitch / cable-twist operating
              state drifting away from the normal pattern
"""
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import joblib
from pathlib import Path

from config import (
    ISO_CONTAMINATION,
    ISO_RANDOM_STATE,
    ISO_N_ESTIMATORS,
    COL_P_ACTIVE,
    COL_ROTOR_SPD,
    COL_PITCH_ACT_1,
    COL_PITCH_ACT_2,
    COL_PITCH_ACT_3,
    COL_TWIST_ANG,
)

_GRID_CURR = ["grid_ia", "grid_ib", "grid_ic"]
_GRID_VOLT = ["grid_ua", "grid_ub", "grid_uc"]


# ── A. Power-quality detector ────────────────────────────────────────────────
class PowerQualityDetector:
    """IsolationForest detector over derived power-quality features.

    Every measuring point is optional: a feature is built only when the
    columns it needs are present in the input frame.

    Features:
      - ``p_diff_ratio``: (p_active - theory_p_active) / theory_p_active.
      - ``power_factor``: p_active / sqrt(p_active^2 + p_reactive^2).
      - ``curr_imbalance``: row-wise std / mean of the available
        grid_ia/ib/ic columns (needs at least two phases).
      - ``volt_imbalance``: row-wise std / mean of the available
        grid_ua/ub/uc columns (needs at least two phases).
      - ``freq_dev``: grid_freq - 50.0 (presumably the nominal grid
        frequency in Hz - confirm for the target grid).

    ``fit`` needs at least two of these features. Note that the code does
    not strictly require ``p_active``: any two derivable features suffice.
    """

    def __init__(self, contamination: float = ISO_CONTAMINATION):
        """Create an unfitted scaler/forest pair.

        Args:
            contamination: Passed to ``IsolationForest`` as its
                ``contamination`` parameter.
        """
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            n_estimators=ISO_N_ESTIMATORS,
            contamination=contamination,
            random_state=ISO_RANDOM_STATE,
        )
        # Feature columns seen by fit(); predict() aligns its input to them.
        self._feature_names = None

    def _raw_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame for *df* without dropping any row.

        The result shares ``df.index``. Divisions by zero and infinities
        are turned into NaN so the caller can decide how to treat
        incomplete rows. Returns an empty frame when no feature can be
        derived.
        """
        columns = df.columns
        feat = {}

        # Relative deviation between actual and theoretical active power.
        if "theory_p_active" in columns and COL_P_ACTIVE in columns:
            denom = df["theory_p_active"].replace(0, np.nan)
            feat["p_diff_ratio"] = (df[COL_P_ACTIVE] - df["theory_p_active"]) / denom

        # Approximate power factor (needs active and reactive power).
        if COL_P_ACTIVE in columns and "p_reactive" in columns:
            apparent = np.sqrt(df[COL_P_ACTIVE] ** 2 + df["p_reactive"] ** 2)
            feat["power_factor"] = df[COL_P_ACTIVE] / apparent.replace(0, np.nan)

        # Three-phase current imbalance.
        curr_cols = [c for c in _GRID_CURR if c in columns]
        if len(curr_cols) >= 2:
            curr = df[curr_cols]
            feat["curr_imbalance"] = curr.std(axis=1) / curr.mean(axis=1).replace(0, np.nan)

        # Three-phase voltage imbalance.
        volt_cols = [c for c in _GRID_VOLT if c in columns]
        if len(volt_cols) >= 2:
            volt = df[volt_cols]
            feat["volt_imbalance"] = volt.std(axis=1) / volt.mean(axis=1).replace(0, np.nan)

        # Grid-frequency deviation.
        if "grid_freq" in columns:
            feat["freq_dev"] = df["grid_freq"] - 50.0

        if not feat:
            return pd.DataFrame()
        return pd.DataFrame(feat, index=df.index).replace([np.inf, -np.inf], np.nan)

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the feature frame restricted to rows with no missing value."""
        return self._raw_features(df).dropna()

    def fit(self, df: pd.DataFrame) -> "PowerQualityDetector":
        """Fit the scaler and the forest on the complete rows of *df*.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If fewer than two features can be derived, or no
                row has all of them.
        """
        feat = self._features(df)
        if feat.empty or len(feat.columns) < 2:
            raise ValueError("功率质量特征不足(至少需要 p_active + 一个辅助测点)")
        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        # Remember the training schema so predict() can align to it.
        self._feature_names = list(feat.columns)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of *df*.

        Returns:
            A frame indexed like *df* with two columns: ``anomaly`` (True
            where the forest predicts -1) and ``score`` (the forest's
            ``score_samples`` value). Rows whose features are incomplete
            keep ``anomaly=False`` and ``score=NaN``.

        Raises:
            ValueError: If *df* yields some features but not all of the
                ones the detector was fitted on.
        """
        n_rows = len(df)
        anomaly = np.zeros(n_rows, dtype=bool)
        score = np.full(n_rows, np.nan)

        raw = self._raw_features(df)
        if not raw.empty:
            # Instances pickled before this attribute existed fall back to
            # the unaligned behaviour (sklearn validates the schema itself).
            expected = getattr(self, "_feature_names", None)
            if expected is not None:
                missing = [name for name in expected if name not in raw.columns]
                if missing:
                    raise ValueError(
                        f"input cannot produce features used during fit: {missing}"
                    )
                # Drop features unknown to the fitted model *before* the
                # completeness check so they cannot invalidate rows.
                raw = raw[expected]

            # Positional mask instead of label-based .loc write-back, so
            # duplicate index labels in df cannot misalign the results.
            valid = raw.notna().all(axis=1).to_numpy()
            if valid.any():
                X = self.scaler.transform(raw[valid])
                anomaly[valid] = self.model.predict(X) == -1
                score[valid] = self.model.score_samples(X)

        return pd.DataFrame({"anomaly": anomaly, "score": score}, index=df.index)

    def save(self, path: Path) -> None:
        """Serialize the whole detector to *path* with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "PowerQualityDetector":
        """Load a detector previously written by :meth:`save`.

        The object is returned exactly as joblib deserializes it.
        """
        # SECURITY: joblib.load unpickles arbitrary objects; only load
        # files from a trusted source.
        return joblib.load(path)
# ── B. Overall operation-state detector ──────────────────────────────────────
class OperationStateDetector:
    """IsolationForest detector over the turbine's joint operating state.

    ``COL_P_ACTIVE`` (active power) is mandatory; every other measuring
    point is optional and contributes features only when present.

    Features:
      - ``COL_P_ACTIVE``: active power, used as-is.
      - ``COL_ROTOR_SPD``: speed signal, used as-is.
        NOTE(review): the original docstring calls this ``gen_spd``
        (generator speed) - confirm which signal the config constant
        actually names.
      - ``p_per_spd``: power / speed ratio (reflects the torque state).
      - ``pitch_mean`` / ``pitch_std``: row-wise mean and spread of the
        available blade pitch angles (``pitch_std`` needs two or more
        blades).
      - ``pitch_x_spd``: ``pitch_mean`` * speed (coordination feature).
      - ``COL_TWIST_ANG``: cable-twist angle, used as-is.

    ``fit`` needs active power plus at least one further feature.
    """

    def __init__(self, contamination: float = ISO_CONTAMINATION):
        """Create an unfitted scaler/forest pair.

        Args:
            contamination: Passed to ``IsolationForest`` as its
                ``contamination`` parameter.
        """
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            n_estimators=ISO_N_ESTIMATORS,
            contamination=contamination,
            random_state=ISO_RANDOM_STATE,
        )
        # Feature columns seen by fit(); predict() aligns its input to them.
        self._feature_names = None

    def _raw_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame for *df* without dropping any row.

        The result shares ``df.index``. Divisions by zero and infinities
        are turned into NaN. Returns an empty frame when the mandatory
        active-power column is absent.
        """
        columns = df.columns
        if COL_P_ACTIVE not in columns:
            return pd.DataFrame()

        has_speed = COL_ROTOR_SPD in columns
        feat = {COL_P_ACTIVE: df[COL_P_ACTIVE]}

        if has_speed:
            feat[COL_ROTOR_SPD] = df[COL_ROTOR_SPD]
            # Zero speed would give an infinite ratio; mark it missing.
            feat["p_per_spd"] = df[COL_P_ACTIVE] / df[COL_ROTOR_SPD].replace(0, np.nan)

        # Blade pitch features from whichever of the three blades exist.
        act_cols = [
            c
            for c in (COL_PITCH_ACT_1, COL_PITCH_ACT_2, COL_PITCH_ACT_3)
            if c in columns
        ]
        if act_cols:
            pitch_df = df[act_cols]
            feat["pitch_mean"] = pitch_df.mean(axis=1)
            if len(act_cols) >= 2:
                feat["pitch_std"] = pitch_df.std(axis=1)
            if has_speed:
                feat["pitch_x_spd"] = feat["pitch_mean"] * df[COL_ROTOR_SPD]

        if COL_TWIST_ANG in columns:
            feat[COL_TWIST_ANG] = df[COL_TWIST_ANG]

        return pd.DataFrame(feat, index=df.index).replace([np.inf, -np.inf], np.nan)

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the feature frame restricted to rows with no missing value."""
        return self._raw_features(df).dropna()

    def fit(self, df: pd.DataFrame) -> "OperationStateDetector":
        """Fit the scaler and the forest on the complete rows of *df*.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If fewer than two features can be derived, or no
                row has all of them.
        """
        feat = self._features(df)
        if feat.empty or len(feat.columns) < 2:
            raise ValueError("运行状态特征不足(至少需要 p_active + 一个辅助测点)")
        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        # Remember the training schema so predict() can align to it.
        self._feature_names = list(feat.columns)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of *df*.

        Returns:
            A frame indexed like *df* with two columns: ``anomaly`` (True
            where the forest predicts -1) and ``score`` (the forest's
            ``score_samples`` value). Rows whose features are incomplete
            keep ``anomaly=False`` and ``score=NaN``.

        Raises:
            ValueError: If *df* yields some features but not all of the
                ones the detector was fitted on.
        """
        n_rows = len(df)
        anomaly = np.zeros(n_rows, dtype=bool)
        score = np.full(n_rows, np.nan)

        raw = self._raw_features(df)
        if not raw.empty:
            # Instances pickled before this attribute existed fall back to
            # the unaligned behaviour (sklearn validates the schema itself).
            expected = getattr(self, "_feature_names", None)
            if expected is not None:
                missing = [name for name in expected if name not in raw.columns]
                if missing:
                    raise ValueError(
                        f"input cannot produce features used during fit: {missing}"
                    )
                # Drop features unknown to the fitted model *before* the
                # completeness check so they cannot invalidate rows.
                raw = raw[expected]

            # Positional mask instead of label-based .loc write-back, so
            # duplicate index labels in df cannot misalign the results.
            valid = raw.notna().all(axis=1).to_numpy()
            if valid.any():
                X = self.scaler.transform(raw[valid])
                anomaly[valid] = self.model.predict(X) == -1
                score[valid] = self.model.score_samples(X)

        return pd.DataFrame({"anomaly": anomaly, "score": score}, index=df.index)

    def save(self, path: Path) -> None:
        """Serialize the whole detector to *path* with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "OperationStateDetector":
        """Load a detector previously written by :meth:`save`.

        The object is returned exactly as joblib deserializes it.
        """
        # SECURITY: joblib.load unpickles arbitrary objects; only load
        # files from a trusted source.
        return joblib.load(path)