- import concurrent.futures
- from typing import List, Callable, Any, Tuple, Optional, Set
- import logging
- import os
- import json
- import threading
- from datetime import datetime
- from file_scanner import ParquetFileInfo
- logger = logging.getLogger(__name__)
class ProcessedRecordManager:
    """Thread-safe registry of already-processed file paths.

    The set of paths is persisted as a sorted JSON list in ``record_file``
    so that a later run can skip files that were handled before.
    """

    def __init__(self, record_file: str = "record_processed.json"):
        """
        Initialize the manager and load any existing records from disk.

        Args:
            record_file: Path of the JSON file that stores processed paths.
        """
        self.record_file = record_file
        self.processed_files: Set[str] = set()
        # Guards both the in-memory set and writes to the record file.
        self._lock = threading.Lock()
        self._load_records()

    def _load_records(self):
        """Load previously processed paths from the record file, if present."""
        try:
            with self._lock:
                if os.path.exists(self.record_file):
                    with open(self.record_file, 'r', encoding='utf-8') as f:
                        records = json.load(f)
                        # Only accept the expected on-disk shape (a JSON list);
                        # anything else is silently ignored and we start empty.
                        if isinstance(records, list):
                            self.processed_files = set(records)
                            logger.info(f"📁 已加载 {len(self.processed_files)} 个已处理文件记录")
                else:
                    logger.info(f"📁 记录文件不存在,将创建新文件: {self.record_file}")
        except Exception as e:
            # A corrupt/unreadable record file must not break startup:
            # fall back to an empty record set (best-effort resume).
            logger.warning(f"❌ 加载记录文件失败: {e}")
            self.processed_files = set()

    def is_processed(self, file_path: str) -> bool:
        """Return True if ``file_path`` has already been recorded as processed."""
        with self._lock:
            return file_path in self.processed_files

    def add_record(self, file_path: str, metadata: Optional[dict] = None):
        """Record ``file_path`` as processed and persist immediately (thread-safe).

        Args:
            file_path: Path to mark as processed. Duplicate calls are no-ops.
            metadata: Accepted for API compatibility; the current on-disk
                format is a plain list of paths, so metadata is not persisted.
        """
        with self._lock:
            if file_path not in self.processed_files:
                self.processed_files.add(file_path)
                self._save_records_internal()

    def _save_records_internal(self):
        """Persist the current record set to disk; caller must hold the lock.

        Returns:
            True on success, False if the write failed (failure is logged,
            never raised, so a disk hiccup does not abort processing).
        """
        try:
            # Sort for a stable, human-readable file.
            records_list = sorted(self.processed_files)

            with open(self.record_file, 'w', encoding='utf-8') as f:
                json.dump(records_list, f, ensure_ascii=False, indent=2)

            logger.info(f"💾 已保存 {len(records_list)} 个处理记录到 {self.record_file}")
            return True
        except Exception as e:
            logger.error(f"❌ 保存记录文件失败: {e}")
            return False

    def get_record_count(self) -> int:
        """Return the number of recorded processed files (thread-safe)."""
        with self._lock:
            return len(self.processed_files)

    def clear_records(self):
        """Clear all in-memory records and delete the record file if it exists."""
        with self._lock:
            self.processed_files.clear()
            try:
                if os.path.exists(self.record_file):
                    os.remove(self.record_file)
                    logger.info(f"🗑️ 已删除记录文件: {self.record_file}")
            except Exception as e:
                # Best-effort: failing to delete the file is non-fatal.
                logger.warning(f"删除记录文件失败: {e}")
