"""
Module 1: wind-speed / active-power anomaly detection.

Changes in this version:
- PowerCurveDetector now produces per-point output. Training stores each
  wind-speed bin's normal range (mean ± σ); prediction maps every point to
  its bin and uses the z-score as that point's anomaly score.
- ScatterDetector adds an ambient-temperature feature when the ambient_temp
  column is present, because air density affects power output.
- Both training and prediction keep only the valid generation range
  (WIND_VALID_MIN ~ WIND_VALID_MAX).
- Bins with too few samples are excluded from training.

Sub-detectors:
    A. PowerCurveDetector - binned power-curve anomalies (per-point z-score)
    B. ScatterDetector    - raw wind/power scatter anomalies (IsolationForest)
"""
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import joblib
from pathlib import Path

from config import (
    COL_WIND_SPD, COL_P_ACTIVE, COL_AMBIENT_TEMP,
    WIND_BIN_WIDTH, WIND_BIN_MIN, WIND_BIN_MAX,
    ISO_CONTAMINATION, ISO_RANDOM_STATE, ISO_N_ESTIMATORS,
    WIND_VALID_MIN, WIND_VALID_MAX,
)

# Minimum number of samples a wind-speed bin needs to take part in training.
_MIN_BIN_COUNT = 30

# Power-deviation thresholds, in multiples of the bin standard deviation,
# chosen piecewise by wind speed. Low wind fluctuates more, so the threshold
# is looser; high wind is close to rated power, so it is tighter.
_SIGMA_LOW = 4.0    # wind < 8 m/s
_SIGMA_MID = 3.0    # 8 <= wind <= 16 m/s
_SIGMA_HIGH = 2.5   # wind > 16 m/s


def _get_sigma(wind_spd: float) -> float:
    """Return the |z| threshold that applies at wind speed *wind_spd*.

    Below 8.0 the low threshold is used, from 8.0 up to and including 16.0
    the mid threshold, and anything else gets the high threshold. A NaN
    input fails both comparisons and therefore also gets the high threshold.
    """
    if wind_spd < 8.0:
        return _SIGMA_LOW
    return _SIGMA_MID if wind_spd <= 16.0 else _SIGMA_HIGH


def _bin_wind_speed(series: pd.Series) -> pd.Series:
    """Map each wind speed in *series* to an integer bin index.

    Edges start at WIND_BIN_MIN and step by WIND_BIN_WIDTH up to about
    WIND_BIN_MAX. ``pd.cut`` uses right-closed intervals by default, so a
    value equal to the first edge, a value outside the edges, or a NaN maps
    to NaN instead of an index.
    """
    edges = np.arange(WIND_BIN_MIN, WIND_BIN_MAX + WIND_BIN_WIDTH, WIND_BIN_WIDTH)
    codes = pd.cut(series, bins=edges, labels=False)
    return codes


def _filter_valid_wind(df: pd.DataFrame) -> pd.DataFrame:
    """Return the rows of *df* whose wind speed is in [WIND_VALID_MIN, WIND_VALID_MAX].

    Used to exclude cut-in / cut-out noise. Rows with a missing wind speed
    are dropped as well, since they fail the range comparison.
    """
    in_range = df[COL_WIND_SPD].between(WIND_VALID_MIN, WIND_VALID_MAX)
    return df[in_range]
