| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- """
- Module 1: 风速-功率异常检测
- 优化点:
- - PowerCurveDetector 改为逐点输出:训练时保存 bin 级正常范围 (mean±σ),
- predict 时将每个点映射到所属 bin,计算 z-score 作为逐点异常分数
- - ScatterDetector 增加环境温度特征(如果 ambient_temp 列存在),
- 因为空气密度影响功率输出
- - 训练/预测时过滤有效发电区间 (WIND_VALID_MIN ~ WIND_VALID_MAX)
- - 分箱样本量权重过滤,样本不足的箱不参与训练
- 子检测器:
- A. PowerCurveDetector - 基于分箱统计的功率曲线异常(z-score 逐点)
- B. ScatterDetector - 基于原始散点的风速-功率异常(IsolationForest)
- """
- import numpy as np
- import pandas as pd
- from sklearn.ensemble import IsolationForest
- from sklearn.preprocessing import StandardScaler
- import joblib
- from pathlib import Path
- from config import (
- COL_WIND_SPD, COL_P_ACTIVE, COL_AMBIENT_TEMP,
- WIND_BIN_WIDTH, WIND_BIN_MIN, WIND_BIN_MAX,
- ISO_CONTAMINATION, ISO_RANDOM_STATE, ISO_N_ESTIMATORS,
- WIND_VALID_MIN, WIND_VALID_MAX,
- )
- # 分箱最小样本量,样本不足的箱不参与训练
- _MIN_BIN_COUNT = 30
- # 功率偏离阈值(标准差倍数),按风速区间分段
- # 低风速段波动大,阈值放宽;高风速段接近额定,阈值收紧
- _SIGMA_LOW = 4.0 # < 8 m/s
- _SIGMA_MID = 3.0 # 8~16 m/s
- _SIGMA_HIGH = 2.5 # > 16 m/s
- def _get_sigma(wind_spd: float) -> float:
- if wind_spd < 8.0:
- return _SIGMA_LOW
- elif wind_spd <= 16.0:
- return _SIGMA_MID
- else:
- return _SIGMA_HIGH
def _bin_wind_speed(series: pd.Series) -> pd.Series:
    """Assign each wind speed to an integer bin code.

    Bin edges run from ``WIND_BIN_MIN`` in steps of ``WIND_BIN_WIDTH`` and are
    generated with ``np.arange`` up to (but excluding)
    ``WIND_BIN_MAX + WIND_BIN_WIDTH``.

    Args:
        series: Wind speeds to bin.

    Returns:
        The result of ``pd.cut`` with ``labels=False``: the bin code for each
        value, with NaN for values that fall outside the edges.
    """
    edges = np.arange(WIND_BIN_MIN, WIND_BIN_MAX + WIND_BIN_WIDTH, WIND_BIN_WIDTH)
    # pd.cut uses right-closed intervals, so without include_lowest a value
    # exactly equal to the first edge would fall into no bin at all.
    return pd.cut(series, bins=edges, labels=False, include_lowest=True)
