detect.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. """
  2. 推理入口: 读取前1天 parquet 数据,逐点输出异常检测结果。
  3. 数据读取:
  4. - 每天更新一个 parquet,路径: {PARQUET_ROOT}/{model_name}/{farm_name}/{turbine_name}.parquet
  5. - 读取时过滤 data_time 在 [昨日 00:00, 今日 00:00) 范围内的数据
  6. 检测逻辑:
  7. 1. 打标(运行/限功率/停机/传感器异常)
  8. 2. 停机 / 限功率 → 跳过,不输出
  9. 3. 运行数据 → 模型预测,逐点输出 is_anomaly + score
  10. 4. 传感器异常数据:
  11. 该检测器关心测点有异常 → 逐点输出 result_type='sensor_anomaly', anomaly_label=标签
  12. 该检测器关心测点无异常 → 模型预测,逐点输出
  13. 输出表 anomaly_points(逐点):
  14. data_time, model_name, farm_name, turbine_name, detector,
  15. result_type, is_anomaly, anomaly_score, anomaly_label, detect_time
  16. 用法:
  17. python detect.py # 检测所有机型所有风机(前1天数据)
  18. python detect.py --model 机型名称 # 只检测指定机型
  19. python detect.py --date 2026-02-24 # 指定日期(默认昨天)
  20. """
  21. import argparse
  22. import sqlite3
  23. from concurrent.futures import ThreadPoolExecutor, as_completed
  24. from datetime import datetime, timedelta
  25. from pathlib import Path
  26. import joblib
  27. import pandas as pd
  28. import numpy as np
  29. from config import MODEL_SAVE_DIR, DB_PATH
  30. from data_loader import list_model_types, list_turbines, load_turbine
  31. from labeler import get_model_statistics, label_dataframe, DETECTOR_SENSOR_COLS
  32. from models.wind_power import PowerCurveDetector, ScatterDetector
  33. from models.yaw import StaticYawDetector, CableTwistDetector
  34. from models.pitch import PitchRegulationDetector, PitchCoordDetector, MinPitchDetector
  35. from models.control_params import PowerQualityDetector, OperationStateDetector
  36. # ── 数据库 ─────────────────────────────────────────────────────────────────────
  37. def init_db(db_path: str) -> sqlite3.Connection:
  38. conn = sqlite3.connect(db_path)
  39. conn.execute("""
  40. CREATE TABLE IF NOT EXISTS anomaly_points (
  41. id INTEGER PRIMARY KEY AUTOINCREMENT,
  42. data_time TEXT, -- 数据时间戳(来自 parquet 的 data_time 列)
  43. model_name TEXT, -- 机型
  44. farm_name TEXT, -- 风场
  45. turbine_name TEXT, -- 风机
  46. detector TEXT, -- 检测器名称
  47. result_type TEXT, -- 'anomaly_detection' | 'sensor_anomaly'
  48. is_anomaly INTEGER, -- 1=异常, 0=正常(sensor_anomaly 时恒为 1)
  49. anomaly_score REAL, -- 异常分数(越低越异常;sensor_anomaly 时为 NULL)
  50. anomaly_label TEXT, -- 异常标签(sensor_anomaly 时填传感器异常类型,anomaly_detection 时为 NULL)
  51. detect_time TEXT -- 本次检测运行时间
  52. )
  53. """)
  54. conn.execute("""
  55. CREATE INDEX IF NOT EXISTS idx_ap_turbine_time
  56. ON anomaly_points (turbine_name, data_time)
  57. """)
  58. conn.commit()
  59. return conn
  60. def _bulk_insert(conn, rows: list):
  61. """批量写入,rows 为 tuple 列表。"""
  62. if not rows:
  63. return
  64. conn.executemany(
  65. "INSERT INTO anomaly_points VALUES (NULL,?,?,?,?,?,?,?,?,?,?)",
  66. rows
  67. )
  68. conn.commit()
  69. def _delete_existing(conn, turbine_name: str, farm_name: str, target_date: str):
  70. """写入前删除该风机该日期的旧记录,防止重复运行产生重复数据。"""
  71. conn.execute(
  72. "DELETE FROM anomaly_points WHERE turbine_name=? AND farm_name=? AND data_time LIKE ?",
  73. (turbine_name, farm_name, f"{target_date}%"),
  74. )
  75. conn.commit()
  76. # ── 数据时间过滤 ───────────────────────────────────────────────────────────────
  77. def _filter_date(df: pd.DataFrame, target_date: datetime.date) -> pd.DataFrame:
  78. """
  79. 过滤 data_time 列在 target_date 当天的数据。
  80. data_time 列不存在时返回原始数据(不过滤)。
  81. """
  82. if "data_time" not in df.columns:
  83. return df
  84. dt = pd.to_datetime(df["data_time"], errors="coerce")
  85. mask = dt.dt.date == target_date
  86. return df[mask & dt.notna()].copy()
  87. # ── 核心检测函数(逐点输出)────────────────────────────────────────────────────