# ── A. Power-curve detector (per-point output) ─────────────────────────────
class PowerCurveDetector:
    """
    Binned power-curve anomaly detector with per-point output.

    Training: bin the wind speed and keep each bin's active-power mean and
    standard deviation as that bin's normal range.
    Prediction: map every point to its bin, compute the power z-score and
    flag the point when |z| exceeds a wind-speed-dependent threshold (looser
    at low wind, tighter at high wind; see ``_get_sigma``).
    The prediction output has exactly one row per input row.
    """

    def __init__(self):
        # Per-bin statistics learned by fit(), with columns
        # wind_bin, mean_power, std_power, count. Empty until fit() runs.
        self.bin_stats: pd.DataFrame = pd.DataFrame()

    def fit(self, df: pd.DataFrame) -> "PowerCurveDetector":
        """Learn the per-bin (mean, std) of active power from *df*.

        Only rows inside the valid wind range are used. Bins with fewer than
        ``_MIN_BIN_COUNT`` samples are discarded. A bin whose std is exactly
        0 falls back to the global std of the training power.

        Returns:
            self, to allow chaining.

        Raises:
            ValueError: if fewer than 3 bins remain after filtering.
        """
        d = _filter_valid_wind(df[[COL_WIND_SPD, COL_P_ACTIVE]].copy())
        d["wind_bin"] = _bin_wind_speed(d[COL_WIND_SPD])

        stats = (
            d.groupby("wind_bin")[COL_P_ACTIVE]
            .agg(mean_power="mean", std_power="std", count="count")
            .reset_index()
            # Drops bins whose mean or std is undefined (e.g. one sample).
            .dropna()
        )
        # Explicit copy: the std column is modified below.
        stats = stats[stats["count"] >= _MIN_BIN_COUNT].copy()

        # A zero std would make every z-score infinite; use the global std.
        global_std = d[COL_P_ACTIVE].std()
        stats["std_power"] = stats["std_power"].replace(0, global_std)

        if len(stats) < 3:
            raise ValueError("功率曲线有效分箱不足")

        self.bin_stats = stats
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of *df* against the learned power curve.

        Returns:
            DataFrame with the same index as *df* and columns ``anomaly``
            (bool) and ``score`` (float z-score). A row gets
            ``anomaly=False`` and ``score=NaN`` when it has missing
            wind/power, is outside the valid wind range, or falls into a bin
            that has no training statistics.

        Note:
            fit() must have been called first. Otherwise the lookup of the
            ``wind_bin`` column in ``bin_stats`` fails as soon as there is at
            least one valid row.
        """
        n_rows = len(df)
        anomaly = np.zeros(n_rows, dtype=bool)
        score = np.full(n_rows, np.nan)

        # Work on a positional (RangeIndex) copy and write results back by
        # position. Label-based .loc writes break when df.index contains
        # duplicate labels, e.g. after concatenating frames.
        d = df[[COL_WIND_SPD, COL_P_ACTIVE]].reset_index(drop=True)
        wind = d[COL_WIND_SPD]
        valid_mask = (
            wind.notna()
            & d[COL_P_ACTIVE].notna()
            & (wind >= WIND_VALID_MIN)
            & (wind <= WIND_VALID_MAX)
        )
        d = d.loc[valid_mask].copy()

        if not d.empty:
            d["wind_bin"] = _bin_wind_speed(d[COL_WIND_SPD])
            bin_map = self.bin_stats.set_index("wind_bin")
            d["mean_power"] = d["wind_bin"].map(bin_map["mean_power"])
            d["std_power"] = d["wind_bin"].map(bin_map["std_power"])
            # Points in bins not retained by fit() stay unscored.
            d = d.dropna(subset=["mean_power", "std_power"])

            z = (d[COL_P_ACTIVE] - d["mean_power"]) / d["std_power"]
            sigma = d[COL_WIND_SPD].map(_get_sigma)
            pos = d.index.to_numpy()
            score[pos] = z.to_numpy()
            anomaly[pos] = (z.abs() > sigma).to_numpy()

        return pd.DataFrame({"anomaly": anomaly, "score": score}, index=df.index)

    def save(self, path: Path):
        """Serialize this detector to *path* with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "PowerCurveDetector":
        """Load a detector previously written by save().

        joblib.load unpickles the file, so only load trusted model files.
        """
        return joblib.load(path)
