| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- """
- 训练入口: 按机型遍历数据,训练所有异常检测模型并保存。
- 训练数据筛选规则(与检测逻辑对称):
- - '运行' 数据: 全部纳入训练
- - '传感器异常' 数据: 该检测器关心的测点无传感器异常时纳入训练
- 例如: actual_torque 有异常,但 wind_spd/p_active 正常 → 可用于风速功率模型训练
- - '停机' / '限功率': 不参与训练
- 用法:
- python train.py # 训练所有机型
- python train.py --model 机型名称 # 只训练指定机型
- """
- import argparse
- from pathlib import Path
- import joblib
- import pandas as pd
- from config import MODEL_SAVE_DIR
- from data_loader import list_model_types, load_model_type
- from labeler import get_model_statistics, label_dataframe, DETECTOR_SENSOR_COLS
- from models.wind_power import PowerCurveDetector, ScatterDetector
- from models.yaw import StaticYawDetector, CableTwistDetector
- from models.pitch import PitchRegulationDetector, PitchCoordDetector, MinPitchDetector
- from models.control_params import PowerQualityDetector, OperationStateDetector
def get_model_dir(model_name: str) -> Path:
    """Return the save directory for ``model_name``, creating it if needed.

    The directory is ``MODEL_SAVE_DIR / model_name``. Missing parent
    directories are created and an already existing directory is accepted.
    """
    model_dir = MODEL_SAVE_DIR / model_name
    model_dir.mkdir(exist_ok=True, parents=True)
    return model_dir
def _filter_for_detector(labeled: pd.DataFrame, detector_name: str) -> pd.DataFrame:
    """Select the rows of ``labeled`` that may be used to train one detector.

    A row is kept when either:

    - its ``status`` is exactly ``'运行'``, or
    - its ``status`` starts with ``'传感器异常'`` and none of the sensor-anomaly
      flag columns listed for ``detector_name`` in ``DETECTOR_SENSOR_COLS``
      (restricted to the ones present in ``labeled``) is truthy on that row.

    When the detector has no listed flag column present in ``labeled``, every
    ``'传感器异常'`` row is kept.

    Args:
        labeled: Labeled data; must contain a ``status`` column.
        detector_name: Key looked up in ``DETECTOR_SENSOR_COLS``.

    Returns:
        The ``'运行'`` rows followed by the usable ``'传感器异常'`` rows, with a
        fresh ``0..n-1`` index. A one-line summary of the row counts is
        printed as a side effect.
    """
    flag_cols = [
        col
        for col in DETECTOR_SENSOR_COLS.get(detector_name, [])
        if col in labeled.columns
    ]
    status = labeled["status"]

    df_run = labeled[status == "运行"]

    # na=False: rows with a missing status make .str.startswith return NaN,
    # and a mask containing NaN cannot be used for boolean indexing. Such
    # rows are simply not sensor-anomaly rows.
    df_sensor = labeled[status.str.startswith("传感器异常", na=False)]
    if flag_cols and not df_sensor.empty:
        # Drop rows where any sensor this detector depends on is flagged.
        flagged = df_sensor[flag_cols].any(axis=1)
        df_sensor = df_sensor[~flagged]

    result = pd.concat([df_run, df_sensor], ignore_index=True)
    print(f" [训练数据] {detector_name}: 运行{len(df_run)} + 传感器异常可用{len(df_sensor)}"
          f" = {len(result)} 行")
    return result
def train_wind_power(labeled: pd.DataFrame, model_dir: Path):
    """Fit and save the two wind-speed/power detectors.

    For each detector the training rows are selected with
    ``_filter_for_detector``. A detector is skipped when no rows remain, and
    a failure while fitting or saving is reported without stopping the other
    detector.
    """
    specs = (
        ("wind_power_curve", PowerCurveDetector, "wind_power_curve.pkl", "功率曲线"),
        ("wind_power_scatter", ScatterDetector, "wind_power_scatter.pkl", "散点"),
    )
    for detector_key, detector_cls, filename, title in specs:
        train_df = _filter_for_detector(labeled, detector_key)
        if not train_df.empty:
            try:
                fitted = detector_cls().fit(train_df)
                fitted.save(model_dir / filename)
                print(f" [风速功率] {title}模型已保存")
            except Exception as exc:
                # Best effort: report and carry on with the next detector.
                print(f" [风速功率] {title}训练失败: {exc}")
        else:
            print(f" [风速功率] {title}跳过(无可用数据)")
