|
|
"""
推理入口: 读取前1天 parquet 数据,逐点输出异常检测结果。

数据读取:
  - 每天更新一个 parquet,路径: {PARQUET_ROOT}/{model_name}/{farm_name}/{turbine_name}.parquet
  - 读取时过滤 data_time 在 [昨日 00:00, 今日 00:00) 范围内的数据

检测逻辑:
  1. 打标(运行/限功率/停机/传感器异常)
  2. 停机 / 限功率 → 跳过,不输出
  3. 运行数据 → 模型预测,逐点输出 is_anomaly + score
  4. 传感器异常数据:
     该检测器关心测点有异常 → 逐点输出 result_type='sensor_anomaly', anomaly_label=标签
     该检测器关心测点无异常 → 模型预测,逐点输出

输出表 anomaly_points(逐点):
  data_time, model_name, farm_name, turbine_name, detector,
  result_type, is_anomaly, anomaly_score, anomaly_label, detect_time

用法:
  python detect.py                    # 检测所有机型所有风机(前1天数据)
  python detect.py --model 机型名称    # 只检测指定机型
  python detect.py --date 2026-02-24  # 指定日期(默认昨天)
"""
|
|
|
import argparse
import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, timedelta
from pathlib import Path

import joblib
import numpy as np
import pandas as pd

from config import MODEL_SAVE_DIR, DB_PATH
from data_loader import list_model_types, list_turbines, load_turbine
from labeler import get_model_statistics, label_dataframe, DETECTOR_SENSOR_COLS
from models.wind_power import PowerCurveDetector, ScatterDetector
from models.yaw import StaticYawDetector, CableTwistDetector
from models.pitch import PitchRegulationDetector, PitchCoordDetector, MinPitchDetector
from models.control_params import PowerQualityDetector, OperationStateDetector
|
|
|
+
|
|
|
+
|
|
|
# ── 数据库 ─────────────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
def init_db(db_path: str) -> sqlite3.Connection:
    """Open (or create) the result database and make sure the output schema exists.

    Creates the per-point ``anomaly_points`` table plus the
    (turbine_name, data_time) index used by the daily delete/re-run path.

    Args:
        db_path: filesystem path of the SQLite database file.

    Returns:
        An open ``sqlite3.Connection`` with the schema committed.
    """
    table_ddl = """
        CREATE TABLE IF NOT EXISTS anomaly_points (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            data_time TEXT,        -- 数据时间戳(来自 parquet 的 data_time 列)
            model_name TEXT,       -- 机型
            farm_name TEXT,        -- 风场
            turbine_name TEXT,     -- 风机
            detector TEXT,         -- 检测器名称
            result_type TEXT,      -- 'anomaly_detection' | 'sensor_anomaly'
            is_anomaly INTEGER,    -- 1=异常, 0=正常(sensor_anomaly 时恒为 1)
            anomaly_score REAL,    -- 异常分数(越低越异常;sensor_anomaly 时为 NULL)
            anomaly_label TEXT,    -- 异常标签(sensor_anomaly 时填传感器异常类型,anomaly_detection 时为 NULL)
            detect_time TEXT       -- 本次检测运行时间
        )
    """
    index_ddl = """
        CREATE INDEX IF NOT EXISTS idx_ap_turbine_time
        ON anomaly_points (turbine_name, data_time)
    """
    conn = sqlite3.connect(db_path)
    for ddl in (table_ddl, index_ddl):
        conn.execute(ddl)
    conn.commit()
    return conn