class ThreadPoolManager:
    """Thread-pool runner that skips already-processed files and records new ones.

    Wraps a ``ThreadPoolExecutor`` and a ``ProcessedRecordManager``: files whose
    paths are already recorded are filtered out before submission, and files
    that finish successfully are recorded immediately.
    """

    def __init__(self, max_workers: int = 20, record_file: str = "record_processed.json"):
        """
        Args:
            max_workers: Maximum number of worker threads.
            record_file: Path of the JSON file used to persist processed paths.
        """
        self.max_workers = max_workers
        # Only non-None while process_files() is running; reset afterwards so
        # we never hold a reference to a shut-down executor.
        self.executor = None
        self.record_manager = ProcessedRecordManager(record_file)

    def process_files(self, file_infos: List[ParquetFileInfo],
                      process_func: Callable[[ParquetFileInfo], Any]) -> List[Tuple[ParquetFileInfo, Any]]:
        """
        Process files on a thread pool, recording each successful file.

        A result is considered successful only when ``process_func`` returns a
        3-tuple ``(total_rows, inserted_rows, updated_rows)``; anything else
        (including a raised exception) is kept in the result list but not
        recorded, so the file will be retried on the next run.

        Args:
            file_infos: Candidate files; already-recorded ones are skipped.
            process_func: Callable invoked once per unprocessed file.

        Returns:
            List of ``(file_info, result_or_exception)`` pairs for every file
            that was actually submitted.
        """
        # Filter out files that were processed in an earlier run.
        unprocessed_files = []
        for file_info in file_infos:
            if self.record_manager.is_processed(file_info.file_path):
                logger.debug(f"⏭️ 跳过已处理文件: {os.path.basename(file_info.file_path)}")
            else:
                unprocessed_files.append(file_info)

        if not unprocessed_files:
            logger.info("🎉 所有文件都已处理过,无需处理新文件")
            return []

        logger.info(f"📝 需要处理 {len(unprocessed_files)} 个新文件(跳过了 {len(file_infos)-len(unprocessed_files)} 个已处理文件)")

        results = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            self.executor = executor

            # Submit everything up front; map futures back to their file info.
            future_to_file = {
                executor.submit(process_func, file_info): file_info
                for file_info in unprocessed_files
            }

            completed = 0
            total = len(unprocessed_files)

            for future in concurrent.futures.as_completed(future_to_file):
                file_info = future_to_file[future]
                completed += 1

                try:
                    result = future.result()
                    results.append((file_info, result))

                    if isinstance(result, tuple) and len(result) == 3:
                        # Success: record the file immediately so a crash
                        # mid-run does not lose progress.
                        total_rows, inserted_rows, updated_rows = result
                        self.record_manager.add_record(file_info.file_path, {
                            'total_rows': total_rows,
                            'inserted_rows': inserted_rows,
                            'updated_rows': updated_rows,
                            'processed_time': datetime.now().isoformat()
                        })

                        logger.info(f"✅ 进度: {completed}/{total} - 文件 {file_info.turbine_id}.parquet "
                                    f"处理完成, 总行: {total_rows}, 插入: {inserted_rows}, 更新: {updated_rows}")
                    else:
                        # Unexpected result shape: keep it but do not record,
                        # so the file is retried next run.
                        logger.error(f"❌ 进度: {completed}/{total} - 文件 {file_info.turbine_id}.parquet 处理失败")

                except Exception as e:
                    # Worker raised: keep the exception as the result, do not record.
                    logger.error(f"❌ 处理文件 {file_info.file_path} 时出错: {e}")
                    results.append((file_info, e))

        # The pool is shut down once the with-block exits; drop the stale handle.
        self.executor = None

        return results

    def process_with_data_loader(self, file_infos: List[ParquetFileInfo],
                                 data_loader: Any) -> List[Tuple[ParquetFileInfo, Any]]:
        """
        Process files with a DataLoader instance, recording successful files.

        Args:
            file_infos: Candidate files; already-recorded ones are skipped.
            data_loader: Object exposing a ``load_file(file_info)`` method.

        Returns:
            Same shape as :meth:`process_files`.
        """
        return self.process_files(file_infos, data_loader.load_file)

    def get_record_count(self) -> int:
        """Return the number of files recorded as processed."""
        return self.record_manager.get_record_count()

    def get_unprocessed_count(self, all_files: List[ParquetFileInfo]) -> int:
        """Return how many of ``all_files`` have not yet been processed."""
        return sum(
            1 for file_info in all_files
            if not self.record_manager.is_processed(file_info.file_path)
        )