| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import os
- from pathlib import Path
- from typing import List, Optional
- import pandas as pd
- import pyarrow.parquet as pq
- from config import PARQUET_ROOT
def list_model_types(root: Optional[Path] = None) -> List[str]:
    """Return the names of all model-type folders under the parquet root.

    Args:
        root: Directory to scan. Defaults to ``PARQUET_ROOT`` (from
            ``config``). It can be overridden to scan another data root or a
            temporary tree without touching the global config. A ``str`` is
            also accepted.

    Returns:
        Sorted list of sub-directory names. Directories whose name starts
        with "." are skipped; these are hidden or metadata folders, such as
        ``.Trashes`` on a macOS-formatted volume. An empty list is returned,
        rather than raising, when the root does not exist or is not a
        directory.
    """
    base = PARQUET_ROOT if root is None else Path(root)
    if not base.is_dir():
        return []
    # iterdir() order is filesystem-dependent; sort for reproducible results.
    return sorted(
        entry.name
        for entry in base.iterdir()
        if entry.is_dir() and not entry.name.startswith(".")
    )
def list_turbines(model_name: str, root: Optional[Path] = None) -> List[dict]:
    """Return one record per turbine parquet file of the given model type.

    The directory layout scanned is
    ``<root>/<model_name>/<farm_name>/<turbine_name>.parquet``.

    Args:
        model_name: Name of the model-type folder under the root.
        root: Parquet root directory. Defaults to ``PARQUET_ROOT`` (from
            ``config``). A ``str`` is also accepted.

    Returns:
        List of dicts with the following keys:

        - ``model_name``
        - ``farm_name`` (the farm folder name)
        - ``turbine_name`` (the file stem)
        - ``path`` (a ``Path`` to the parquet file)

        Records are ordered by farm name, then by file name. The list is
        empty when the model folder is missing or is not a directory. Farm
        folders whose name starts with "." are skipped, as are files whose
        name starts with "._".
    """
    base = PARQUET_ROOT if root is None else Path(root)
    model_root = base / model_name
    records: List[dict] = []
    # is_dir() rather than exists(): iterdir() on a regular file would raise.
    if not model_root.is_dir():
        return records

    # Sort both levels so the result (and any downstream concat order) does
    # not depend on filesystem enumeration order.
    farm_dirs = sorted(
        d for d in model_root.iterdir()
        if d.is_dir() and not d.name.startswith(".")
    )
    for farm_dir in farm_dirs:
        for pq_file in sorted(farm_dir.glob("*.parquet")):
            # "._*" files are typically macOS AppleDouble metadata, not data.
            if pq_file.name.startswith("._"):
                continue
            records.append({
                "model_name": model_name,
                "farm_name": farm_dir.name,
                "turbine_name": pq_file.stem,
                "path": pq_file,
            })
    return records
def load_turbine(
    path: Path,
    required_cols: List[str],
    optional_cols: Optional[List[str]] = None,
) -> Optional[pd.DataFrame]:
    """Read one turbine's parquet file, tolerating missing measurement columns.

    Args:
        path: Parquet file to read. A ``str`` is also accepted.
        required_cols: Columns that must all exist in the file schema. If any
            is missing, the file is skipped and ``None`` is returned. Rows
            with a null in any of these columns are dropped.
        optional_cols: Columns read only when present in the schema. Absent
            ones are silently ignored.

    Returns:
        A DataFrame holding the required columns plus whichever optional
        columns exist, each column at most once, in first-seen order.
        ``None`` is returned in any of these cases:

        - the schema or data cannot be read;
        - a required column is missing;
        - no rows remain after dropping nulls.

        Every column except ``"data_time"`` is coerced with
        ``pd.to_numeric(errors="coerce")``, so unparseable values become NaN
        (and then count as nulls for ``required_cols``).
    """
    path = Path(path)
    try:
        pq_file = pq.ParquetFile(path)
        schema_names = set(pq_file.schema.names)
    except Exception as e:  # best-effort: one bad file must not abort a batch
        print(f"[WARN] 无法读取 schema {path.name}: {e}")
        return None

    missing_required = [c for c in required_cols if c not in schema_names]
    if missing_required:
        print(f"[SKIP] {path.name} 缺少必要测点: {missing_required}")
        return None

    # De-duplicate while preserving order. A column listed twice, or in both
    # required_cols and optional_cols, would otherwise produce duplicate
    # DataFrame columns. df[col] would then return a DataFrame, which
    # pd.to_numeric rejects.
    wanted = list(required_cols) + [
        c for c in (optional_cols or []) if c in schema_names
    ]
    cols_to_read = list(dict.fromkeys(wanted))

    try:
        df = pq_file.read(columns=cols_to_read).to_pandas()
    except Exception as e:  # best-effort, same rationale as above
        print(f"[WARN] 读取数据失败 {path.name}: {e}")
        return None

    # Coerce to numeric (leaving the timestamp column alone), then drop rows
    # with nulls in the required columns.
    for col in cols_to_read:
        if col != "data_time":
            df[col] = pd.to_numeric(df[col], errors='coerce')
    df = df.dropna(subset=required_cols)
    if df.empty:
        print(f"[SKIP] {path.name} 必要测点全为空值")
        return None
    return df
def load_model_type(
    model_name: str,
    required_cols: List[str],
    optional_cols: Optional[List[str]] = None,
) -> pd.DataFrame:
    """Concatenate every readable turbine of one model type into one frame.

    This is intended as the training-data entry point. Each turbine file
    listed by ``list_turbines`` is loaded with ``load_turbine``. Files for
    which it returns ``None`` are skipped. Each loaded frame is tagged with
    ``farm_name`` and ``turbine_name`` columns so rows can be traced back to
    their source.

    Returns:
        The concatenated frame with a fresh RangeIndex, or an empty
        ``pd.DataFrame()`` when no turbine produced data.
    """
    loaded = []
    for record in list_turbines(model_name):
        turbine_df = load_turbine(record["path"], required_cols, optional_cols)
        if turbine_df is not None:
            # Work on a copy so the provenance columns are added to a frame
            # this function owns.
            tagged = turbine_df.copy()
            tagged["farm_name"] = record["farm_name"]
            tagged["turbine_name"] = record["turbine_name"]
            loaded.append(tagged)

    if loaded:
        combined = pd.concat(loaded, ignore_index=True)
        print(f"[INFO] 机型 {model_name} 加载完成,共 {len(combined)} 行,{len(loaded)} 台风机")
        return combined

    print(f"[WARN] 机型 {model_name} 无有效数据(所需列: {required_cols})")
    return pd.DataFrame()
|