""" 状态打标模块 功能: 1. get_model_statistics() - 按机型统计额定功率、基准桨距角等自适应阈值 2. label_dataframe() - 对 DataFrame 打标:运行 / 限功率 / 停机 / 传感器异常-xxx异常 3. DETECTOR_SENSOR_COLS - 每个检测器所依赖的测点,用于 detect.py 判断传感器异常 状态优先级(高→低): 传感器异常 > 停机 > 限功率 > 运行 传感器异常列命名规则: d_val_power 功率值越界 d_val_wind 风速值越界 d_val_pitch 变桨值越界 d_val_spd 转速值越界 d_val_torque 扭矩值越界 d_logic_wind_pwr 风速-功率逻辑悖论 d_logic_torque_pwr 转速-扭矩逻辑悖论 """ import pandas as pd import numpy as np from typing import Dict, Optional from config import ( STATUS_POWER_UPPER_RATIO, STATUS_POWER_LOWER_RATIO, STATUS_SHUTDOWN_RATIO, STATUS_CURTAIL_LOW_RATIO, STATUS_CURTAIL_HIGH_RATIO, STATUS_CURTAIL_PITCH_OFFSET, STATUS_WIND_MAX, STATUS_WIND_MIN, STATUS_PITCH_MAX, STATUS_PITCH_MIN, STATUS_SPD_UPPER_RATIO, STATUS_TORQUE_UPPER_RATIO, STATUS_TORQUE_LOWER_ABS, STATUS_LOGIC_WIND_MIN, STATUS_LOGIC_POWER_MIN, STATUS_MODEL_SPECIAL_RULES, ) # ── 每个检测器依赖的传感器异常列 ────────────────────────────────────────────── # detect.py 用此映射判断:传感器异常数据中,该检测器所需测点是否有异常 # 若有 → 直接输出传感器异常标签;若无 → 正常做异常检测 DETECTOR_SENSOR_COLS: Dict[str, list] = { "wind_power_curve": ["d_val_wind", "d_val_power", "d_logic_wind_pwr"], "wind_power_scatter":["d_val_wind", "d_val_power", "d_logic_wind_pwr"], "yaw_static": [], # yaw_ang 无对应传感器异常列,不直接跳过 "yaw_twist": [], "pitch_regulation": ["d_val_pitch"], "pitch_coord": ["d_val_pitch", "d_val_spd", "d_val_power"], "pitch_min": ["d_val_pitch"], "ctrl_power_quality": ["d_val_power"], "ctrl_op_state": ["d_val_power", "d_val_spd", "d_val_pitch"], } # 传感器异常列 → 可读标签(用于输出结果) SENSOR_COL_LABEL: Dict[str, str] = { "d_val_power": "功率值异常", "d_val_wind": "风速值异常", "d_val_pitch": "变桨值异常", "d_val_spd": "转速值异常", "d_val_torque": "扭矩值异常", "d_logic_wind_pwr": "风速功率逻辑异常", "d_logic_torque_pwr": "转速扭矩逻辑异常", } def get_model_statistics(df: pd.DataFrame) -> dict: """计算机型自适应统计阈值(需传入机型全量数据)。""" stats: dict = {} stats["p_max_observed"] = df["p_active"].quantile(0.995) stats["torque_limit"] = ( df["actual_torque"].quantile(0.999) if "actual_torque" in df.columns else None ) stats["spd_limit"] = ( df["gen_spd"].quantile(0.999) if "gen_spd" in df.columns else None ) partial_mask = ( (df["p_active"] > stats["p_max_observed"] * 0.2) & (df["p_active"] < stats["p_max_observed"] * 0.6) ) if partial_mask.any() and "pitch_ang_act_1" in df.columns: stats["baseline_pitch"] = df.loc[partial_mask, "pitch_ang_act_1"].median() else: stats["baseline_pitch"] = 0.0 print( f" [自适应统计] 额定: {stats['p_max_observed']:.1f}kW" f" | 基准桨距: {stats['baseline_pitch']:.2f}°" ) return stats def label_dataframe(df_input: pd.DataFrame, stats: dict, model_name: str) -> pd.DataFrame: """ 对 DataFrame 打标,返回含以下新列的 DataFrame: - d_val_* / d_logic_* : 各传感器异常布尔列 - sensor_anomaly_tags : 逗号拼接的传感器异常标签字符串(无异常为空串) - status : 运行 / 限功率 / 停机 / 传感器异常-xxx异常 """ df = df_input.copy() P_MAX = stats["p_max_observed"] PITCH_BASE = stats["baseline_pitch"] # ── 1. 传感器异常列 ──────────────────────────────────────────────────────── df["d_val_power"] = ( (df["p_active"] > P_MAX * STATUS_POWER_UPPER_RATIO) | (df["p_active"] < P_MAX * STATUS_POWER_LOWER_RATIO) ) if "wind_spd" in df.columns: df["d_val_wind"] = (df["wind_spd"] > STATUS_WIND_MAX) | (df["wind_spd"] < STATUS_WIND_MIN) else: df["d_val_wind"] = False pitch_cols = [c for c in df.columns if "pitch_ang_act" in c] df["d_val_pitch"] = False for col in pitch_cols: df["d_val_pitch"] |= (df[col] > STATUS_PITCH_MAX) | (df[col] < STATUS_PITCH_MIN) df["d_val_spd"] = False if stats["spd_limit"] and "gen_spd" in df.columns: df["d_val_spd"] = ( (df["gen_spd"] > stats["spd_limit"] * STATUS_SPD_UPPER_RATIO) | (df["gen_spd"] < -200) ) df["d_val_torque"] = False if stats["torque_limit"] and "actual_torque" in df.columns: df["d_val_torque"] = ( (df["actual_torque"] > stats["torque_limit"] * STATUS_TORQUE_UPPER_RATIO) | (df["actual_torque"] < STATUS_TORQUE_LOWER_ABS) ) # ── 2. 逻辑悖论列 ────────────────────────────────────────────────────────── if "wind_spd" in df.columns: df["d_logic_wind_pwr"] = ( (df["wind_spd"] < STATUS_LOGIC_WIND_MIN) & (df["p_active"] > STATUS_LOGIC_POWER_MIN) ) for wind_thresh, pwr_thresh in STATUS_MODEL_SPECIAL_RULES.get(model_name, []): df["d_logic_wind_pwr"] |= ( (df["wind_spd"] < wind_thresh) & (df["p_active"] > pwr_thresh) ) else: df["d_logic_wind_pwr"] = False df["d_logic_torque_pwr"] = False if stats["torque_limit"] and "actual_torque" in df.columns: df["d_logic_torque_pwr"] = ( (df["p_active"] > P_MAX * 0.1) & (df["actual_torque"].abs() < stats["torque_limit"] * 0.01) ) # ── 3. 汇总传感器异常标签 ────────────────────────────────────────────────── all_sensor_cols = list(SENSOR_COL_LABEL.keys()) existing = [c for c in all_sensor_cols if c in df.columns] # 向量化拼接:逐列生成标签,再按行合并(避免 apply axis=1) if existing: tag_matrix = np.where( df[existing].values, np.array([SENSOR_COL_LABEL[c] for c in existing]), "", ) df["sensor_anomaly_tags"] = pd.Series( [",".join(t for t in row if t) for row in tag_matrix], index=df.index, ) else: df["sensor_anomaly_tags"] = "" # ── 4. 业务状态打标 ──────────────────────────────────────────────────────── any_sensor_anomaly = df[[c for c in existing if c in df.columns]].any(axis=1) df["status"] = "运行" shutdown_thresh = max(10.0, P_MAX * STATUS_SHUTDOWN_RATIO) mask_shutdown = (~df["d_val_power"]) & (df["p_active"] <= shutdown_thresh) df.loc[mask_shutdown, "status"] = "停机" if "pitch_ang_act_1" in df.columns: mask_curtail = ( (df["status"] != "停机") & (~df["d_val_power"]) & (~df["d_val_pitch"]) & (df["p_active"] > P_MAX * STATUS_CURTAIL_LOW_RATIO) & (df["p_active"] < P_MAX * STATUS_CURTAIL_HIGH_RATIO) & (df["pitch_ang_act_1"] > (PITCH_BASE + STATUS_CURTAIL_PITCH_OFFSET)) ) df.loc[mask_curtail, "status"] = "限功率" # 传感器异常覆盖(最高优先级) mask_sensor = any_sensor_anomaly df.loc[mask_sensor, "status"] = ( "传感器异常-" + df.loc[mask_sensor, "sensor_anomaly_tags"] ) return df