| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
# File scanning classes: locate parquet files and derive their metadata from the directory layout.
import os
import re
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class ParquetFileInfo:
    """Metadata describing a single parquet file.

    The first five fields are required; ``data_time_column`` is optional and
    defaults to ``None`` until it has been determined.
    """

    # Path of the parquet file.
    file_path: str
    # Turbine model type.
    model_type: str
    # Wind farm name.
    farm_name: str
    # Wind farm identifier.
    farm_id: str
    # Turbine identifier.
    turbine_id: str
    # Name of the time column in the actual data (identified by scanning).
    # Optional because it is unknown when the record is first created.
    data_time_column: Optional[str] = None
-
class FileScanner:
    """Recursively scan a directory tree for parquet files.

    Metadata is derived from each file's location relative to ``base_path``:

    * the first directory level is the model type;
    * the second directory level is ``<farm_name>_<farm_id>``, split on the
      last underscore (no underscore: the whole name is the farm name and the
      farm id is empty);
    * the file name without its extension is the turbine id.

    Files lying directly in ``base_path`` are ignored because they have no
    model-type directory. Files lying directly in a model-type directory get
    an empty farm name and farm id.
    """

    def __init__(self, base_path: str) -> None:
        """Remember the root directory to scan; no I/O happens here."""
        self.base_path = base_path
        self.parquet_files: List[ParquetFileInfo] = []

    def scan_files(self) -> List[ParquetFileInfo]:
        """Walk ``base_path`` and collect every ``*.parquet`` file.

        Each call starts from an empty result, so scanning twice does not
        duplicate entries. Directories and files are visited in sorted order
        so the result order does not depend on the filesystem.

        Returns:
            The list of discovered files, also stored in ``self.parquet_files``.

        Raises:
            FileNotFoundError: if ``base_path`` does not exist.
        """
        if not os.path.exists(self.base_path):
            raise FileNotFoundError(f"路径不存在: {self.base_path}")

        found: List[ParquetFileInfo] = []
        for root, dirs, files in os.walk(self.base_path):
            # Sorting in place steers os.walk's (top-down) traversal order.
            dirs.sort()

            path_parts = os.path.relpath(root, self.base_path).split(os.sep)
            if path_parts[0] == os.curdir:
                # Still at base_path itself: no model-type directory yet.
                continue
            model_type = path_parts[0]

            farm_name, farm_id = "", ""
            if len(path_parts) >= 2:
                farm_dir = path_parts[1]
                # Split on the LAST underscore so names containing
                # underscores keep them in the farm name.
                head, sep, tail = farm_dir.rpartition("_")
                farm_name, farm_id = (head, tail) if sep else (farm_dir, "")

            for file_name in sorted(files):
                if not file_name.endswith('.parquet'):
                    continue
                found.append(
                    ParquetFileInfo(
                        file_path=os.path.join(root, file_name),
                        model_type=model_type,
                        farm_name=farm_name,
                        farm_id=farm_id,
                        # Turbine id is the file name without its extension.
                        turbine_id=os.path.splitext(file_name)[0],
                    )
                )

        # Replace (rather than extend) the previous result.
        self.parquet_files = found
        return self.parquet_files

    def print_summary(self) -> None:
        """Print the number of files found plus distinct model/farm-id counts."""
        print(f"扫描完成!共找到 {len(self.parquet_files)} 个parquet文件")
        if self.parquet_files:
            model_count = len({info.model_type for info in self.parquet_files})
            # Farms are counted by distinct farm_id values.
            farm_count = len({info.farm_id for info in self.parquet_files})
            print(f"机型型号种类: {model_count}")
            print(f"风场数量: {farm_count}")
|