| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- # 线程池类
- import concurrent.futures
- from typing import List, Callable, Any, Tuple, Optional
- import logging
- from file_scanner import ParquetFileInfo
# Module-level logger, named after this module's import path.
logger = logging.getLogger(name=__name__)
class ThreadPoolManager:
    """Run a per-file processing function concurrently on a thread pool.

    A new ``concurrent.futures.ThreadPoolExecutor`` is created for every
    ``process_files`` call and is shut down when that call's ``with`` block
    exits.
    """

    def __init__(self, max_workers: int = 20):
        """Initialise the manager.

        Args:
            max_workers: Number of worker threads passed to
                ``ThreadPoolExecutor`` on each ``process_files`` call.
        """
        self.max_workers = max_workers
        # Executor of the most recent ``process_files`` call (None before the
        # first call). It is already shut down once that call has returned.
        self.executor: Optional[concurrent.futures.ThreadPoolExecutor] = None

    def process_files(self, file_infos: List[ParquetFileInfo],
                      process_func: Callable[[ParquetFileInfo], Any]) -> List[Tuple[ParquetFileInfo, Any]]:
        """Apply ``process_func`` to every file concurrently.

        Every file is submitted to a pool of ``self.max_workers`` threads and
        a progress line is logged as each task finishes.

        Args:
            file_infos: Files to process. Any finite iterable is accepted; it
                is materialised once so the progress total is known.
            process_func: Called once per file in a worker thread. If it
                returns a 3-tuple, the items are logged as total / inserted /
                updated row counts.

        Returns:
            One ``(file_info, outcome)`` pair per input file, in the same order
            as ``file_infos``. ``outcome`` is ``process_func``'s return value,
            or the ``Exception`` it raised: per-file failures are logged and
            recorded here rather than propagated.
        """
        file_infos = list(file_infos)
        total = len(file_infos)
        # Filled by input position, so the output order does not depend on
        # which worker happens to finish first.
        results: List[Optional[Tuple[ParquetFileInfo, Any]]] = [None] * total

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            self.executor = executor

            # Submit all tasks, remembering each future's input position.
            future_to_index = {
                executor.submit(process_func, file_info): index
                for index, file_info in enumerate(file_infos)
            }

            # Handle tasks in completion order; ``completed`` is the progress count.
            for completed, future in enumerate(
                    concurrent.futures.as_completed(future_to_index), start=1):
                index = future_to_index[future]
                file_info = file_infos[index]
                # Guard only the task's own outcome: a failure while logging
                # progress must not be recorded as a second result for the file.
                try:
                    result = future.result()
                except Exception as e:
                    logger.error("处理文件 %s 时出错: %s", file_info.file_path, e,
                                 exc_info=True)
                    results[index] = (file_info, e)
                else:
                    results[index] = (file_info, result)
                    self._log_progress(completed, total, file_info, result)

        # Every slot is filled: as_completed yields each submitted future once.
        return results

    @staticmethod
    def _log_progress(completed: int, total: int,
                      file_info: ParquetFileInfo, result: Any) -> None:
        """Log the progress line for one successfully processed file."""
        if isinstance(result, tuple) and len(result) == 3:
            # A 3-tuple result is reported as (total, inserted, updated) rows.
            total_rows, inserted_rows, updated_rows = result
            logger.info("进度: %s/%s - 文件 %s.parquet 处理完成, 总行: %s, 插入: %s, 更新: %s",
                        completed, total, file_info.turbine_id,
                        total_rows, inserted_rows, updated_rows)
        else:
            logger.info("进度: %s/%s - 文件 %s.parquet 处理完成",
                        completed, total, file_info.turbine_id)

    def process_with_data_loader(self, file_infos: List[ParquetFileInfo],
                                 data_loader: Any) -> List[Tuple[ParquetFileInfo, Any]]:
        """Process files using ``data_loader.load_file`` as the per-file function.

        Thin wrapper around ``process_files``; result ordering and error
        handling are exactly those of ``process_files``.
        """
        return self.process_files(file_infos, data_loader.load_file)
|