"""
Module 4: 运行参数异常检测

算法: IsolationForest
  - 相比 OneClassSVM 对高维稀疏特征更稳定,训练速度更快
  - 两个检测器特征维度较高(电气侧多测点),IF 更合适

子检测器:
  A. PowerQualityDetector - 功率质量异常
     测点: p_active, theory_p_active, p_reactive, grid_freq,
           grid_ia/ib/ic, grid_ua/ub/uc
     检测: 三相不平衡、功率因数偏低、频率偏差、理论/实际功率偏差

  B. OperationStateDetector - 运行状态异常
     测点: p_active, gen_spd, pitch_ang_act_1/2/3, twist_ang
     检测: 转速-功率-桨距角-扭缆整体运行状态偏离正常模式
"""
- import pandas as pd
- import numpy as np
- from sklearn.ensemble import IsolationForest
- from sklearn.preprocessing import StandardScaler
- import joblib
- from pathlib import Path
- from config import (
- ISO_CONTAMINATION, ISO_RANDOM_STATE, ISO_N_ESTIMATORS,
- COL_P_ACTIVE, COL_ROTOR_SPD,
- COL_PITCH_ACT_1, COL_PITCH_ACT_2, COL_PITCH_ACT_3,
- COL_TWIST_ANG,
- )
- _GRID_CURR = ["grid_ia", "grid_ib", "grid_ic"]
- _GRID_VOLT = ["grid_ua", "grid_ub", "grid_uc"]
- # ── A. 功率质量检测器 ──────────────────────────────────────────────────────────
class PowerQualityDetector:
    """IsolationForest detector for grid-side power-quality anomalies.

    Engineered features (each one is built only when its source columns are
    present in the input frame, otherwise it is skipped):

    - ``p_diff_ratio``: (actual - theoretical active power) / theoretical
    - ``power_factor``: p_active / sqrt(p_active^2 + p_reactive^2)
    - ``curr_imbalance``: row-wise std / mean of the available phase currents
    - ``volt_imbalance``: row-wise std / mean of the available phase voltages
    - ``freq_dev``: grid_freq - 50

    ``fit`` needs at least two engineered features. Rows in which any feature
    is NaN or infinite are excluded from training and are left unscored by
    ``predict``.

    NOTE(review): the feature set depends on which columns the frame carries,
    so the frame given to ``predict`` should expose the same measurement
    columns as the one used for ``fit``.
    """

    def __init__(self, contamination: float = ISO_CONTAMINATION):
        """Create the (unfitted) scaler and isolation forest.

        Args:
            contamination: Forwarded to ``IsolationForest(contamination=...)``.
        """
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            n_estimators=ISO_N_ESTIMATORS,
            contamination=contamination,
            random_state=ISO_RANDOM_STATE,
        )

    def _feature_table(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the engineered features for every row of ``df``.

        No rows are dropped here: undefined values (zero denominators,
        infinities) are represented as NaN so that callers can decide how to
        treat incomplete rows.

        Returns:
            A frame indexed like ``df`` with one column per available feature,
            or a completely empty ``DataFrame`` when no feature can be built.
        """
        feat = {}
        has_p_active = COL_P_ACTIVE in df.columns

        # Relative deviation between actual and theoretical active power.
        if "theory_p_active" in df.columns and has_p_active:
            # A zero theoretical power would divide by zero -> treat as missing.
            denom = df["theory_p_active"].replace(0, np.nan)
            feat["p_diff_ratio"] = (df[COL_P_ACTIVE] - df["theory_p_active"]) / denom

        # Approximate power factor (needs active + reactive power).
        if has_p_active and "p_reactive" in df.columns:
            apparent = np.sqrt(df[COL_P_ACTIVE] ** 2 + df["p_reactive"] ** 2)
            feat["power_factor"] = df[COL_P_ACTIVE] / apparent.replace(0, np.nan)

        # Phase-current imbalance; needs at least two phases to be meaningful.
        curr_cols = [c for c in _GRID_CURR if c in df.columns]
        if len(curr_cols) >= 2:
            curr = df[curr_cols]
            mean_c = curr.mean(axis=1).replace(0, np.nan)
            feat["curr_imbalance"] = curr.std(axis=1) / mean_c

        # Phase-voltage imbalance; same construction as for the currents.
        volt_cols = [c for c in _GRID_VOLT if c in df.columns]
        if len(volt_cols) >= 2:
            volt = df[volt_cols]
            mean_v = volt.mean(axis=1).replace(0, np.nan)
            feat["volt_imbalance"] = volt.std(axis=1) / mean_v

        # Deviation of the grid frequency from 50.
        if "grid_freq" in df.columns:
            feat["freq_dev"] = df["grid_freq"] - 50.0

        if not feat:
            return pd.DataFrame()
        table = pd.DataFrame(feat, index=df.index)
        return table.replace([np.inf, -np.inf], np.nan)

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the engineered features restricted to fully defined rows."""
        return self._feature_table(df).dropna()

    def fit(self, df: pd.DataFrame) -> "PowerQualityDetector":
        """Fit the scaler and the isolation forest on the rows of ``df``.

        Args:
            df: Training frame containing the raw measurement columns.

        Returns:
            ``self``, to allow call chaining.

        Raises:
            ValueError: If no complete feature row exists or fewer than two
                features could be built.
        """
        feat = self._features(df)
        if feat.empty or len(feat.columns) < 2:
            raise ValueError("功率质量特征不足(至少需要 p_active + 一个辅助测点)")
        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of ``df``.

        Args:
            df: Frame containing the raw measurement columns.

        Returns:
            A frame indexed like ``df`` with columns ``anomaly`` (bool, True
            where the forest predicts -1) and ``score`` (``score_samples``
            output). Rows whose features are incomplete keep
            ``anomaly=False`` and ``score=NaN``.
        """
        n_rows = len(df)
        anomaly = np.zeros(n_rows, dtype=bool)
        score = np.full(n_rows, np.nan)

        table = self._feature_table(df)
        if not table.empty:
            # Results are written back by position rather than by label, so
            # the write-back does not depend on df.index being unique.
            usable = table.notna().all(axis=1).to_numpy()
            if usable.any():
                X = self.scaler.transform(table[usable])
                anomaly[usable] = self.model.predict(X) == -1
                score[usable] = self.model.score_samples(X)

        return pd.DataFrame({"anomaly": anomaly, "score": score}, index=df.index)

    def save(self, path: Path) -> None:
        """Serialize the whole detector (scaler + model) to ``path`` via joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "PowerQualityDetector":
        """Load a detector previously written by ``save``.

        Raises:
            TypeError: If the file does not contain an instance of this class.
        """
        # NOTE(security): joblib.load unpickles arbitrary objects; only load
        # files that come from a trusted source.
        obj = joblib.load(path)
        if not isinstance(obj, cls):
            raise TypeError(
                f"{path} does not contain a {cls.__name__} "
                f"(got {type(obj).__name__})"
            )
        return obj
- # ── B. 运行状态综合检测器 ──────────────────────────────────────────────────────
class OperationStateDetector:
    """IsolationForest detector for the overall turbine operating state.

    Engineered features (the ``COL_*`` column names come from ``config``):

    - ``COL_P_ACTIVE``: raw active power (mandatory column)
    - ``COL_ROTOR_SPD``: raw speed signal, when present
    - ``p_per_spd``: active power / speed (zero speed is treated as missing)
    - ``pitch_mean``: row-wise mean of the available blade pitch angles
    - ``pitch_std``: row-wise std of the pitch angles (needs >= 2 blades)
    - ``pitch_x_spd``: ``pitch_mean`` * speed, when both are available
    - ``COL_TWIST_ANG``: raw twist angle, when present

    ``fit`` needs at least two engineered features. Rows in which any feature
    is NaN or infinite are excluded from training and are left unscored by
    ``predict``.

    NOTE(review): the feature set depends on which columns the frame carries,
    so the frame given to ``predict`` should expose the same measurement
    columns as the one used for ``fit``.
    """

    def __init__(self, contamination: float = ISO_CONTAMINATION):
        """Create the (unfitted) scaler and isolation forest.

        Args:
            contamination: Forwarded to ``IsolationForest(contamination=...)``.
        """
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            n_estimators=ISO_N_ESTIMATORS,
            contamination=contamination,
            random_state=ISO_RANDOM_STATE,
        )

    def _feature_table(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the engineered features for every row of ``df``.

        No rows are dropped here: undefined values (zero speed, infinities)
        are represented as NaN so that callers can decide how to treat
        incomplete rows.

        Returns:
            A frame indexed like ``df`` with one column per available feature,
            or a completely empty ``DataFrame`` when the active-power column
            is missing.
        """
        if COL_P_ACTIVE not in df.columns:
            return pd.DataFrame()

        has_speed = COL_ROTOR_SPD in df.columns
        feat = {COL_P_ACTIVE: df[COL_P_ACTIVE]}

        if has_speed:
            feat[COL_ROTOR_SPD] = df[COL_ROTOR_SPD]
            # Power per unit of speed; zero speed would divide by zero.
            spd_safe = df[COL_ROTOR_SPD].replace(0, np.nan)
            feat["p_per_spd"] = df[COL_P_ACTIVE] / spd_safe

        # Blade pitch features from whichever of the three blades are present.
        act_cols = [
            c
            for c in (COL_PITCH_ACT_1, COL_PITCH_ACT_2, COL_PITCH_ACT_3)
            if c in df.columns
        ]
        if act_cols:
            pitch_df = df[act_cols]
            feat["pitch_mean"] = pitch_df.mean(axis=1)
            # A spread across blades only exists with two or more blades.
            if len(act_cols) >= 2:
                feat["pitch_std"] = pitch_df.std(axis=1)
            if has_speed:
                feat["pitch_x_spd"] = feat["pitch_mean"] * df[COL_ROTOR_SPD]

        if COL_TWIST_ANG in df.columns:
            feat[COL_TWIST_ANG] = df[COL_TWIST_ANG]

        table = pd.DataFrame(feat, index=df.index)
        return table.replace([np.inf, -np.inf], np.nan)

    def _features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the engineered features restricted to fully defined rows."""
        return self._feature_table(df).dropna()

    def fit(self, df: pd.DataFrame) -> "OperationStateDetector":
        """Fit the scaler and the isolation forest on the rows of ``df``.

        Args:
            df: Training frame containing the raw measurement columns.

        Returns:
            ``self``, to allow call chaining.

        Raises:
            ValueError: If no complete feature row exists or fewer than two
                features could be built.
        """
        feat = self._features(df)
        if feat.empty or len(feat.columns) < 2:
            raise ValueError("运行状态特征不足(至少需要 p_active + 一个辅助测点)")
        X = self.scaler.fit_transform(feat)
        self.model.fit(X)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of ``df``.

        Args:
            df: Frame containing the raw measurement columns.

        Returns:
            A frame indexed like ``df`` with columns ``anomaly`` (bool, True
            where the forest predicts -1) and ``score`` (``score_samples``
            output). Rows whose features are incomplete keep
            ``anomaly=False`` and ``score=NaN``.
        """
        n_rows = len(df)
        anomaly = np.zeros(n_rows, dtype=bool)
        score = np.full(n_rows, np.nan)

        table = self._feature_table(df)
        if not table.empty:
            # Results are written back by position rather than by label, so
            # the write-back does not depend on df.index being unique.
            usable = table.notna().all(axis=1).to_numpy()
            if usable.any():
                X = self.scaler.transform(table[usable])
                anomaly[usable] = self.model.predict(X) == -1
                score[usable] = self.model.score_samples(X)

        return pd.DataFrame({"anomaly": anomaly, "score": score}, index=df.index)

    def save(self, path: Path) -> None:
        """Serialize the whole detector (scaler + model) to ``path`` via joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "OperationStateDetector":
        """Load a detector previously written by ``save``.

        Raises:
            TypeError: If the file does not contain an instance of this class.
        """
        # NOTE(security): joblib.load unpickles arbitrary objects; only load
        # files that come from a trusted source.
        obj = joblib.load(path)
        if not isinstance(obj, cls):
            raise TypeError(
                f"{path} does not contain a {cls.__name__} "
                f"(got {type(obj).__name__})"
            )
        return obj
|