def _run_detector(
    model_name: str, farm_name: str, turbine_name: str,
    model, detector_name: str,
    df_running: pd.DataFrame,
    df_sensor: pd.DataFrame,
    detect_time: str,
) -> list:
    """Run one detector and return result rows (caller writes them to the DB).

    model: preloaded detector object (loaded once per model type so each
    turbine does not repeat joblib.load). Returns [] when no model exists.

    Row layout matches the anomaly_points columns after ``id``:
    (data_time, model_name, farm_name, turbine_name, detector,
     result_type, is_anomaly, anomaly_score, anomaly_label, detect_time).
    """
    if model is None:
        return []
    rows = []
    # Sensor columns this particular detector cares about (may be empty).
    sensor_cols = DETECTOR_SENSOR_COLS.get(detector_name, [])
    # ── Sensor-anomaly rows ───────────────────────────────────────────────
    if not df_sensor.empty:
        existing_sc = [c for c in sensor_cols if c in df_sensor.columns]
        if existing_sc:
            # Row is "anomalous for this detector" if ANY of its sensor
            # flag columns is truthy.
            has_anom = df_sensor[existing_sc].any(axis=1)
        else:
            has_anom = pd.Series(False, index=df_sensor.index)
        # Rows with a relevant sensor anomaly → emit directly, no model call.
        df_direct = df_sensor[has_anom]
        if not df_direct.empty:
            dt_col = df_direct["data_time"].astype(str) if "data_time" in df_direct.columns \
                else pd.Series("", index=df_direct.index)
            st_col = df_direct["status"].astype(str) if "status" in df_direct.columns \
                else pd.Series("传感器异常", index=df_direct.index)
            for t, st in zip(dt_col.values, st_col.values):
                rows.append((
                    t, model_name, farm_name, turbine_name, detector_name,
                    "sensor_anomaly", 1, None, st, detect_time,
                ))
        # Rows whose sensor anomaly is irrelevant to this detector → feed
        # them to the model alongside the running data.
        df_sensor_clean = df_sensor[~has_anom]
    else:
        df_sensor_clean = pd.DataFrame()
    # ── Model-prediction rows ─────────────────────────────────────────────
    frames = [f for f in [df_running, df_sensor_clean] if not f.empty]
    if frames:
        # ignore_index=False keeps the original index so results can be
        # mapped back to timestamps even if predict() drops rows.
        df_detect = pd.concat(frames, ignore_index=False)
        try:
            result = model.predict(df_detect)
            # Align timestamps by index: predict() may dropna internally,
            # so positional alignment would silently mis-label points.
            time_map = df_detect["data_time"].astype(str).to_dict() if "data_time" in df_detect.columns \
                else {i: "" for i in df_detect.index}
            # Detectors are expected to return "anomaly"/"score" columns;
            # fall back to all-normal / NaN when absent.
            is_anom_col = result["anomaly"].astype(int) if "anomaly" in result.columns \
                else pd.Series(0, index=result.index)
            score_col = result["score"] if "score" in result.columns \
                else pd.Series(np.nan, index=result.index)
            for idx in result.index:
                sc_val = score_col.loc[idx]
                # SQLite gets NULL (None) for missing scores, not NaN.
                sc = float(sc_val) if pd.notna(sc_val) else None
                rows.append((
                    time_map.get(idx, ""),
                    model_name, farm_name, turbine_name, detector_name,
                    "anomaly_detection",
                    int(is_anom_col.loc[idx]),
                    sc,
                    None,
                    detect_time,
                ))
            n_anom = int(is_anom_col.sum())
            print(f" [{detector_name}] 检测 {len(result)} 点,异常 {n_anom} 点")
        except Exception as e:
            # Best-effort per detector: a failing model must not abort the
            # other detectors for this turbine.
            print(f" [{detector_name}] 检测失败: {e}")
    return rows
  156. # ── 单台风机检测 ───────────────────────────────────────────────────────────────