def train_yaw(labeled: pd.DataFrame, model_dir: Path):
    """Fit and save the yaw detectors (static yaw and cable twist).

    A detector is skipped when its required column is absent from ``labeled``
    or when ``_filter_for_detector`` leaves no training rows. A failure while
    fitting or saving is reported without stopping the other detector.
    """
    specs = (
        (StaticYawDetector, "yaw_static", "yaw_static.pkl", "yaw_ang", "静态偏航"),
        (CableTwistDetector, "yaw_twist", "yaw_twist.pkl", "twist_ang", "扭缆"),
    )
    for detector_cls, detector_key, filename, needed_col, title in specs:
        if needed_col in labeled.columns:
            train_df = _filter_for_detector(labeled, detector_key)
            if train_df.empty:
                print(f" [偏航] {title}跳过(无可用数据)")
            else:
                try:
                    fitted = detector_cls().fit(train_df)
                    fitted.save(model_dir / filename)
                    print(f" [偏航] {title}模型已保存")
                except Exception as exc:
                    # Best effort: report and carry on with the next detector.
                    print(f" [偏航] {title}训练失败: {exc}")
        else:
            print(f" [偏航] {title}跳过(缺少 {needed_col} 列)")
def train_pitch(labeled: pd.DataFrame, model_dir: Path):
    """Fit and save the three pitch detectors.

    The regulation and minimum-pitch-angle detectors need the
    ``pitch_ang_act_1`` column. The coordination detector additionally needs
    ``rotor_spd`` and ``p_active``. Each detector is skipped when its columns
    are missing or when ``_filter_for_detector`` leaves no training rows, and
    a failure while fitting or saving is reported without stopping the rest.
    """
    columns = labeled.columns
    has_blade_angle = "pitch_ang_act_1" in columns

    # A. regulation anomaly & C. minimum pitch angle
    single_blade_specs = (
        ("pitch_regulation", PitchRegulationDetector, "pitch_regulation.pkl", "调节"),
        ("pitch_min", MinPitchDetector, "pitch_min.pkl", "最小桨距角"),
    )
    for detector_key, detector_cls, filename, title in single_blade_specs:
        if not has_blade_angle:
            print(f" [变桨] {title}跳过(缺少 pitch_ang_act_1)")
        else:
            train_df = _filter_for_detector(labeled, detector_key)
            if train_df.empty:
                print(f" [变桨] {title}跳过(无可用数据)")
            else:
                try:
                    fitted = detector_cls().fit(train_df)
                    fitted.save(model_dir / filename)
                    print(f" [变桨] {title}模型已保存")
                except Exception as exc:
                    print(f" [变桨] {title}训练失败: {exc}")

    # B. coordination anomaly
    missing = [
        col
        for col in ("pitch_ang_act_1", "rotor_spd", "p_active")
        if col not in columns
    ]
    if missing:
        print(" [变桨] 协调跳过(缺少必要列)")
        return

    coord_df = _filter_for_detector(labeled, "pitch_coord")
    if coord_df.empty:
        print(" [变桨] 协调跳过(无可用数据)")
        return

    try:
        fitted = PitchCoordDetector().fit(coord_df)
        fitted.save(model_dir / "pitch_coord.pkl")
        print(" [变桨] 协调模型已保存")
    except Exception as exc:
        print(f" [变桨] 协调训练失败: {exc}")
