| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- """
- 推理入口: 读取前1天 parquet 数据,逐点输出异常检测结果。
- 数据读取:
- - 每天更新一个 parquet,路径: {PARQUET_ROOT}/{model_name}/{farm_name}/{turbine_name}.parquet
- - 读取时过滤 data_time 在 [昨日 00:00, 今日 00:00) 范围内的数据
- 检测逻辑:
- 1. 打标(运行/限功率/停机/传感器异常)
- 2. 停机 / 限功率 → 跳过,不输出
- 3. 运行数据 → 模型预测,逐点输出 is_anomaly + score
- 4. 传感器异常数据:
- 该检测器关心测点有异常 → 逐点输出 result_type='sensor_anomaly', anomaly_label=标签
- 该检测器关心测点无异常 → 模型预测,逐点输出
- 输出表 anomaly_points(逐点):
- data_time, model_name, farm_name, turbine_name, detector,
- result_type, is_anomaly, anomaly_score, anomaly_label, detect_time
- 用法:
- python detect.py # 检测所有机型所有风机(前1天数据)
- python detect.py --model 机型名称 # 只检测指定机型
- python detect.py --date 2026-02-24 # 指定日期(默认昨天)
- """
import argparse
import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, timedelta
from pathlib import Path

import joblib
import numpy as np
import pandas as pd

from config import MODEL_SAVE_DIR, DB_PATH
from data_loader import list_model_types, list_turbines, load_turbine
from labeler import get_model_statistics, label_dataframe, DETECTOR_SENSOR_COLS
from models.wind_power import PowerCurveDetector, ScatterDetector
from models.yaw import StaticYawDetector, CableTwistDetector
from models.pitch import PitchRegulationDetector, PitchCoordDetector, MinPitchDetector
from models.control_params import PowerQualityDetector, OperationStateDetector
- # ── 数据库 ─────────────────────────────────────────────────────────────────────
def init_db(db_path: str) -> sqlite3.Connection:
    """Open (or create) the SQLite result store.

    Ensures the `anomaly_points` table and its (turbine_name, data_time)
    lookup index exist, then returns an open connection the caller owns.
    """
    connection = sqlite3.connect(db_path)
    table_ddl = """
        CREATE TABLE IF NOT EXISTS anomaly_points (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            data_time TEXT,      -- 数据时间戳(来自 parquet 的 data_time 列)
            model_name TEXT,     -- 机型
            farm_name TEXT,      -- 风场
            turbine_name TEXT,   -- 风机
            detector TEXT,       -- 检测器名称
            result_type TEXT,    -- 'anomaly_detection' | 'sensor_anomaly'
            is_anomaly INTEGER,  -- 1=异常, 0=正常(sensor_anomaly 时恒为 1)
            anomaly_score REAL,  -- 异常分数(越低越异常;sensor_anomaly 时为 NULL)
            anomaly_label TEXT,  -- 异常标签(sensor_anomaly 时填传感器异常类型,anomaly_detection 时为 NULL)
            detect_time TEXT     -- 本次检测运行时间
        )
    """
    index_ddl = """
        CREATE INDEX IF NOT EXISTS idx_ap_turbine_time
        ON anomaly_points (turbine_name, data_time)
    """
    connection.execute(table_ddl)
    connection.execute(index_ddl)
    connection.commit()
    return connection