def detect_turbine(rec: dict, models: dict, conn: sqlite3.Connection,
                   model_stats: dict, target_date, detect_time: str):
    """Detect one turbine for one day and persist the per-point results.

    rec: {"model_name", "farm_name", "turbine_name", "path"} as produced
    by list_turbines(). models: {detector_name: loaded model object},
    preloaded once per model type. Writes happen only on the calling
    thread to avoid SQLite cross-thread/lock issues.
    """
    mn, fn, tn, path = rec["model_name"], rec["farm_name"], rec["turbine_name"], rec["path"]
    # Union of every column any of the nine detectors might need; all are
    # optional except p_active.
    all_cols = [
        "data_time",
        "wind_spd", "p_active", "gen_spd", "actual_torque",
        "yaw_ang", "twist_ang",
        "pitch_ang_set_1", "pitch_ang_set_2", "pitch_ang_set_3",
        "pitch_ang_act_1", "pitch_ang_act_2", "pitch_ang_act_3",
        "pitch_spd_1", "pitch_spd_2", "pitch_spd_3",
        "rotor_spd", "theory_p_active", "p_reactive", "grid_freq",
        "grid_ia", "grid_ib", "grid_ic",
        "grid_ua", "grid_ub", "grid_uc",
        "ambient_temp",
    ]
    df_raw = load_turbine(path, required_cols=["p_active"], optional_cols=all_cols)
    if df_raw is None:
        return
    df_raw = _filter_date(df_raw, target_date)
    if df_raw.empty:
        print(f" {fn}/{tn}: 无 {target_date} 数据,跳过")
        return
    # Delete this turbine/day's previous rows so reruns don't duplicate.
    _delete_existing(conn, tn, fn, str(target_date))
    # Label every point: running / curtailed / stopped / sensor-anomaly.
    labeled = label_dataframe(df_raw, model_stats, mn)
    df_run = labeled[labeled["status"] == "运行"].copy()
    df_sensor = labeled[labeled["status"].str.startswith("传感器异常")].copy()
    total = len(labeled)
    print(f" {fn}/{tn}: 总{total} | 运行{len(df_run)} | "
          f"传感器异常{len(df_sensor)} | 停机/限功率{total-len(df_run)-len(df_sensor)}")
    # Run the 9 detectors in parallel (model objects are read-only, hence
    # thread-safe); collect rows and write them from this thread only.
    DETECTOR_NAMES = [
        "wind_power_curve", "wind_power_scatter",
        "yaw_static", "yaw_twist",
        "pitch_regulation", "pitch_coord", "pitch_min",
        "ctrl_power_quality", "ctrl_op_state",
    ]
    all_rows = []
    with ThreadPoolExecutor(max_workers=min(9, len(DETECTOR_NAMES))) as executor:
        futures = {
            executor.submit(
                _run_detector, mn, fn, tn,
                models.get(det_name), det_name,
                df_run, df_sensor, detect_time
            ): det_name
            for det_name in DETECTOR_NAMES
        }
        for future in as_completed(futures):
            det_name = futures[future]
            try:
                all_rows.extend(future.result())
            except Exception as e:
                # One detector crashing must not sink the whole turbine.
                print(f" [{det_name}] 并行检测异常: {e}")
    _bulk_insert(conn, all_rows)
  212. # ── 机型级检测 ─────────────────────────────────────────────────────────────────
