| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- # 数据加载类
- import pandas as pd
- from typing import Optional, Tuple
- import time
- import traceback
- import logging
- from database import DatabaseManager
- from file_scanner import ParquetFileInfo
- logger = logging.getLogger(__name__)
class DataLoader:
    """Loader for a single parquet file.

    Delegates the actual write to ``DatabaseManager.upsert_parquet_data``,
    which performs an UPSERT keyed on a three-column unique key.
    """

    def __init__(self, db_manager: DatabaseManager, table_name: str,
                 batch_size: int = 1000, max_retries: int = 3, upsert: bool = True):
        # db_manager: performs the database round-trips
        # table_name: destination table for the parquet rows
        # batch_size: rows per database batch
        # max_retries: retry budget for a failed file load
        # upsert: True -> three-field unique-key UPSERT (the only supported mode)
        self.db_manager = db_manager
        self.table_name = table_name
        self.batch_size = batch_size
        self.max_retries = max_retries
        self.upsert = upsert

    def load_file(self, file_info: ParquetFileInfo) -> Optional[Tuple[int, int, int]]:
        """Load a single parquet file into the database.

        Args:
            file_info: file metadata (includes the time-field name).

        Returns:
            (total_rows, inserted_rows, updated_rows) on success, None on failure.
        """
        # BUG FIX: the original raised NotImplementedError for non-upsert mode
        # *inside* the retry loop's try-block, so its own `except Exception`
        # swallowed it and retried max_retries times (with sleeps, ~12s by
        # default) before returning None. An unimplemented mode is permanent,
        # not transient — fail fast while preserving the None-return contract.
        if not self.upsert:
            logger.error("INSERT模式暂不支持")
            return None

        retry_count = 0

        while retry_count <= self.max_retries:
            try:
                # Three-field unique-key UPSERT, delegated to the DB layer.
                # NOTE(review): max_retries is also forwarded here, so the DB
                # layer may retry internally in addition to this loop —
                # confirm the intended total retry budget.
                total_rows, inserted_rows, updated_rows = self.db_manager.upsert_parquet_data(
                    file_info=file_info,
                    table_name=self.table_name,
                    batch_size=self.batch_size,
                    max_retries=self.max_retries
                )
                return total_rows, inserted_rows, updated_rows

            except Exception as e:
                retry_count += 1
                if retry_count > self.max_retries:
                    logger.error(f"加载文件 {file_info.file_path} 失败,已达到最大重试次数: {e}")
                    logger.error(traceback.format_exc())
                    return None
                logger.warning(f"加载文件 {file_info.file_path} 失败,第 {retry_count} 次重试: {e}")
                # Linear backoff: 2s, 4s, 6s, ...
                time.sleep(2 * retry_count)

        return None
|