- def _bulk_insert(conn, rows: list):
- """批量写入,rows 为 tuple 列表。"""
- if not rows:
- return
- conn.executemany(
- "INSERT INTO anomaly_points VALUES (NULL,?,?,?,?,?,?,?,?,?,?)",
- rows
- )
- conn.commit()
- def _delete_existing(conn, turbine_name: str, farm_name: str, target_date: str):
- """写入前删除该风机该日期的旧记录,防止重复运行产生重复数据。"""
- conn.execute(
- "DELETE FROM anomaly_points WHERE turbine_name=? AND farm_name=? AND data_time LIKE ?",
- (turbine_name, farm_name, f"{target_date}%"),
- )
- conn.commit()
- # ── 数据时间过滤 ───────────────────────────────────────────────────────────────
- def _filter_date(df: pd.DataFrame, target_date: datetime.date) -> pd.DataFrame:
- """
- 过滤 data_time 列在 target_date 当天的数据。
- data_time 列不存在时返回原始数据(不过滤)。
- """
- if "data_time" not in df.columns:
- return df
- dt = pd.to_datetime(df["data_time"], errors="coerce")
- mask = dt.dt.date == target_date
- return df[mask & dt.notna()].copy()
- # ── 核心检测函数(逐点输出)────────────────────────────────────────────────────
- def _run_detector(
- model_name: str, farm_name: str, turbine_name: str,
- model, detector_name: str,
- df_running: pd.DataFrame,
- df_sensor: pd.DataFrame,
- detect_time: str,
- ) -> list:
- """
- 对单个检测器执行检测,返回 rows 列表(由调用方统一写入数据库)。
- model: 已加载的检测器对象(机型级预加载,避免每台风机重复 joblib.load)。
- """
- if model is None:
- return []
- rows = []
- sensor_cols = DETECTOR_SENSOR_COLS.get(detector_name, [])
- # ── 处理传感器异常数据 ────────────────────────────────────────────────────
- if not df_sensor.empty:
- existing_sc = [c for c in sensor_cols if c in df_sensor.columns]
- if existing_sc:
- has_anom = df_sensor[existing_sc].any(axis=1)
- else:
- has_anom = pd.Series(False, index=df_sensor.index)
- # 有传感器异常 → 直接标记
- df_direct = df_sensor[has_anom]
- if not df_direct.empty:
- dt_col = df_direct["data_time"].astype(str) if "data_time" in df_direct.columns \
- else pd.Series("", index=df_direct.index)
- st_col = df_direct["status"].astype(str) if "status" in df_direct.columns \
- else pd.Series("传感器异常", index=df_direct.index)
- for t, st in zip(dt_col.values, st_col.values):
- rows.append((
- t, model_name, farm_name, turbine_name, detector_name,
- "sensor_anomaly", 1, None, st, detect_time,
- ))
- # 无传感器异常 → 加入模型检测队列
- df_sensor_clean = df_sensor[~has_anom]
- else:
- df_sensor_clean = pd.DataFrame()
- # ── 合并需要模型预测的数据 ────────────────────────────────────────────────
- frames = [f for f in [df_running, df_sensor_clean] if not f.empty]
- if frames:
- df_detect = pd.concat(frames, ignore_index=False)
- try:
- result = model.predict(df_detect)
- # 按 index 对齐时间戳,避免 predict 内部 dropna 后行数不一致导致错位
- time_map = df_detect["data_time"].astype(str).to_dict() if "data_time" in df_detect.columns \
- else {i: "" for i in df_detect.index}
- is_anom_col = result["anomaly"].astype(int) if "anomaly" in result.columns \
- else pd.Series(0, index=result.index)
- score_col = result["score"] if "score" in result.columns \
- else pd.Series(np.nan, index=result.index)
- for idx in result.index:
- sc_val = score_col.loc[idx]
- sc = float(sc_val) if pd.notna(sc_val) else None
- rows.append((
- time_map.get(idx, ""),
- model_name, farm_name, turbine_name, detector_name,
- "anomaly_detection",
- int(is_anom_col.loc[idx]),
- sc,
- None,
- detect_time,
- ))
- n_anom = int(is_anom_col.sum())
- print(f" [{detector_name}] 检测 {len(result)} 点,异常 {n_anom} 点")
- except Exception as e:
- print(f" [{detector_name}] 检测失败: {e}")
- return rows
- # ── 单台风机检测 ───────────────────────────────────────────────────────────────
def detect_turbine(rec: dict, models: dict, conn: sqlite3.Connection,
                   model_stats: dict, target_date, detect_time: str):
    """Detect one turbine's day of data and persist per-point results.

    models maps detector name -> pre-loaded model object (shared per model
    type). The nine detectors run in parallel threads (model objects are
    read-only, so this is safe); all rows are written from this thread to
    avoid SQLite lock contention.
    """
    mn = rec["model_name"]
    fn = rec["farm_name"]
    tn = rec["turbine_name"]
    path = rec["path"]

    wanted_cols = [
        "data_time",
        "wind_spd", "p_active", "gen_spd", "actual_torque",
        "yaw_ang", "twist_ang",
        "pitch_ang_set_1", "pitch_ang_set_2", "pitch_ang_set_3",
        "pitch_ang_act_1", "pitch_ang_act_2", "pitch_ang_act_3",
        "pitch_spd_1", "pitch_spd_2", "pitch_spd_3",
        "rotor_spd", "theory_p_active", "p_reactive", "grid_freq",
        "grid_ia", "grid_ib", "grid_ic",
        "grid_ua", "grid_ub", "grid_uc",
        "ambient_temp",
    ]
    df_raw = load_turbine(path, required_cols=["p_active"], optional_cols=wanted_cols)
    if df_raw is None:
        return
    df_raw = _filter_date(df_raw, target_date)
    if df_raw.empty:
        print(f" {fn}/{tn}: 无 {target_date} 数据,跳过")
        return

    # remove any rows from a previous run of the same day before re-inserting
    _delete_existing(conn, tn, fn, str(target_date))

    labeled = label_dataframe(df_raw, model_stats, mn)
    df_run = labeled[labeled["status"] == "运行"].copy()
    df_sensor = labeled[labeled["status"].str.startswith("传感器异常")].copy()
    total = len(labeled)
    print(f" {fn}/{tn}: 总{total} | 运行{len(df_run)} | "
          f"传感器异常{len(df_sensor)} | 停机/限功率{total-len(df_run)-len(df_sensor)}")

    detector_names = [
        "wind_power_curve", "wind_power_scatter",
        "yaw_static", "yaw_twist",
        "pitch_regulation", "pitch_coord", "pitch_min",
        "ctrl_power_quality", "ctrl_op_state",
    ]
    collected = []
    with ThreadPoolExecutor(max_workers=min(9, len(detector_names))) as pool:
        pending = {}
        for name in detector_names:
            fut = pool.submit(
                _run_detector, mn, fn, tn,
                models.get(name), name,
                df_run, df_sensor, detect_time,
            )
            pending[fut] = name
        for fut in as_completed(pending):
            name = pending[fut]
            try:
                collected.extend(fut.result())
            except Exception as e:
                print(f" [{name}] 并行检测异常: {e}")
    _bulk_insert(conn, collected)
