# Thread pool helper for processing parquet files concurrently.
import concurrent.futures
import logging
from typing import Any, Callable, List, Optional, Tuple

from file_scanner import ParquetFileInfo

logger = logging.getLogger(__name__)


class ThreadPoolManager:
    """Run a per-file callable over a collection of files on a thread pool."""

    def __init__(self, max_workers: int = 20):
        """Store the pool size; the pool itself is created per call.

        Args:
            max_workers: Upper bound on worker threads, forwarded to
                ``concurrent.futures.ThreadPoolExecutor``.
        """
        self.max_workers = max_workers
        # Only set while ``process_files`` is running; ``None`` otherwise.
        self.executor: Optional[concurrent.futures.ThreadPoolExecutor] = None

    def process_files(
        self,
        file_infos: List[ParquetFileInfo],
        process_func: Callable[[ParquetFileInfo], Any],
    ) -> List[Tuple[ParquetFileInfo, Any]]:
        """Apply ``process_func`` to every file using a thread pool.

        Args:
            file_infos: Files to process. Each item is passed unchanged to
                ``process_func``; ``turbine_id`` and ``file_path`` are read
                from it for log messages.
            process_func: Callable invoked once per file in a worker thread.

        Returns:
            One ``(file_info, outcome)`` pair per submitted file, in the order
            the tasks finished (not submission order). ``outcome`` is the
            value returned by ``process_func``, or the exception instance it
            raised. Exceptions are logged and collected, never re-raised.
        """
        results: List[Tuple[ParquetFileInfo, Any]] = []

        try:
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=self.max_workers
            ) as executor:
                self.executor = executor

                # Submit everything up front so the pool stays saturated.
                future_to_file = {
                    executor.submit(process_func, file_info): file_info
                    for file_info in file_infos
                }

                # Count the submitted futures rather than calling
                # len(file_infos), so any iterable of files is accepted.
                total = len(future_to_file)
                completed = 0

                for future in concurrent.futures.as_completed(future_to_file):
                    file_info = future_to_file[future]
                    completed += 1

                    try:
                        result = future.result()
                    except Exception as e:
                        # logger.exception keeps the traceback, which the
                        # plain message alone would lose.
                        logger.exception(f"处理文件 {file_info.file_path} 时出错: {e}")
                        results.append((file_info, e))
                    else:
                        results.append((file_info, result))
                        self._log_progress(completed, total, file_info, result)
        finally:
            # The pool is shut down once the ``with`` block exits; do not
            # leave a reference to a dead executor on the instance.
            self.executor = None

        return results

    @staticmethod
    def _log_progress(
        completed: int, total: int, file_info: ParquetFileInfo, result: Any
    ) -> None:
        """Log one progress line for a successfully processed file."""
        # A 3-tuple result is unpacked as (total, inserted, updated) row
        # counts; any other result only gets the short progress message.
        if isinstance(result, tuple) and len(result) == 3:
            total_rows, inserted_rows, updated_rows = result
            logger.info(f"进度: {completed}/{total} - 文件 {file_info.turbine_id}.parquet "
                        f"处理完成, 总行: {total_rows}, 插入: {inserted_rows}, 更新: {updated_rows}")
        else:
            logger.info(f"进度: {completed}/{total} - 文件 {file_info.turbine_id}.parquet 处理完成")

    def process_with_data_loader(
        self, file_infos: List[ParquetFileInfo], data_loader: Any
    ) -> List[Tuple[ParquetFileInfo, Any]]:
        """Process files by calling ``data_loader.load_file`` on each one.

        Args:
            file_infos: Files to process.
            data_loader: Object exposing a ``load_file(file_info)`` method.

        Returns:
            The list produced by ``process_files`` for ``data_loader.load_file``.
        """
        return self.process_files(file_infos, data_loader.load_file)