""" 推理入口: 读取前1天 parquet 数据,逐点输出异常检测结果。 数据读取: - 每天更新一个 parquet,路径: {PARQUET_ROOT}/{model_name}/{farm_name}/{turbine_name}.parquet - 读取时过滤 data_time 在 [昨日 00:00, 今日 00:00) 范围内的数据 检测逻辑: 1. 打标(运行/限功率/停机/传感器异常) 2. 停机 / 限功率 → 跳过,不输出 3. 运行数据 → 模型预测,逐点输出 is_anomaly + score 4. 传感器异常数据: 该检测器关心测点有异常 → 逐点输出 result_type='sensor_anomaly', anomaly_label=标签 该检测器关心测点无异常 → 模型预测,逐点输出 输出表 anomaly_points(逐点): data_time, model_name, farm_name, turbine_name, detector, result_type, is_anomaly, anomaly_score, anomaly_label, detect_time 用法: python detect.py # 检测所有机型所有风机(前1天数据) python detect.py --model 机型名称 # 只检测指定机型 python detect.py --date 2026-02-24 # 指定日期(默认昨天) """ import argparse import sqlite3 from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta from pathlib import Path import joblib import pandas as pd import numpy as np from config import MODEL_SAVE_DIR, DB_PATH from data_loader import list_model_types, list_turbines, load_turbine from labeler import get_model_statistics, label_dataframe, DETECTOR_SENSOR_COLS from models.wind_power import PowerCurveDetector, ScatterDetector from models.yaw import StaticYawDetector, CableTwistDetector from models.pitch import PitchRegulationDetector, PitchCoordDetector, MinPitchDetector from models.control_params import PowerQualityDetector, OperationStateDetector # ── 数据库 ───────────────────────────────────────────────────────────────────── def init_db(db_path: str) -> sqlite3.Connection: conn = sqlite3.connect(db_path) conn.execute(""" CREATE TABLE IF NOT EXISTS anomaly_points ( id INTEGER PRIMARY KEY AUTOINCREMENT, data_time TEXT, -- 数据时间戳(来自 parquet 的 data_time 列) model_name TEXT, -- 机型 farm_name TEXT, -- 风场 turbine_name TEXT, -- 风机 detector TEXT, -- 检测器名称 result_type TEXT, -- 'anomaly_detection' | 'sensor_anomaly' is_anomaly INTEGER, -- 1=异常, 0=正常(sensor_anomaly 时恒为 1) anomaly_score REAL, -- 异常分数(越低越异常;sensor_anomaly 时为 NULL) anomaly_label TEXT, -- 异常标签(sensor_anomaly 时填传感器异常类型,anomaly_detection 时为 NULL) detect_time TEXT -- 本次检测运行时间 ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_ap_turbine_time ON anomaly_points (turbine_name, data_time) """) conn.commit() return conn def _bulk_insert(conn, rows: list): """批量写入,rows 为 tuple 列表。""" if not rows: return conn.executemany( "INSERT INTO anomaly_points VALUES (NULL,?,?,?,?,?,?,?,?,?,?)", rows ) conn.commit() def _delete_existing(conn, turbine_name: str, farm_name: str, target_date: str): """写入前删除该风机该日期的旧记录,防止重复运行产生重复数据。""" conn.execute( "DELETE FROM anomaly_points WHERE turbine_name=? AND farm_name=? AND data_time LIKE ?", (turbine_name, farm_name, f"{target_date}%"), ) conn.commit() # ── 数据时间过滤 ─────────────────────────────────────────────────────────────── def _filter_date(df: pd.DataFrame, target_date: datetime.date) -> pd.DataFrame: """ 过滤 data_time 列在 target_date 当天的数据。 data_time 列不存在时返回原始数据(不过滤)。 """ if "data_time" not in df.columns: return df dt = pd.to_datetime(df["data_time"], errors="coerce") mask = dt.dt.date == target_date return df[mask & dt.notna()].copy() # ── 核心检测函数(逐点输出)──────────────────────────────────────────────────── def _run_detector( model_name: str, farm_name: str, turbine_name: str, model, detector_name: str, df_running: pd.DataFrame, df_sensor: pd.DataFrame, detect_time: str, ) -> list: """ 对单个检测器执行检测,返回 rows 列表(由调用方统一写入数据库)。 model: 已加载的检测器对象(机型级预加载,避免每台风机重复 joblib.load)。 """ if model is None: return [] rows = [] sensor_cols = DETECTOR_SENSOR_COLS.get(detector_name, []) # ── 处理传感器异常数据 ──────────────────────────────────────────────────── if not df_sensor.empty: existing_sc = [c for c in sensor_cols if c in df_sensor.columns] if existing_sc: has_anom = df_sensor[existing_sc].any(axis=1) else: has_anom = pd.Series(False, index=df_sensor.index) # 有传感器异常 → 直接标记 df_direct = df_sensor[has_anom] if not df_direct.empty: dt_col = df_direct["data_time"].astype(str) if "data_time" in df_direct.columns \ else pd.Series("", index=df_direct.index) st_col = df_direct["status"].astype(str) if "status" in df_direct.columns \ else pd.Series("传感器异常", index=df_direct.index) for t, st in zip(dt_col.values, st_col.values): rows.append(( t, model_name, farm_name, turbine_name, detector_name, "sensor_anomaly", 1, None, st, detect_time, )) # 无传感器异常 → 加入模型检测队列 df_sensor_clean = df_sensor[~has_anom] else: df_sensor_clean = pd.DataFrame() # ── 合并需要模型预测的数据 ──────────────────────────────────────────────── frames = [f for f in [df_running, df_sensor_clean] if not f.empty] if frames: df_detect = pd.concat(frames, ignore_index=False) try: result = model.predict(df_detect) # 按 index 对齐时间戳,避免 predict 内部 dropna 后行数不一致导致错位 time_map = df_detect["data_time"].astype(str).to_dict() if "data_time" in df_detect.columns \ else {i: "" for i in df_detect.index} is_anom_col = result["anomaly"].astype(int) if "anomaly" in result.columns \ else pd.Series(0, index=result.index) score_col = result["score"] if "score" in result.columns \ else pd.Series(np.nan, index=result.index) for idx in result.index: sc_val = score_col.loc[idx] sc = float(sc_val) if pd.notna(sc_val) else None rows.append(( time_map.get(idx, ""), model_name, farm_name, turbine_name, detector_name, "anomaly_detection", int(is_anom_col.loc[idx]), sc, None, detect_time, )) n_anom = int(is_anom_col.sum()) print(f" [{detector_name}] 检测 {len(result)} 点,异常 {n_anom} 点") except Exception as e: print(f" [{detector_name}] 检测失败: {e}") return rows # ── 单台风机检测 ─────────────────────────────────────────────────────────────── def detect_turbine(rec: dict, models: dict, conn: sqlite3.Connection, model_stats: dict, target_date, detect_time: str): """models: {detector_name: loaded_model_object},机型级预加载。""" mn, fn, tn, path = rec["model_name"], rec["farm_name"], rec["turbine_name"], rec["path"] all_cols = [ "data_time", "wind_spd", "p_active", "gen_spd", "actual_torque", "yaw_ang", "twist_ang", "pitch_ang_set_1", "pitch_ang_set_2", "pitch_ang_set_3", "pitch_ang_act_1", "pitch_ang_act_2", "pitch_ang_act_3", "pitch_spd_1", "pitch_spd_2", "pitch_spd_3", "rotor_spd", "theory_p_active", "p_reactive", "grid_freq", "grid_ia", "grid_ib", "grid_ic", "grid_ua", "grid_ub", "grid_uc", "ambient_temp", ] df_raw = load_turbine(path, required_cols=["p_active"], optional_cols=all_cols) if df_raw is None: return df_raw = _filter_date(df_raw, target_date) if df_raw.empty: print(f" {fn}/{tn}: 无 {target_date} 数据,跳过") return # 写入前去重 _delete_existing(conn, tn, fn, str(target_date)) labeled = label_dataframe(df_raw, model_stats, mn) df_run = labeled[labeled["status"] == "运行"].copy() df_sensor = labeled[labeled["status"].str.startswith("传感器异常")].copy() total = len(labeled) print(f" {fn}/{tn}: 总{total} | 运行{len(df_run)} | " f"传感器异常{len(df_sensor)} | 停机/限功率{total-len(df_run)-len(df_sensor)}") # 并行运行9个检测器(模型对象只读,线程安全),主线程统一写入避免 SQLite 锁冲突 DETECTOR_NAMES = [ "wind_power_curve", "wind_power_scatter", "yaw_static", "yaw_twist", "pitch_regulation", "pitch_coord", "pitch_min", "ctrl_power_quality", "ctrl_op_state", ] all_rows = [] with ThreadPoolExecutor(max_workers=min(9, len(DETECTOR_NAMES))) as executor: futures = { executor.submit( _run_detector, mn, fn, tn, models.get(det_name), det_name, df_run, df_sensor, detect_time ): det_name for det_name in DETECTOR_NAMES } for future in as_completed(futures): det_name = futures[future] try: all_rows.extend(future.result()) except Exception as e: print(f" [{det_name}] 并行检测异常: {e}") _bulk_insert(conn, all_rows) # ── 机型级检测 ───────────────────────────────────────────────────────────────── def detect_model_type(model_name: str, conn: sqlite3.Connection, target_date, detect_time: str): model_dir = MODEL_SAVE_DIR / model_name if not model_dir.exists(): print(f"[SKIP] 机型 {model_name} 无已训练模型,请先运行 train.py") return turbines = list_turbines(model_name) print(f"\n机型 {model_name}: 共 {len(turbines)} 台风机,检测日期 {target_date}") # ── 加载 model_stats(优先从 pkl,回退到全量数据) ── stats_path = model_dir / "model_stats.pkl" if stats_path.exists(): model_stats = joblib.load(stats_path) print(f" [stats] 从 model_stats.pkl 加载") else: from data_loader import load_model_type df_all = load_model_type(model_name, required_cols=["p_active"], optional_cols=["gen_spd", "actual_torque", "pitch_ang_act_1"]) if df_all.empty: print(f"[SKIP] 机型 {model_name} 无数据") return model_stats = get_model_statistics(df_all) del df_all print(f" [stats] 从全量数据计算(建议重新训练以生成 model_stats.pkl)") # ── 机型级预加载所有模型(一次 joblib.load,所有风机复用) ── PKL_MAP = { "wind_power_curve": ("wind_power_curve.pkl", PowerCurveDetector.load), "wind_power_scatter": ("wind_power_scatter.pkl", ScatterDetector.load), "yaw_static": ("yaw_static.pkl", StaticYawDetector.load), "yaw_twist": ("yaw_twist.pkl", CableTwistDetector.load), "pitch_regulation": ("pitch_regulation.pkl", PitchRegulationDetector.load), "pitch_coord": ("pitch_coord.pkl", PitchCoordDetector.load), "pitch_min": ("pitch_min.pkl", MinPitchDetector.load), "ctrl_power_quality": ("ctrl_power_quality.pkl", PowerQualityDetector.load), "ctrl_op_state": ("ctrl_op_state.pkl", OperationStateDetector.load), } models = {} for det_name, (pkl_file, load_fn) in PKL_MAP.items(): pkl_path = model_dir / pkl_file if pkl_path.exists(): try: models[det_name] = load_fn(pkl_path) except Exception as e: print(f" [WARN] 加载 {pkl_file} 失败: {e}") print(f" [模型] 预加载 {len(models)}/{len(PKL_MAP)} 个检测器") for rec in turbines: print(f" 检测: {rec['farm_name']} / {rec['turbine_name']}") detect_turbine(rec, models, conn, model_stats, target_date, detect_time) # ── 主流程 ───────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="风机异常检测推理(逐点输出)") parser.add_argument("--model", type=str, default=None, help="指定机型名称") parser.add_argument("--date", type=str, default=None, help="指定检测日期 YYYY-MM-DD,默认为昨天") args = parser.parse_args() if args.date: target_date = datetime.strptime(args.date, "%Y-%m-%d").date() else: target_date = (datetime.now() - timedelta(days=1)).date() detect_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") conn = init_db(DB_PATH) print(f"结果数据库: {DB_PATH}") print(f"检测日期: {target_date}") if args.model: detect_model_type(args.model, conn, target_date, detect_time) else: for mt in list_model_types(): detect_model_type(mt, conn, target_date, detect_time) conn.close() print("\n检测完成。") if __name__ == "__main__": main()