# file_scanner.py
# File scanning classes
import os
import re
from dataclasses import dataclass
from typing import List, Optional
  6. @dataclass
  7. class ParquetFileInfo:
  8. """Parquet文件信息类"""
  9. file_path: str
  10. model_type: str # 机型型号
  11. farm_name: str # 风场名称
  12. farm_id: str # 风场编号
  13. turbine_id: str # 风机编号
  14. # 实际数据中的时间字段名(通过扫描识别)
  15. data_time_column: str = None
  16. class FileScanner:
  17. """文件扫描器,递归扫描parquet文件"""
  18. def __init__(self, base_path: str):
  19. self.base_path = base_path
  20. self.parquet_files: List[ParquetFileInfo] = []
  21. def scan_files(self) -> List[ParquetFileInfo]:
  22. """递归扫描所有parquet文件"""
  23. if not os.path.exists(self.base_path):
  24. raise FileNotFoundError(f"路径不存在: {self.base_path}")
  25. for root, dirs, files in os.walk(self.base_path):
  26. # 提取机型型号(第一层目录)
  27. rel_path = os.path.relpath(root, self.base_path)
  28. path_parts = rel_path.split(os.sep)
  29. if len(path_parts) >= 1 and path_parts[0] != '.':
  30. model_type = path_parts[0]
  31. else:
  32. continue
  33. if len(path_parts) >= 2:
  34. # 解析风场名称和编号
  35. farm_info = path_parts[1]
  36. farm_match = re.match(r'^(.*)_(.*)$', farm_info)
  37. if farm_match:
  38. farm_name = farm_match.group(1)
  39. farm_id = farm_match.group(2)
  40. else:
  41. farm_name = farm_info
  42. farm_id = ""
  43. else:
  44. farm_name = ""
  45. farm_id = ""
  46. # 扫描parquet文件
  47. for file in files:
  48. if file.endswith('.parquet'):
  49. # 提取风机编号(去掉扩展名)
  50. turbine_id = os.path.splitext(file)[0]
  51. file_info = ParquetFileInfo(
  52. file_path=os.path.join(root, file),
  53. model_type=model_type,
  54. farm_name=farm_name,
  55. farm_id=farm_id,
  56. turbine_id=turbine_id
  57. )
  58. self.parquet_files.append(file_info)
  59. return self.parquet_files
  60. def print_summary(self):
  61. """打印扫描结果摘要"""
  62. print(f"扫描完成!共找到 {len(self.parquet_files)} 个parquet文件")
  63. if self.parquet_files:
  64. print(f"机型型号种类: {len(set(f.model_type for f in self.parquet_files))}")
  65. print(f"风场数量: {len(set(f.farm_id for f in self.parquet_files))}")