# File scanning classes
import os
import re
from typing import List, Optional
from dataclasses import dataclass


@dataclass
class ParquetFileInfo:
    """Metadata describing a single parquet file found by the scanner."""

    file_path: str
    model_type: str  # turbine model type (first directory level)
    farm_name: str  # wind farm name
    farm_id: str  # wind farm ID
    turbine_id: str  # turbine ID (file name without extension)
    # Name of the time column in the actual data (identified by scanning)
    data_time_column: Optional[str] = None


class FileScanner:
    """Recursively scan a directory tree for parquet files.

    The layout is interpreted as ``<base>/<model_type>/<farm_name>_<farm_id>/...``;
    the turbine ID is the parquet file name without its extension.
    """

    # Splits "<farm_name>_<farm_id>" at the LAST underscore (first group is greedy).
    _FARM_PATTERN = re.compile(r'^(.*)_(.*)$')

    def __init__(self, base_path: str):
        self.base_path = base_path
        self.parquet_files: List[ParquetFileInfo] = []

    def scan_files(self) -> List[ParquetFileInfo]:
        """Recursively scan ``base_path`` and collect every parquet file.

        Each call starts from an empty result list, so scanning the same
        scanner twice does not duplicate entries. Files located directly in
        ``base_path`` are skipped because they have no model-type directory.

        Returns:
            The list of discovered files (also stored in ``self.parquet_files``),
            ordered deterministically by directory and file name.

        Raises:
            FileNotFoundError: If ``base_path`` does not exist.
        """
        if not os.path.exists(self.base_path):
            raise FileNotFoundError(f"路径不存在: {self.base_path}")

        # Build into a fresh list and rebind afterwards: repeated scans must
        # not accumulate duplicates, and a list returned by an earlier scan
        # is left untouched.
        found: List[ParquetFileInfo] = []

        for root, dirs, files in os.walk(self.base_path):
            # Sorting in place makes os.walk descend in a stable order.
            dirs.sort()

            # Model type is the first directory level below base_path.
            rel_path = os.path.relpath(root, self.base_path)
            path_parts = rel_path.split(os.sep)
            if path_parts[0] == '.':
                # base_path itself: no model-type directory to attribute files to.
                continue
            model_type = path_parts[0]

            # Farm name and ID come from the second directory level, if any.
            farm_name = ""
            farm_id = ""
            if len(path_parts) >= 2:
                farm_info = path_parts[1]
                farm_match = self._FARM_PATTERN.match(farm_info)
                if farm_match:
                    farm_name = farm_match.group(1)
                    farm_id = farm_match.group(2)
                else:
                    # No underscore: the whole directory name is the farm name.
                    farm_name = farm_info

            for file in sorted(files):
                if not file.endswith('.parquet'):
                    continue
                # Turbine ID is the file name with the extension removed.
                turbine_id = os.path.splitext(file)[0]
                found.append(
                    ParquetFileInfo(
                        file_path=os.path.join(root, file),
                        model_type=model_type,
                        farm_name=farm_name,
                        farm_id=farm_id,
                        turbine_id=turbine_id,
                    )
                )

        self.parquet_files = found
        return self.parquet_files

    def print_summary(self):
        """Print a summary of the most recent scan results."""
        print(f"扫描完成!共找到 {len(self.parquet_files)} 个parquet文件")
        if self.parquet_files:
            print(f"机型型号种类: {len({f.model_type for f in self.parquet_files})}")
            # Counts distinct farm_id values; farms without a parsed ID all
            # share the empty string and are counted as one.
            print(f"风场数量: {len({f.farm_id for f in self.parquet_files})}")