import os
from pathlib import Path
from typing import List, Optional

import pandas as pd
import pyarrow.parquet as pq

from config import PARQUET_ROOT


def _is_hidden(p: Path) -> bool:
    """Return True for dot-prefixed entries (e.g. ``.Trashes``, ``.ipynb_checkpoints``)."""
    return p.name.startswith(".")


def list_model_types() -> List[str]:
    """Return the names of all model-type folders directly under PARQUET_ROOT.

    Hidden (dot-prefixed) directories are skipped, and the names are sorted so
    the result does not depend on filesystem iteration order.

    Returns:
        Sorted list of folder names; an empty list when PARQUET_ROOT does not
        exist or is not a directory.
    """
    if not PARQUET_ROOT.is_dir():
        return []
    return sorted(
        p.name
        for p in PARQUET_ROOT.iterdir()
        if p.is_dir() and not _is_hidden(p)
    )


def list_turbines(model_name: str) -> List[dict]:
    """Return one record per turbine parquet file of the given model type.

    Files are looked up as ``PARQUET_ROOT/<model_name>/<farm_dir>/*.parquet``.
    Each record is a dict with keys ``model_name``, ``farm_name`` (farm
    directory name), ``turbine_name`` (file stem) and ``path`` (the file Path).

    Farms and files are visited in sorted order so downstream concatenation is
    reproducible. Hidden farm directories and ``._*`` files are skipped.

    Args:
        model_name: Model-type folder name under PARQUET_ROOT.

    Returns:
        The list of records; empty when the model directory does not exist.
    """
    records: List[dict] = []
    model_root = PARQUET_ROOT / model_name
    # is_dir() rather than exists(): a plain file with this name would make
    # iterdir() raise NotADirectoryError.
    if not model_root.is_dir():
        return records
    for farm_dir in sorted(model_root.iterdir()):
        if not farm_dir.is_dir() or _is_hidden(farm_dir):
            continue
        for pq_file in sorted(farm_dir.glob("*.parquet")):
            # "._*" are macOS AppleDouble metadata files, not real parquet data.
            if pq_file.name.startswith("._"):
                continue
            records.append({
                "model_name": model_name,
                "farm_name": farm_dir.name,
                "turbine_name": pq_file.stem,
                "path": pq_file,
            })
    return records


def load_turbine(
    path: Path,
    required_cols: List[str],
    optional_cols: Optional[List[str]] = None,
) -> Optional[pd.DataFrame]:
    """Read a single turbine parquet file, tolerating missing measurement points.

    Args:
        path: Parquet file to read.
        required_cols: Columns that must all exist in the file schema; if any
            is missing the file is skipped and None is returned. Rows with a
            null value in any of these columns (after numeric coercion) are
            dropped.
        optional_cols: Columns read when present in the schema, silently
            ignored otherwise.

    Returns:
        A DataFrame holding only the requested columns that actually exist
        (each at most once), or None when the schema or data cannot be read,
        a required column is missing, or no row survives the null filter.
        Every column except ``data_time`` is converted with
        ``pd.to_numeric(errors="coerce")``, so unparseable values become NaN.
    """
    try:
        pq_file = pq.ParquetFile(path)
        schema_names = set(pq_file.schema.names)
    except Exception as e:  # best-effort: one bad file must not abort a batch load
        print(f"[WARN] 无法读取 schema {path.name}: {e}")
        return None

    missing_required = [c for c in required_cols if c not in schema_names]
    if missing_required:
        print(f"[SKIP] {path.name} 缺少必要测点: {missing_required}")
        return None

    present_optional = [c for c in (optional_cols or []) if c in schema_names]
    # Order-preserving de-duplication: a column listed twice (or in both lists)
    # is requested and coerced only once.
    cols_to_read = list(dict.fromkeys([*required_cols, *present_optional]))

    try:
        df = pq_file.read(columns=cols_to_read).to_pandas()
    except Exception as e:
        print(f"[WARN] 读取数据失败 {path.name}: {e}")
        return None

    # Coerce measurement columns to numbers; the "data_time" column is left as read.
    for col in cols_to_read:
        if col != "data_time":
            df[col] = pd.to_numeric(df[col], errors="coerce")
    df = df.dropna(subset=required_cols)

    if df.empty:
        print(f"[SKIP] {path.name} 必要测点全为空值")
        return None
    return df


def load_model_type(
    model_name: str,
    required_cols: List[str],
    optional_cols: Optional[List[str]] = None,
) -> pd.DataFrame:
    """Concatenate the data of every turbine of one model type (for training).

    Each turbine file from list_turbines() is loaded with load_turbine(); files
    it skips contribute nothing. ``farm_name`` and ``turbine_name`` columns are
    appended so every row can be traced back to its source file.

    Args:
        model_name: Model-type folder name under PARQUET_ROOT.
        required_cols: Forwarded to load_turbine().
        optional_cols: Forwarded to load_turbine().

    Returns:
        The concatenated DataFrame with a fresh RangeIndex, or an empty
        DataFrame when no turbine yielded data.
    """
    frames = []
    for rec in list_turbines(model_name):
        df = load_turbine(rec["path"], required_cols, optional_cols)
        if df is None:
            continue
        # assign() works on a new frame, so the provenance columns are never
        # written into the object returned by dropna().
        frames.append(
            df.assign(farm_name=rec["farm_name"], turbine_name=rec["turbine_name"])
        )

    if not frames:
        print(f"[WARN] 机型 {model_name} 无有效数据(所需列: {required_cols})")
        return pd.DataFrame()

    result = pd.concat(frames, ignore_index=True)
    print(f"[INFO] 机型 {model_name} 加载完成,共 {len(result)} 行,{len(frames)} 台风机")
    return result