# Data loading class
import logging
import time
import traceback
from typing import Optional, Tuple

import pandas as pd

from database import DatabaseManager
from file_scanner import ParquetFileInfo

logger = logging.getLogger(__name__)


class DataLoader:
    """Data loader that processes a single parquet file.

    Rows are written with an UPSERT keyed on a three-field unique key; the
    actual write is delegated to ``DatabaseManager.upsert_parquet_data``.
    """

    def __init__(self, db_manager: DatabaseManager, table_name: str,
                 batch_size: int = 1000, max_retries: int = 3,
                 upsert: bool = True):
        """Store the loader configuration.

        Args:
            db_manager: Database manager that performs the write.
            table_name: Name of the destination table.
            batch_size: Batch size forwarded to the database manager.
            max_retries: Number of retries after the first failed attempt;
                also forwarded to the database manager.
            upsert: Whether to write in UPSERT mode. Plain INSERT mode is
                not supported, so ``load_file`` fails when this is False.
        """
        self.db_manager = db_manager
        self.table_name = table_name
        self.batch_size = batch_size
        self.max_retries = max_retries
        self.upsert = upsert

    def load_file(self, file_info: ParquetFileInfo) -> Optional[Tuple[int, int, int]]:
        """Load a single parquet file into the database.

        Args:
            file_info: File information (includes the time field name).

        Returns:
            ``(total rows, inserted rows, updated rows)`` on success, or
            ``None`` on failure. No exception is propagated to the caller.
        """
        if not self.upsert:
            # An unsupported mode is a configuration error, not a transient
            # fault: fail right away instead of sleeping through the whole
            # retry/back-off schedule for an outcome that cannot change.
            logger.error(f"加载文件 {file_info.file_path} 失败: INSERT模式暂不支持")
            return None

        # One initial attempt plus up to max_retries retries. A negative
        # max_retries yields no attempts at all and falls through to None.
        # NOTE(review): max_retries is also passed to upsert_parquet_data; if
        # that method retries internally the attempts multiply - confirm.
        for attempt in range(1, self.max_retries + 2):
            try:
                # UPSERT keyed on the three-field unique key.
                return self.db_manager.upsert_parquet_data(
                    file_info=file_info,
                    table_name=self.table_name,
                    batch_size=self.batch_size,
                    max_retries=self.max_retries,
                )
            except Exception as e:
                if attempt > self.max_retries:
                    logger.error(f"加载文件 {file_info.file_path} 失败,已达到最大重试次数: {e}")
                    logger.error(traceback.format_exc())
                    return None
                logger.warning(f"加载文件 {file_info.file_path} 失败,第 {attempt} 次重试: {e}")
                # Linear back-off: 2s, 4s, 6s, ...
                time.sleep(2 * attempt)

        return None