- # ── 机型级检测 ─────────────────────────────────────────────────────────────────
def detect_model_type(model_name: str, conn: sqlite3.Connection,
                      target_date, detect_time: str):
    """Run detection for every turbine of one model type.

    Loads the model-type statistics and every trained detector object once,
    then reuses them across all turbines of the type.
    """
    model_dir = MODEL_SAVE_DIR / model_name
    if not model_dir.exists():
        print(f"[SKIP] 机型 {model_name} 无已训练模型,请先运行 train.py")
        return

    turbines = list_turbines(model_name)
    print(f"\n机型 {model_name}: 共 {len(turbines)} 台风机,检测日期 {target_date}")

    # ── statistics: prefer the persisted pkl, else recompute from raw data ──
    stats_path = model_dir / "model_stats.pkl"
    if stats_path.exists():
        model_stats = joblib.load(stats_path)
        print(" [stats] 从 model_stats.pkl 加载")
    else:
        from data_loader import load_model_type
        df_all = load_model_type(model_name, required_cols=["p_active"],
                                 optional_cols=["gen_spd", "actual_torque", "pitch_ang_act_1"])
        if df_all.empty:
            print(f"[SKIP] 机型 {model_name} 无数据")
            return
        model_stats = get_model_statistics(df_all)
        del df_all  # free the full-type frame before loading models
        print(" [stats] 从全量数据计算(建议重新训练以生成 model_stats.pkl)")

    # ── one joblib.load per detector, shared by every turbine of the type ──
    pkl_map = {
        "wind_power_curve": ("wind_power_curve.pkl", PowerCurveDetector.load),
        "wind_power_scatter": ("wind_power_scatter.pkl", ScatterDetector.load),
        "yaw_static": ("yaw_static.pkl", StaticYawDetector.load),
        "yaw_twist": ("yaw_twist.pkl", CableTwistDetector.load),
        "pitch_regulation": ("pitch_regulation.pkl", PitchRegulationDetector.load),
        "pitch_coord": ("pitch_coord.pkl", PitchCoordDetector.load),
        "pitch_min": ("pitch_min.pkl", MinPitchDetector.load),
        "ctrl_power_quality": ("ctrl_power_quality.pkl", PowerQualityDetector.load),
        "ctrl_op_state": ("ctrl_op_state.pkl", OperationStateDetector.load),
    }
    models = {}
    for det_name, (pkl_file, loader) in pkl_map.items():
        candidate = model_dir / pkl_file
        if not candidate.exists():
            continue
        try:
            models[det_name] = loader(candidate)
        except Exception as e:
            print(f" [WARN] 加载 {pkl_file} 失败: {e}")
    print(f" [模型] 预加载 {len(models)}/{len(pkl_map)} 个检测器")

    for rec in turbines:
        print(f" 检测: {rec['farm_name']} / {rec['turbine_name']}")
        detect_turbine(rec, models, conn, model_stats, target_date, detect_time)
- # ── 主流程 ─────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: parse args, resolve target date, run detection."""
    parser = argparse.ArgumentParser(description="风机异常检测推理(逐点输出)")
    parser.add_argument("--model", type=str, default=None, help="指定机型名称")
    parser.add_argument("--date", type=str, default=None,
                        help="指定检测日期 YYYY-MM-DD,默认为昨天")
    args = parser.parse_args()

    # default to yesterday when no explicit date is given
    target_date = (
        datetime.strptime(args.date, "%Y-%m-%d").date()
        if args.date
        else (datetime.now() - timedelta(days=1)).date()
    )
    detect_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    conn = init_db(DB_PATH)
    print(f"结果数据库: {DB_PATH}")
    print(f"检测日期: {target_date}")

    # --model restricts the run to one model type; otherwise scan them all
    targets = [args.model] if args.model else list_model_types()
    for mt in targets:
        detect_model_type(mt, conn, target_date, detect_time)

    conn.close()
    print("\n检测完成。")


if __name__ == "__main__":
    main()
|