import concurrent.futures
from typing import List, Callable, Any, Tuple, Optional, Set, Dict
import logging
import os
import json
import threading
from datetime import datetime

from file_scanner import ParquetFileInfo

logger = logging.getLogger(__name__)


class ProcessedRecordManager:
    """Thread-safe manager of the set of already-processed file paths.

    The record is persisted to ``record_file`` as a sorted JSON list of
    paths so that repeated runs can skip files that already succeeded.
    Per-file metadata handed to :meth:`add_record` is kept in memory only;
    the on-disk format stays a plain list for backward compatibility.
    """

    def __init__(self, record_file: str = "record_processed.json"):
        """
        Initialize the record manager and load any existing records.

        Args:
            record_file: Path of the JSON file holding processed records.
        """
        self.record_file = record_file
        self.processed_files: Set[str] = set()
        # In-memory per-file metadata (row counts, timestamps, ...).
        # NOTE: not written to disk — the file format is a bare path list.
        self.metadata: Dict[str, dict] = {}
        # Guards processed_files/metadata and serializes file writes.
        self._lock = threading.Lock()
        self._load_records()

    def _load_records(self):
        """Load previously processed file records from disk, if present."""
        try:
            with self._lock:
                if os.path.exists(self.record_file):
                    with open(self.record_file, 'r', encoding='utf-8') as f:
                        records = json.load(f)
                        # Only a JSON list is a valid record file; anything
                        # else is ignored and we start from an empty set.
                        if isinstance(records, list):
                            self.processed_files = set(records)
                    logger.info(f"📁 已加载 {len(self.processed_files)} 个已处理文件记录")
                else:
                    logger.info(f"📁 记录文件不存在,将创建新文件: {self.record_file}")
        except Exception as e:
            # Corrupt/unreadable record file: log and start fresh rather
            # than crash — worst case we reprocess files.
            logger.warning(f"❌ 加载记录文件失败: {e}")
            self.processed_files = set()

    def is_processed(self, file_path: str) -> bool:
        """Return True if *file_path* has already been processed."""
        with self._lock:
            return file_path in self.processed_files

    def add_record(self, file_path: str, metadata: Optional[dict] = None):
        """Record *file_path* as processed and persist immediately (thread-safe).

        Args:
            file_path: Path of the file that was successfully processed.
            metadata: Optional per-file stats; kept in memory only.
        """
        with self._lock:
            if file_path not in self.processed_files:
                self.processed_files.add(file_path)
                if metadata is not None:
                    self.metadata[file_path] = metadata
                self._save_records_internal()

    def _save_records_internal(self) -> bool:
        """Write the current record set to disk.

        Caller must hold ``self._lock``. Returns True on success.
        """
        try:
            # Sorted list keeps the file stable and human-readable.
            records_list = sorted(self.processed_files)
            with open(self.record_file, 'w', encoding='utf-8') as f:
                json.dump(records_list, f, ensure_ascii=False, indent=2)
            logger.info(f"💾 已保存 {len(records_list)} 个处理记录到 {self.record_file}")
            return True
        except Exception as e:
            logger.error(f"❌ 保存记录文件失败: {e}")
            return False

    def get_record_count(self) -> int:
        """Return the number of processed-file records."""
        with self._lock:
            return len(self.processed_files)

    def clear_records(self):
        """Clear all in-memory records and delete the record file."""
        with self._lock:
            self.processed_files.clear()
            try:
                if os.path.exists(self.record_file):
                    os.remove(self.record_file)
                    logger.info(f"🗑️ 已删除记录文件: {self.record_file}")
            except Exception as e:
                # Best-effort delete: a leftover file only means extra skips.
                logger.warning(f"删除记录文件失败: {e}")