|
|
|
+
|
|
|
+
|
|
|
+def _bulk_insert(conn, rows: list):
|
|
|
+ """批量写入,rows 为 tuple 列表。"""
|
|
|
+ if not rows:
|
|
|
+ return
|
|
|
+ conn.executemany(
|
|
|
+ "INSERT INTO anomaly_points VALUES (NULL,?,?,?,?,?,?,?,?,?,?)",
|
|
|
+ rows
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
+
|
|
|
+def _delete_existing(conn, turbine_name: str, farm_name: str, target_date: str):
|
|
|
+ """写入前删除该风机该日期的旧记录,防止重复运行产生重复数据。"""
|
|
|
+ conn.execute(
|
|
|
+ "DELETE FROM anomaly_points WHERE turbine_name=? AND farm_name=? AND data_time LIKE ?",
|
|
|
+ (turbine_name, farm_name, f"{target_date}%"),
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
+
|
|
|
# ── 数据时间过滤 ───────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
+def _filter_date(df: pd.DataFrame, target_date: datetime.date) -> pd.DataFrame:
|
|
|
+ """
|
|
|
+ 过滤 data_time 列在 target_date 当天的数据。
|
|
|
+ data_time 列不存在时返回原始数据(不过滤)。
|
|
|
+ """
|
|
|
+ if "data_time" not in df.columns:
|
|
|
+ return df
|
|
|
+ dt = pd.to_datetime(df["data_time"], errors="coerce")
|
|
|
+ mask = dt.dt.date == target_date
|
|
|
+ return df[mask & dt.notna()].copy()
|
|
|
+
|
|
|
+
|
|
|
# ── 核心检测函数(逐点输出)────────────────────────────────────────────────────
|
|
|
+
|
|
|
def _append_sensor_rows(rows: list, df_sensor: pd.DataFrame, sensor_cols: list,
                        model_name: str, farm_name: str, turbine_name: str,
                        detector_name: str, detect_time: str) -> pd.DataFrame:
    """Emit 'sensor_anomaly' rows for points whose watched sensor flags fire.

    Appends one row per flagged point to ``rows`` (anomaly_label = the point's
    ``status`` text) and returns the remaining, unflagged sensor rows so the
    caller can still run them through the model.
    """
    if df_sensor.empty:
        return pd.DataFrame()

    present = [c for c in sensor_cols if c in df_sensor.columns]
    if present:
        # any(axis=1): the point is flagged if ANY watched column is truthy.
        has_anom = df_sensor[present].any(axis=1)
    else:
        has_anom = pd.Series(False, index=df_sensor.index)

    flagged = df_sensor[has_anom]
    if not flagged.empty:
        dt_col = flagged["data_time"].astype(str) if "data_time" in flagged.columns \
            else pd.Series("", index=flagged.index)
        st_col = flagged["status"].astype(str) if "status" in flagged.columns \
            else pd.Series("传感器异常", index=flagged.index)
        for t, st in zip(dt_col.values, st_col.values):
            rows.append((
                t, model_name, farm_name, turbine_name, detector_name,
                "sensor_anomaly", 1, None, st, detect_time,
            ))

    return df_sensor[~has_anom]


def _append_prediction_rows(rows: list, model, frames: list,
                            model_name: str, farm_name: str, turbine_name: str,
                            detector_name: str, detect_time: str) -> None:
    """Run model.predict over the concatenated frames and append per-point rows.

    Timestamps are re-joined by index so that rows dropped inside predict()
    (e.g. by an internal dropna) cannot shift the time alignment. Prediction
    failures are logged and swallowed — one broken detector must not abort the
    turbine's other detectors.
    """
    if not frames:
        return
    df_detect = pd.concat(frames, ignore_index=False)
    try:
        result = model.predict(df_detect)
        time_map = df_detect["data_time"].astype(str).to_dict() if "data_time" in df_detect.columns \
            else {i: "" for i in df_detect.index}
        is_anom_col = result["anomaly"].astype(int) if "anomaly" in result.columns \
            else pd.Series(0, index=result.index)
        score_col = result["score"] if "score" in result.columns \
            else pd.Series(np.nan, index=result.index)
        for idx in result.index:
            sc_val = score_col.loc[idx]
            sc = float(sc_val) if pd.notna(sc_val) else None
            rows.append((
                time_map.get(idx, ""),
                model_name, farm_name, turbine_name, detector_name,
                "anomaly_detection",
                int(is_anom_col.loc[idx]),
                sc,
                None,
                detect_time,
            ))
        n_anom = int(is_anom_col.sum())
        print(f"    [{detector_name}] 检测 {len(result)} 点,异常 {n_anom} 点")
    except Exception as e:
        print(f"    [{detector_name}] 检测失败: {e}")


def _run_detector(
    model_name: str, farm_name: str, turbine_name: str,
    model, detector_name: str,
    df_running: pd.DataFrame,
    df_sensor: pd.DataFrame,
    detect_time: str,
) -> list:
    """Run one detector over one turbine-day; return output tuples for the DB.

    Args:
        model: pre-loaded detector object (model-type level cache; None if the
            pickle was missing, in which case nothing is emitted).
        df_running: points labeled 运行 — always model-predicted.
        df_sensor: points labeled 传感器异常* — split by whether THIS detector's
            watched sensor columns flag them (direct 'sensor_anomaly' rows) or
            not (fed to the model alongside df_running).

    Returns:
        List of 10-tuples matching the anomaly_points column order; the caller
        writes them (single-writer, avoids SQLite lock contention).
    """
    if model is None:
        return []

    rows = []
    sensor_cols = DETECTOR_SENSOR_COLS.get(detector_name, [])

    # Sensor rows first (matches original output order), then model predictions
    # over running data plus the sensor rows this detector does not flag.
    df_sensor_clean = _append_sensor_rows(
        rows, df_sensor, sensor_cols,
        model_name, farm_name, turbine_name, detector_name, detect_time,
    )
    frames = [f for f in [df_running, df_sensor_clean] if not f.empty]
    _append_prediction_rows(
        rows, model, frames,
        model_name, farm_name, turbine_name, detector_name, detect_time,
    )
    return rows
|
|
|
+
|
|
|
+
|
|
|
# ── 单台风机检测 ───────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
def detect_turbine(rec: dict, models: dict, conn: sqlite3.Connection,
                   model_stats: dict, target_date, detect_time: str):
    """Detect anomalies for one turbine on one day and persist per-point rows.

    Args:
        rec: turbine record with keys model_name / farm_name / turbine_name / path.
        models: {detector_name: loaded model object} — pre-loaded per model
            type so each turbine avoids repeated joblib.load.
        conn: open result database connection (written from this thread only).
        model_stats: statistics consumed by label_dataframe.
        target_date: day to detect (datetime.date).
        detect_time: run timestamp stamped onto every output row.
    """
    mn, fn, tn, path = rec["model_name"], rec["farm_name"], rec["turbine_name"], rec["path"]

    all_cols = [
        "data_time",
        "wind_spd", "p_active", "gen_spd", "actual_torque",
        "yaw_ang", "twist_ang",
        "pitch_ang_set_1", "pitch_ang_set_2", "pitch_ang_set_3",
        "pitch_ang_act_1", "pitch_ang_act_2", "pitch_ang_act_3",
        "pitch_spd_1", "pitch_spd_2", "pitch_spd_3",
        "rotor_spd", "theory_p_active", "p_reactive", "grid_freq",
        "grid_ia", "grid_ib", "grid_ic",
        "grid_ua", "grid_ub", "grid_uc",
        "ambient_temp",
    ]
    df_raw = load_turbine(path, required_cols=["p_active"], optional_cols=all_cols)
    if df_raw is None:
        return

    df_raw = _filter_date(df_raw, target_date)
    if df_raw.empty:
        print(f"  {fn}/{tn}: 无 {target_date} 数据,跳过")
        return

    labeled = label_dataframe(df_raw, model_stats, mn)
    df_run = labeled[labeled["status"] == "运行"].copy()
    # na=False: a missing status counts as "not a sensor anomaly" instead of
    # letting a NaN mask raise during boolean indexing.
    df_sensor = labeled[labeled["status"].str.startswith("传感器异常", na=False)].copy()

    total = len(labeled)
    print(f"  {fn}/{tn}: 总{total} | 运行{len(df_run)} | "
          f"传感器异常{len(df_sensor)} | 停机/限功率{total-len(df_run)-len(df_sensor)}")

    # Run the 9 detectors in parallel (model objects are read-only, so thread
    # safe); rows are collected and written from this thread only to avoid
    # SQLite lock contention.
    DETECTOR_NAMES = [
        "wind_power_curve", "wind_power_scatter",
        "yaw_static", "yaw_twist",
        "pitch_regulation", "pitch_coord", "pitch_min",
        "ctrl_power_quality", "ctrl_op_state",
    ]
    all_rows = []
    with ThreadPoolExecutor(max_workers=min(9, len(DETECTOR_NAMES))) as executor:
        futures = {
            executor.submit(
                _run_detector, mn, fn, tn,
                models.get(det_name), det_name,
                df_run, df_sensor, detect_time
            ): det_name
            for det_name in DETECTOR_NAMES
        }
        for future in as_completed(futures):
            det_name = futures[future]
            try:
                all_rows.extend(future.result())
            except Exception as e:
                print(f"  [{det_name}] 并行检测异常: {e}")

    # Delete the day's old rows only now, once the new results are in hand —
    # deleting up-front (as before) wiped the previous run's output even when
    # detection subsequently crashed.
    _delete_existing(conn, tn, fn, str(target_date))
    _bulk_insert(conn, all_rows)
|
|
|
+
|
|
|
+
|
|
|
# ── 机型级检测 ─────────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
def detect_model_type(model_name: str, conn: sqlite3.Connection,
                      target_date, detect_time: str):
    """Run detection for every turbine of one model type.

    Loads the labeling statistics (preferring the trained model_stats.pkl,
    falling back to recomputing from the full dataset) and pre-loads each
    detector pickle exactly once so all turbines share the loaded objects.
    """
    model_dir = MODEL_SAVE_DIR / model_name
    if not model_dir.exists():
        print(f"[SKIP] 机型 {model_name} 无已训练模型,请先运行 train.py")
        return

    turbines = list_turbines(model_name)
    print(f"\n机型 {model_name}: 共 {len(turbines)} 台风机,检测日期 {target_date}")

    # ── Labeling statistics: prefer the pickle, recompute from data otherwise ──
    stats_file = model_dir / "model_stats.pkl"
    if stats_file.exists():
        model_stats = joblib.load(stats_file)
        print(f"  [stats] 从 model_stats.pkl 加载")
    else:
        from data_loader import load_model_type
        frame = load_model_type(model_name, required_cols=["p_active"],
                                optional_cols=["gen_spd", "actual_torque", "pitch_ang_act_1"])
        if frame.empty:
            print(f"[SKIP] 机型 {model_name} 无数据")
            return
        model_stats = get_model_statistics(frame)
        del frame  # release the full dataset before per-turbine work
        print(f"  [stats] 从全量数据计算(建议重新训练以生成 model_stats.pkl)")

    # ── Pre-load every detector once; all turbines reuse the same objects ──
    PKL_MAP = {
        "wind_power_curve": ("wind_power_curve.pkl", PowerCurveDetector.load),
        "wind_power_scatter": ("wind_power_scatter.pkl", ScatterDetector.load),
        "yaw_static": ("yaw_static.pkl", StaticYawDetector.load),
        "yaw_twist": ("yaw_twist.pkl", CableTwistDetector.load),
        "pitch_regulation": ("pitch_regulation.pkl", PitchRegulationDetector.load),
        "pitch_coord": ("pitch_coord.pkl", PitchCoordDetector.load),
        "pitch_min": ("pitch_min.pkl", MinPitchDetector.load),
        "ctrl_power_quality": ("ctrl_power_quality.pkl", PowerQualityDetector.load),
        "ctrl_op_state": ("ctrl_op_state.pkl", OperationStateDetector.load),
    }
    models = {}
    for name, (pkl_file, loader) in PKL_MAP.items():
        candidate = model_dir / pkl_file
        if not candidate.exists():
            continue  # missing pickle → detector silently skipped downstream
        try:
            models[name] = loader(candidate)
        except Exception as e:
            print(f"  [WARN] 加载 {pkl_file} 失败: {e}")
    print(f"  [模型] 预加载 {len(models)}/{len(PKL_MAP)} 个检测器")

    for rec in turbines:
        print(f"  检测: {rec['farm_name']} / {rec['turbine_name']}")
        detect_turbine(rec, models, conn, model_stats, target_date, detect_time)
|
|
|
+
|
|
|
+
|
|
|
# ── 主流程 ─────────────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
def main():
    """CLI entry point: parse arguments, resolve target date, run detection.

    --model restricts detection to one model type; --date picks the day
    (YYYY-MM-DD), defaulting to yesterday. The DB connection is now closed in
    a finally block, so a failing run no longer leaks the SQLite handle.
    """
    parser = argparse.ArgumentParser(description="风机异常检测推理(逐点输出)")
    parser.add_argument("--model", type=str, default=None, help="指定机型名称")
    parser.add_argument("--date", type=str, default=None,
                        help="指定检测日期 YYYY-MM-DD,默认为昨天")
    args = parser.parse_args()

    if args.date:
        target_date = datetime.strptime(args.date, "%Y-%m-%d").date()
    else:
        target_date = (datetime.now() - timedelta(days=1)).date()

    detect_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    conn = init_db(DB_PATH)
    print(f"结果数据库: {DB_PATH}")
    print(f"检测日期: {target_date}")

    try:
        if args.model:
            detect_model_type(args.model, conn, target_date, detect_time)
        else:
            for mt in list_model_types():
                detect_model_type(mt, conn, target_date, detect_time)
    finally:
        # Always release the database handle, even if a model-type run raises.
        conn.close()

    print("\n检测完成。")


if __name__ == "__main__":
    main()
|