# ── B. Scatter detector ────────────────────────────────────────────────────
class ScatterDetector:
    """
    IsolationForest anomaly detector on (wind_spd, p_active) points inside
    the valid generation range.

    Features are wind speed, active power and ``p_active / wind_spd**3``, a
    rough Cp proxy that captures deviations in wind-energy utilisation. When
    the training data contains the ambient-temperature column, temperature is
    added as well, because air density affects power output. Whether
    temperature is used is decided once in fit() and reused by predict(), so
    both always build the same feature columns.
    """

    def __init__(self, contamination=ISO_CONTAMINATION):
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            n_estimators=ISO_N_ESTIMATORS,
            contamination=contamination,
            random_state=ISO_RANDOM_STATE,
        )
        # Set by fit(): True when the ambient-temperature feature was used.
        self._has_temp = False

    def _features(self, df: pd.DataFrame, use_temp: "bool | None" = None) -> pd.DataFrame:
        """Build the feature frame for the rows of *df* in the valid wind range.

        Args:
            df: input data containing the wind-speed and active-power columns.
            use_temp: whether to include the ambient-temperature feature.
                ``None`` (the default) means "use it if the column exists".

        Returns:
            Feature DataFrame keeping the index labels of the retained rows.
            Rows with any missing feature value are dropped.
        """
        if use_temp is None:
            use_temp = COL_AMBIENT_TEMP in df.columns

        cols = [COL_WIND_SPD, COL_P_ACTIVE]
        if use_temp:
            cols.append(COL_AMBIENT_TEMP)
        d = _filter_valid_wind(df[cols].copy()).dropna()

        wind3 = d[COL_WIND_SPD] ** 3
        d["cp_proxy"] = d[COL_P_ACTIVE] / wind3.replace(0, np.nan)
        if use_temp:
            # Colder air is denser, so at the same wind speed power should be
            # higher.
            # NOTE(review): this duplicates COL_AMBIENT_TEMP, which is already
            # a feature column, so temperature enters the model twice. It is
            # kept because already-saved scalers/models expect this feature
            # layout; drop it only together with a retrain.
            d["temp"] = d[COL_AMBIENT_TEMP]
        return d.dropna()

    def fit(self, df: pd.DataFrame) -> "ScatterDetector":
        """Fit the scaler and IsolationForest on the features of *df*.

        Returns:
            self, to allow chaining.

        Raises:
            ValueError: if no usable feature rows remain.
        """
        feat = self._features(df)
        if feat.empty:
            raise ValueError("散点特征为空,检查风速功率数据")
        X = self.scaler.fit_transform(feat.select_dtypes(include=[np.number]))
        self.model.fit(X)
        # Record the feature layout only after a successful fit, so predict()
        # rebuilds exactly the same columns.
        self._has_temp = COL_AMBIENT_TEMP in feat.columns
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of *df* with the fitted IsolationForest.

        Returns:
            DataFrame with the same index as *df* and columns ``anomaly``
            (bool, True where the forest predicts -1) and ``score`` (float,
            ``score_samples`` output; lower means more anomalous). Rows that
            produce no features get ``anomaly=False`` and ``score=NaN``.

        Raises:
            ValueError: if the model was trained with the ambient-temperature
                feature but *df* lacks that column and has rows to score.
        """
        n_rows = len(df)
        anomaly = np.zeros(n_rows, dtype=bool)
        score = np.full(n_rows, np.nan)

        # Reuse the feature layout chosen in fit(). An extra temperature
        # column is ignored when the model was trained without it.
        missing_temp = self._has_temp and COL_AMBIENT_TEMP not in df.columns
        use_temp = self._has_temp and not missing_temp

        # Positional index so results are written back by position; this
        # stays correct even when df.index has duplicate labels.
        feat = self._features(df.reset_index(drop=True), use_temp=use_temp)
        if not feat.empty:
            if missing_temp:
                raise ValueError(
                    f"模型训练时使用了 {COL_AMBIENT_TEMP} 特征,预测数据缺少该列"
                )
            X = self.scaler.transform(feat.select_dtypes(include=[np.number]))
            pos = feat.index.to_numpy()
            anomaly[pos] = self.model.predict(X) == -1
            score[pos] = self.model.score_samples(X)

        return pd.DataFrame({"anomaly": anomaly, "score": score}, index=df.index)

    def save(self, path: Path):
        """Serialize this detector (scaler + model) to *path* with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "ScatterDetector":
        """Load a detector previously written by save().

        joblib.load unpickles the file, so only load trusted model files.
        """
        return joblib.load(path)