""" 训练入口: 按机型遍历数据,训练所有异常检测模型并保存。 训练数据筛选规则(与检测逻辑对称): - '运行' 数据: 全部纳入训练 - '传感器异常' 数据: 该检测器关心的测点无传感器异常时纳入训练 例如: actual_torque 有异常,但 wind_spd/p_active 正常 → 可用于风速功率模型训练 - '停机' / '限功率': 不参与训练 用法: python train.py # 训练所有机型 python train.py --model 机型名称 # 只训练指定机型 """ import argparse from pathlib import Path import joblib import pandas as pd from config import MODEL_SAVE_DIR from data_loader import list_model_types, load_model_type from labeler import get_model_statistics, label_dataframe, DETECTOR_SENSOR_COLS from models.wind_power import PowerCurveDetector, ScatterDetector from models.yaw import StaticYawDetector, CableTwistDetector from models.pitch import PitchRegulationDetector, PitchCoordDetector, MinPitchDetector from models.control_params import PowerQualityDetector, OperationStateDetector def get_model_dir(model_name: str) -> Path: d = MODEL_SAVE_DIR / model_name d.mkdir(parents=True, exist_ok=True) return d def _filter_for_detector(labeled: pd.DataFrame, detector_name: str) -> pd.DataFrame: """ 返回可用于该检测器训练的数据: - status == '运行' - status 以 '传感器异常' 开头,且该检测器关心的传感器异常列全为 False """ sensor_cols = DETECTOR_SENSOR_COLS.get(detector_name, []) existing_sc = [c for c in sensor_cols if c in labeled.columns] # 运行数据 df_run = labeled[labeled["status"] == "运行"] # 传感器异常数据中,该检测器关心的测点无异常的行 df_sensor = labeled[labeled["status"].str.startswith("传感器异常")] if not df_sensor.empty and existing_sc: no_anom = ~df_sensor[existing_sc].any(axis=1) df_sensor = df_sensor[no_anom] elif not df_sensor.empty and not existing_sc: # 该检测器无关联传感器列(如偏航),传感器异常数据也全部纳入 pass else: df_sensor = pd.DataFrame() result = pd.concat([df_run, df_sensor], ignore_index=True) print(f" [训练数据] {detector_name}: 运行{len(df_run)} + 传感器异常可用{len(df_sensor)}" f" = {len(result)} 行") return result def train_wind_power(labeled: pd.DataFrame, model_dir: Path): for cls, fname, det_name, label in [ (PowerCurveDetector, "wind_power_curve.pkl", "wind_power_curve", "功率曲线"), (ScatterDetector, "wind_power_scatter.pkl", "wind_power_scatter", "散点"), ]: df = _filter_for_detector(labeled, det_name) if df.empty: print(f" [风速功率] {label}跳过(无可用数据)") continue try: cls().fit(df).save(model_dir / fname) print(f" [风速功率] {label}模型已保存") except Exception as e: print(f" [风速功率] {label}训练失败: {e}") def train_yaw(labeled: pd.DataFrame, model_dir: Path): configs = [ ("yaw_ang", "yaw_static.pkl", "yaw_static", "静态偏航", StaticYawDetector), ("twist_ang", "yaw_twist.pkl", "yaw_twist", "扭缆", CableTwistDetector), ] for req_col, fname, det_name, label, cls in configs: if req_col not in labeled.columns: print(f" [偏航] {label}跳过(缺少 {req_col} 列)") continue df = _filter_for_detector(labeled, det_name) if df.empty: print(f" [偏航] {label}跳过(无可用数据)") continue try: cls().fit(df).save(model_dir / fname) print(f" [偏航] {label}模型已保存") except Exception as e: print(f" [偏航] {label}训练失败: {e}") def train_pitch(labeled: pd.DataFrame, model_dir: Path): # A. 调节异常 & C. 最小桨距角 for cls, fname, det_name, label in [ (PitchRegulationDetector, "pitch_regulation.pkl", "pitch_regulation", "调节"), (MinPitchDetector, "pitch_min.pkl", "pitch_min", "最小桨距角"), ]: if "pitch_ang_act_1" not in labeled.columns: print(f" [变桨] {label}跳过(缺少 pitch_ang_act_1)") continue df = _filter_for_detector(labeled, det_name) if df.empty: print(f" [变桨] {label}跳过(无可用数据)") continue try: cls().fit(df).save(model_dir / fname) print(f" [变桨] {label}模型已保存") except Exception as e: print(f" [变桨] {label}训练失败: {e}") # B. 协调异常 required = ["pitch_ang_act_1", "rotor_spd", "p_active"] if not all(c in labeled.columns for c in required): print(f" [变桨] 协调跳过(缺少必要列)") return df2 = _filter_for_detector(labeled, "pitch_coord") if df2.empty: print(f" [变桨] 协调跳过(无可用数据)") return try: PitchCoordDetector().fit(df2).save(model_dir / "pitch_coord.pkl") print(f" [变桨] 协调模型已保存") except Exception as e: print(f" [变桨] 协调训练失败: {e}") def train_control_params(labeled: pd.DataFrame, model_dir: Path): # A. 功率质量检测器 df = _filter_for_detector(labeled, "ctrl_power_quality") if not df.empty: try: PowerQualityDetector().fit(df).save(model_dir / "ctrl_power_quality.pkl") print(f" [运行状态] 功率质量模型已保存") except Exception as e: print(f" [运行状态] 功率质量训练失败: {e}") else: print(f" [运行状态] 功率质量跳过(无可用数据)") # B. 运行状态综合检测器 df2 = _filter_for_detector(labeled, "ctrl_op_state") if not df2.empty: try: OperationStateDetector().fit(df2).save(model_dir / "ctrl_op_state.pkl") print(f" [运行状态] 综合运行状态模型已保存") except Exception as e: print(f" [运行状态] 综合运行状态训练失败: {e}") else: print(f" [运行状态] 综合运行状态跳过(无可用数据)") def train_one(model_name: str): print(f"\n{'='*50}") print(f"开始训练机型: {model_name}") model_dir = get_model_dir(model_name) # ── 统一加载一次全量数据(所有列超集) ── print(f" [数据] 加载全量数据...") _optional = [ "wind_spd", "gen_spd", "actual_torque", "pitch_ang_set_1", "pitch_ang_set_2", "pitch_ang_set_3", "pitch_ang_act_1", "pitch_ang_act_2", "pitch_ang_act_3", "pitch_spd_1", "pitch_spd_2", "pitch_spd_3", "rotor_spd", "yaw_ang", "twist_ang", "theory_p_active", "p_reactive", "grid_freq", "grid_ia", "grid_ib", "grid_ic", "grid_ua", "grid_ub", "grid_uc", "ambient_temp", ] df_raw = load_model_type(model_name, required_cols=["p_active"], optional_cols=_optional) if df_raw.empty or "p_active" not in df_raw.columns: print(f" [数据] 跳过(无数据或缺少 p_active)") return stats = get_model_statistics(df_raw) labeled = label_dataframe(df_raw, stats, model_name) if labeled.empty: print(f" [数据] 打标后为空,跳过") return # 保存 stats 供推理时使用,避免推理时重新加载全量数据 joblib.dump(stats, model_dir / "model_stats.pkl") print(f" [数据] model_stats.pkl 已保存") train_wind_power(labeled, model_dir) train_yaw(labeled, model_dir) train_pitch(labeled, model_dir) train_control_params(labeled, model_dir) print(f"机型 {model_name} 训练完成,模型保存至: {model_dir}") def main(): parser = argparse.ArgumentParser(description="风机异常检测模型训练") parser.add_argument("--model", type=str, default=None, help="指定机型名称,不填则训练所有机型") parser.add_argument("--list", action="store_true", help="列出所有可用机型后退出") args = parser.parse_args() if args.list: model_types = list_model_types() print(f"发现 {len(model_types)} 个机型:") for i, mt in enumerate(model_types, 1): print(f" {i}. {mt}") return if args.model: train_one(args.model) else: model_types = list_model_types() print(f"发现 {len(model_types)} 个机型: {model_types}") for mt in model_types: train_one(mt) print("\n全部训练完成。") if __name__ == "__main__": main()