data_loader.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. # 数据加载类
  2. import pandas as pd
  3. from typing import Optional, Tuple
  4. import time
  5. import traceback
  6. import logging
  7. from database import DatabaseManager
  8. from file_scanner import ParquetFileInfo
  9. logger = logging.getLogger(__name__)
  10. class DataLoader:
  11. """数据加载器,处理单个parquet文件,支持三字段唯一键UPSERT"""
  12. def __init__(self, db_manager: DatabaseManager, table_name: str,
  13. batch_size: int = 1000, max_retries: int = 3, upsert: bool = True):
  14. self.db_manager = db_manager
  15. self.table_name = table_name
  16. self.batch_size = batch_size
  17. self.max_retries = max_retries
  18. self.upsert = upsert
  19. def load_file(self, file_info: ParquetFileInfo) -> Optional[Tuple[int, int, int]]:
  20. """
  21. 加载单个parquet文件到数据库
  22. Args:
  23. file_info: 文件信息(包含时间字段名)
  24. Returns:
  25. (总行数, 插入行数, 更新行数),失败返回None
  26. """
  27. retry_count = 0
  28. while retry_count <= self.max_retries:
  29. try:
  30. if self.upsert:
  31. # 使用UPSERT方式,三字段唯一键
  32. total_rows, inserted_rows, updated_rows = self.db_manager.upsert_parquet_data(
  33. file_info=file_info,
  34. table_name=self.table_name,
  35. batch_size=self.batch_size,
  36. max_retries=self.max_retries
  37. )
  38. return total_rows, inserted_rows, updated_rows
  39. else:
  40. raise NotImplementedError("INSERT模式暂不支持")
  41. except Exception as e:
  42. retry_count += 1
  43. if retry_count > self.max_retries:
  44. logger.error(f"加载文件 {file_info.file_path} 失败,已达到最大重试次数: {e}")
  45. logger.error(traceback.format_exc())
  46. return None
  47. else:
  48. logger.warning(f"加载文件 {file_info.file_path} 失败,第 {retry_count} 次重试: {e}")
  49. time.sleep(2 * retry_count)
  50. return None