def _filter_valid_wind(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only rows inside the valid generating wind-speed range.

    Rows whose wind speed lies outside
    ``[WIND_VALID_MIN, WIND_VALID_MAX]`` (both ends inclusive) are removed to
    exclude cut-in / cut-out noise. Rows with a NaN wind speed fail both
    comparisons and are removed as well.
    """
    wind = df[COL_WIND_SPD]
    at_or_above_min = wind >= WIND_VALID_MIN
    at_or_below_max = wind <= WIND_VALID_MAX
    return df[at_or_above_min & at_or_below_max]
- # ── A. 功率曲线检测器(逐点输出) ──────────────────────────────────────────────
class PowerCurveDetector:
    """Per-point power-curve anomaly detector based on binned statistics.

    Training: wind speed is binned and the (mean, std) of active power is
    stored for every bin as the normal range.
    Prediction: each point is mapped to its bin, its power deviation is
    expressed as a z-score, and the point is flagged when the absolute z-score
    exceeds the wind-speed-dependent threshold from ``_get_sigma`` (relaxed at
    low wind, tightened at high wind). The output has one row per input row.
    """

    def __init__(self):
        # One row per usable bin: wind_bin, mean_power, std_power, count.
        # Empty until fit() has been called.
        self.bin_stats: pd.DataFrame = pd.DataFrame()

    def fit(self, df: pd.DataFrame) -> "PowerCurveDetector":
        """Learn per-bin power statistics from ``df``.

        Args:
            df: Frame containing the ``COL_WIND_SPD`` and ``COL_P_ACTIVE``
                columns.

        Returns:
            ``self``, to allow call chaining.

        Raises:
            ValueError: If fewer than 3 bins have at least ``_MIN_BIN_COUNT``
                samples.
        """
        d = _filter_valid_wind(df[[COL_WIND_SPD, COL_P_ACTIVE]]).copy()
        d["wind_bin"] = _bin_wind_speed(d[COL_WIND_SPD])
        stats = (
            d.groupby("wind_bin")[COL_P_ACTIVE]
            .agg(mean_power="mean", std_power="std", count="count")
            .reset_index()
            .dropna()
        )
        # Bins with too few samples are not trusted as a reference.
        stats = stats[stats["count"] >= _MIN_BIN_COUNT].copy()
        # A bin whose std is exactly 0 would make the z-score divide by zero;
        # fall back to the std of all filtered power values.
        global_std = d[COL_P_ACTIVE].std()
        stats["std_power"] = stats["std_power"].replace(0, global_std)
        if len(stats) < 3:
            raise ValueError("功率曲线有效分箱不足")
        self.bin_stats = stats
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of ``df`` against the learned power curve.

        Args:
            df: Frame containing the ``COL_WIND_SPD`` and ``COL_P_ACTIVE``
                columns.

        Returns:
            A frame indexed like ``df`` with columns ``anomaly`` (bool) and
            ``score`` (z-score). Rows with missing values, wind speed outside
            the valid range, or a bin without learned statistics keep
            ``anomaly=False`` and ``score=NaN``.

        Raises:
            ValueError: If there are rows to score but ``fit`` has not been
                called.
        """
        n_rows = len(df)
        anomaly = np.zeros(n_rows, dtype=bool)
        score = np.full(n_rows, np.nan)

        # Work on a positionally indexed copy so results can be written back
        # by row position; label-based assignment breaks when df.index
        # contains duplicate labels.
        d = df[[COL_WIND_SPD, COL_P_ACTIVE]].reset_index(drop=True)
        wind = d[COL_WIND_SPD]
        usable = (
            wind.notna()
            & d[COL_P_ACTIVE].notna()
            & (wind >= WIND_VALID_MIN)
            & (wind <= WIND_VALID_MAX)
        )
        d = d.loc[usable]

        if not d.empty:
            if self.bin_stats.empty:
                raise ValueError("功率曲线检测器尚未训练,请先调用 fit()")

            lookup = self.bin_stats.set_index("wind_bin")
            wind_bin = _bin_wind_speed(d[COL_WIND_SPD])
            mean_power = wind_bin.map(lookup["mean_power"])
            std_power = wind_bin.map(lookup["std_power"])

            # Only points whose bin was kept during training can be scored.
            has_stat = mean_power.notna() & std_power.notna()
            z = (
                d.loc[has_stat, COL_P_ACTIVE] - mean_power[has_stat]
            ) / std_power[has_stat]
            threshold = d.loc[has_stat, COL_WIND_SPD].map(_get_sigma)

            rows = z.index.to_numpy()
            score[rows] = z.to_numpy()
            anomaly[rows] = (z.abs() > threshold).to_numpy()

        return pd.DataFrame({"anomaly": anomaly, "score": score}, index=df.index)

    def save(self, path: Path):
        """Persist this detector to ``path`` with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "PowerCurveDetector":
        """Load a detector previously written by ``save``.

        NOTE(security): joblib.load unpickles the file, which can execute
        arbitrary code. Only load files from trusted sources.
        """
        return joblib.load(path)
- # ── B. 散点检测器 ─────────────────────────────────────────────────────────────
class ScatterDetector:
    """IsolationForest detector over raw (wind speed, active power) points.

    Only points inside the valid generating wind-speed range are used.
    A ``p_active / wind_spd**3`` feature (an approximation of Cp) is added to
    capture deviations in wind-energy utilisation. When the training data
    contains the ``COL_AMBIENT_TEMP`` column, temperature is used as a feature
    too, because air density affects power output.
    """

    def __init__(self, contamination=ISO_CONTAMINATION):
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            n_estimators=ISO_N_ESTIMATORS,
            contamination=contamination,
            random_state=ISO_RANDOM_STATE,
        )
        # Whether the fitted model uses the temperature feature. Set by fit()
        # and reused by predict() so both build the same feature set.
        self._has_temp = False

    def _features(self, df: pd.DataFrame, use_temp=None) -> pd.DataFrame:
        """Build the feature frame for the rows of ``df`` that can be scored.

        Args:
            df: Frame containing ``COL_WIND_SPD`` and ``COL_P_ACTIVE`` and,
                optionally, ``COL_AMBIENT_TEMP``.
            use_temp: ``True``/``False`` forces the temperature feature on or
                off; ``None`` (default) uses it whenever the column is present
                in ``df``.

        Returns:
            A frame indexed by the surviving rows of ``df`` (valid wind range,
            no missing values) with the input columns plus ``cp_proxy`` and,
            when temperature is used, ``temp``.

        Raises:
            ValueError: If ``use_temp`` is true but ``df`` has no
                ``COL_AMBIENT_TEMP`` column.
        """
        temp_available = COL_AMBIENT_TEMP in df.columns
        if use_temp is None:
            use_temp = temp_available
        elif use_temp and not temp_available:
            raise ValueError("模型训练时使用了环境温度特征,但输入数据缺少该列")

        cols = [COL_WIND_SPD, COL_P_ACTIVE]
        if use_temp:
            cols.append(COL_AMBIENT_TEMP)

        d = _filter_valid_wind(df[cols]).dropna().copy()
        wind_cubed = d[COL_WIND_SPD] ** 3
        # A zero wind speed would divide by zero; turn it into NaN so the row
        # is removed by the final dropna().
        d["cp_proxy"] = d[COL_P_ACTIVE] / wind_cubed.replace(0, np.nan)
        if use_temp:
            # Lower temperature means denser air, so power should be higher at
            # the same wind speed.
            # NOTE(review): this duplicates the COL_AMBIENT_TEMP column that is
            # already part of the frame, so temperature enters the model twice.
            # Kept as-is so that previously saved models (fitted on this
            # feature layout) remain usable.
            d["temp"] = d[COL_AMBIENT_TEMP]
        return d.dropna()

    def fit(self, df: pd.DataFrame) -> "ScatterDetector":
        """Fit the scaler and the isolation forest on ``df``.

        Returns:
            ``self``, to allow call chaining.

        Raises:
            ValueError: If no usable rows remain after filtering.
        """
        feat = self._features(df)
        if feat.empty:
            raise ValueError("散点特征为空,检查风速功率数据")
        self._has_temp = COL_AMBIENT_TEMP in feat.columns
        X = self.scaler.fit_transform(feat.select_dtypes(include=[np.number]))
        self.model.fit(X)
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of ``df`` with the fitted isolation forest.

        The feature set chosen at fit time is reused, so an extra temperature
        column in ``df`` is ignored when the model was trained without it.

        Returns:
            A frame indexed like ``df`` with columns ``anomaly`` (bool, True
            where the model predicts -1) and ``score`` (the model's
            ``score_samples`` value). Rows that cannot be scored keep
            ``anomaly=False`` and ``score=NaN``.

        Raises:
            ValueError: If the model was trained with temperature but ``df``
                lacks the ``COL_AMBIENT_TEMP`` column.
        """
        n_rows = len(df)
        anomaly = np.zeros(n_rows, dtype=bool)
        score = np.full(n_rows, np.nan)

        # Narrow to the columns that can matter and switch to a positional
        # index so results are written back by row position; label-based
        # assignment breaks when df.index contains duplicate labels.
        candidates = (COL_WIND_SPD, COL_P_ACTIVE, COL_AMBIENT_TEMP)
        present = [c for c in candidates if c in df.columns]
        positional = df[present].reset_index(drop=True)

        feat = self._features(positional, use_temp=self._has_temp)
        if not feat.empty:
            X = self.scaler.transform(feat.select_dtypes(include=[np.number]))
            rows = feat.index.to_numpy()
            anomaly[rows] = self.model.predict(X) == -1
            score[rows] = self.model.score_samples(X)

        return pd.DataFrame({"anomaly": anomaly, "score": score}, index=df.index)

    def save(self, path: Path):
        """Persist this detector to ``path`` with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: Path) -> "ScatterDetector":
        """Load a detector previously written by ``save``.

        NOTE(security): joblib.load unpickles the file, which can execute
        arbitrary code. Only load files from trusted sources.
        """
        return joblib.load(path)
|