class ThreadPoolManager:
    """Thread-pool based file processor with integrated record keeping.

    Files already listed in the record manager are skipped; files whose
    processing succeeds are recorded immediately so a crash mid-run loses
    at most the in-flight files.
    """

    def __init__(self, max_workers: int = 20, record_file: str = "record_processed.json"):
        """
        Args:
            max_workers: Maximum number of worker threads.
            record_file: Path of the processed-record JSON file.
        """
        self.max_workers = max_workers
        self.executor = None
        self.record_manager = ProcessedRecordManager(record_file)

    def process_files(self, file_infos: List[ParquetFileInfo],
                      process_func: Callable[[ParquetFileInfo], Any]) -> List[Tuple[ParquetFileInfo, Any]]:
        """
        Process files on a thread pool, recording each successful file.

        A result is considered successful only when ``process_func``
        returns a 3-tuple ``(total_rows, inserted_rows, updated_rows)``;
        any other return value — or a raised exception — is collected but
        NOT recorded, so the file will be retried on the next run.

        Args:
            file_infos: Candidate files (already-processed ones are skipped).
            process_func: Callable applied to each unprocessed file.

        Returns:
            List of ``(file_info, result-or-exception)`` pairs for the
            files that were actually submitted.
        """
        # Filter out files that were already processed in a previous run.
        unprocessed_files = []
        for file_info in file_infos:
            if self.record_manager.is_processed(file_info.file_path):
                logger.debug(f"⏭️ 跳过已处理文件: {os.path.basename(file_info.file_path)}")
            else:
                unprocessed_files.append(file_info)

        if not unprocessed_files:
            logger.info("🎉 所有文件都已处理过,无需处理新文件")
            return []

        logger.info(f"📝 需要处理 {len(unprocessed_files)} 个新文件(跳过了 {len(file_infos)-len(unprocessed_files)} 个已处理文件)")

        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            self.executor = executor
            # Submit every remaining file as an independent task.
            future_to_file = {
                executor.submit(process_func, file_info): file_info
                for file_info in unprocessed_files
            }

            completed = 0
            total = len(unprocessed_files)
            for future in concurrent.futures.as_completed(future_to_file):
                file_info = future_to_file[future]
                completed += 1
                try:
                    result = future.result()
                    if isinstance(result, tuple) and len(result) == 3:
                        # Success: record the file immediately so it is
                        # never reprocessed, even if a later file crashes.
                        total_rows, inserted_rows, updated_rows = result
                        results.append((file_info, result))
                        self.record_manager.add_record(file_info.file_path, {
                            'total_rows': total_rows,
                            'inserted_rows': inserted_rows,
                            'updated_rows': updated_rows,
                            'processed_time': datetime.now().isoformat()
                        })
                        logger.info(f"✅ 进度: {completed}/{total} - 文件 {file_info.turbine_id}.parquet "
                                    f"处理完成, 总行: {total_rows}, 插入: {inserted_rows}, 更新: {updated_rows}")
                    else:
                        # Unexpected result shape: treat as failure, do not record.
                        results.append((file_info, result))
                        logger.error(f"❌ 进度: {completed}/{total} - 文件 {file_info.turbine_id}.parquet 处理失败")
                except Exception as e:
                    # Worker raised: collect the exception, do not record.
                    logger.error(f"❌ 处理文件 {file_info.file_path} 时出错: {e}")
                    results.append((file_info, e))
        # The executor is shut down once the with-block exits; drop the
        # stale reference instead of exposing an unusable pool.
        self.executor = None

        return results

    def process_with_data_loader(self, file_infos: List[ParquetFileInfo],
                                 data_loader: Any) -> List[Tuple[ParquetFileInfo, Any]]:
        """
        Process files via a DataLoader instance (delegates to process_files).

        Args:
            file_infos: Candidate files.
            data_loader: Object exposing a ``load_file(file_info)`` method.

        Returns:
            Same as :meth:`process_files`.
        """
        return self.process_files(file_infos, data_loader.load_file)

    def get_record_count(self) -> int:
        """Return the number of files recorded as processed."""
        return self.record_manager.get_record_count()

    def get_unprocessed_count(self, all_files: List[ParquetFileInfo]) -> int:
        """Return how many of *all_files* have not been processed yet."""
        return sum(
            1 for file_info in all_files
            if not self.record_manager.is_processed(file_info.file_path)
        )