def train_control_params(labeled: pd.DataFrame, model_dir: Path):
    """Fit and save the two control-parameter detectors.

    The power-quality detector is handled first, then the combined
    operation-state detector. Each one is skipped when
    ``_filter_for_detector`` leaves no training rows, and a failure while
    fitting or saving is reported without stopping the other detector.
    """
    specs = (
        # A. power-quality detector
        ("ctrl_power_quality", PowerQualityDetector, "ctrl_power_quality.pkl", "功率质量"),
        # B. combined operation-state detector
        ("ctrl_op_state", OperationStateDetector, "ctrl_op_state.pkl", "综合运行状态"),
    )
    for detector_key, detector_cls, filename, title in specs:
        train_df = _filter_for_detector(labeled, detector_key)
        if train_df.empty:
            print(f" [运行状态] {title}跳过(无可用数据)")
            continue
        try:
            fitted = detector_cls().fit(train_df)
            fitted.save(model_dir / filename)
            print(f" [运行状态] {title}模型已保存")
        except Exception as exc:
            print(f" [运行状态] {title}训练失败: {exc}")
def train_one(model_name: str):
    """Train and save every detector for one turbine model type.

    Steps:

    1. Load the data once via ``load_model_type`` with ``p_active`` required
       and the superset of columns used by the detectors as optional.
    2. Compute statistics with ``get_model_statistics`` and label the rows
       with ``label_dataframe``.
    3. Dump the statistics to ``model_stats.pkl`` in the model directory.
    4. Run the wind-power, yaw, pitch and control-parameter trainers.

    The function returns early, after printing a message, when no data is
    loaded, ``p_active`` is missing, or labeling yields no rows. The model
    directory is only created once labeled data is known to exist, so an
    unknown or data-less model name does not leave an empty directory behind.

    Args:
        model_name: Name of the model type to train.
    """
    print(f"\n{'='*50}")
    print(f"开始训练机型: {model_name}")

    # Load the full data set once (superset of all columns the detectors use).
    print(" [数据] 加载全量数据...")
    optional_cols = [
        "wind_spd", "gen_spd", "actual_torque",
        "pitch_ang_set_1", "pitch_ang_set_2", "pitch_ang_set_3",
        "pitch_ang_act_1", "pitch_ang_act_2", "pitch_ang_act_3",
        "pitch_spd_1", "pitch_spd_2", "pitch_spd_3",
        "rotor_spd", "yaw_ang", "twist_ang",
        "theory_p_active", "p_reactive", "grid_freq",
        "grid_ia", "grid_ib", "grid_ic",
        "grid_ua", "grid_ub", "grid_uc",
        "ambient_temp",
    ]
    df_raw = load_model_type(
        model_name, required_cols=["p_active"], optional_cols=optional_cols
    )
    if df_raw.empty or "p_active" not in df_raw.columns:
        print(" [数据] 跳过(无数据或缺少 p_active)")
        return

    stats = get_model_statistics(df_raw)
    labeled = label_dataframe(df_raw, stats, model_name)
    if labeled.empty:
        print(" [数据] 打标后为空,跳过")
        return

    # Create the output directory only after the data has been validated, so
    # skipped model types do not leave empty directories behind.
    model_dir = get_model_dir(model_name)

    # Persist the statistics so inference can reuse them instead of reloading
    # the full data set.
    joblib.dump(stats, model_dir / "model_stats.pkl")
    print(" [数据] model_stats.pkl 已保存")

    train_wind_power(labeled, model_dir)
    train_yaw(labeled, model_dir)
    train_pitch(labeled, model_dir)
    train_control_params(labeled, model_dir)
    print(f"机型 {model_name} 训练完成,模型保存至: {model_dir}")
def main():
    """Command-line entry point.

    ``--list`` prints the available model types and exits. ``--model NAME``
    trains only that model type. With neither option, every model type
    returned by ``list_model_types`` is trained in turn.
    """
    cli = argparse.ArgumentParser(description="风机异常检测模型训练")
    cli.add_argument("--model", type=str, default=None, help="指定机型名称,不填则训练所有机型")
    cli.add_argument("--list", action="store_true", help="列出所有可用机型后退出")
    options = cli.parse_args()

    if options.list:
        available = list_model_types()
        print(f"发现 {len(available)} 个机型:")
        for position, name in enumerate(available, start=1):
            print(f" {position}. {name}")
        return

    if not options.model:
        available = list_model_types()
        print(f"发现 {len(available)} 个机型: {available}")
        for name in available:
            train_one(name)
    else:
        train_one(options.model)

    print("\n全部训练完成。")


if __name__ == "__main__":
    main()
|