def detect_model_type(model_name: str, conn: sqlite3.Connection,
                      target_date, detect_time: str):
    """Run detection for every turbine of one model type on target_date.

    Loads model-level statistics (from model_stats.pkl when present,
    otherwise recomputed from the full dataset) and preloads all nine
    detector pickles once, then iterates the turbines sequentially.
    """
    model_dir = MODEL_SAVE_DIR / model_name
    if not model_dir.exists():
        print(f"[SKIP] 机型 {model_name} 无已训练模型,请先运行 train.py")
        return
    turbines = list_turbines(model_name)
    print(f"\n机型 {model_name}: 共 {len(turbines)} 台风机,检测日期 {target_date}")
    # ── Load model_stats (prefer the pickle; fall back to recomputing) ──
    stats_path = model_dir / "model_stats.pkl"
    if stats_path.exists():
        model_stats = joblib.load(stats_path)
        print(f" [stats] 从 model_stats.pkl 加载")
    else:
        # Lazy import: only needed on this fallback path.
        from data_loader import load_model_type
        df_all = load_model_type(model_name, required_cols=["p_active"],
                                 optional_cols=["gen_spd", "actual_torque", "pitch_ang_act_1"])
        if df_all.empty:
            print(f"[SKIP] 机型 {model_name} 无数据")
            return
        model_stats = get_model_statistics(df_all)
        # Free the full dataset immediately; only the stats are kept.
        del df_all
        print(f" [stats] 从全量数据计算(建议重新训练以生成 model_stats.pkl)")
    # ── Preload every detector once (one joblib.load, reused by all turbines) ──
    # detector name -> (pickle filename, class-specific loader)
    PKL_MAP = {
        "wind_power_curve": ("wind_power_curve.pkl", PowerCurveDetector.load),
        "wind_power_scatter": ("wind_power_scatter.pkl", ScatterDetector.load),
        "yaw_static": ("yaw_static.pkl", StaticYawDetector.load),
        "yaw_twist": ("yaw_twist.pkl", CableTwistDetector.load),
        "pitch_regulation": ("pitch_regulation.pkl", PitchRegulationDetector.load),
        "pitch_coord": ("pitch_coord.pkl", PitchCoordDetector.load),
        "pitch_min": ("pitch_min.pkl", MinPitchDetector.load),
        "ctrl_power_quality": ("ctrl_power_quality.pkl", PowerQualityDetector.load),
        "ctrl_op_state": ("ctrl_op_state.pkl", OperationStateDetector.load),
    }
    models = {}
    for det_name, (pkl_file, load_fn) in PKL_MAP.items():
        pkl_path = model_dir / pkl_file
        if pkl_path.exists():
            try:
                models[det_name] = load_fn(pkl_path)
            except Exception as e:
                # A corrupt/missing pickle disables that detector only.
                print(f" [WARN] 加载 {pkl_file} 失败: {e}")
    print(f" [模型] 预加载 {len(models)}/{len(PKL_MAP)} 个检测器")
    for rec in turbines:
        print(f" 检测: {rec['farm_name']} / {rec['turbine_name']}")
        detect_turbine(rec, models, conn, model_stats, target_date, detect_time)
  260. # ── 主流程 ─────────────────────────────────────────────────────────────────────
  261. def main():
  262. parser = argparse.ArgumentParser(description="风机异常检测推理(逐点输出)")
  263. parser.add_argument("--model", type=str, default=None, help="指定机型名称")
  264. parser.add_argument("--date", type=str, default=None,
  265. help="指定检测日期 YYYY-MM-DD,默认为昨天")
  266. args = parser.parse_args()
  267. if args.date:
  268. target_date = datetime.strptime(args.date, "%Y-%m-%d").date()
  269. else:
  270. target_date = (datetime.now() - timedelta(days=1)).date()
  271. detect_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  272. conn = init_db(DB_PATH)
  273. print(f"结果数据库: {DB_PATH}")
  274. print(f"检测日期: {target_date}")
  275. if args.model:
  276. detect_model_type(args.model, conn, target_date, detect_time)
  277. else:
  278. for mt in list_model_types():
  279. detect_model_type(mt, conn, target_date, detect_time)
  280. conn.close()
  281. print("\n检测完成。")
  282. if __name__ == "__main__":
  283. main()