import multiprocessing
import os
from typing import Dict, List, Tuple, Optional

import pandas as pd

from conf.constants import DataProcessing, ParallelProcessing
from etl.common.PathsAndTable import PathsAndTable
from utils.file.trans_methods import read_excel_files, read_file_to_df, copy_to_new
from utils.log.trans_log import info, debug
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent


class CombineAndSaveFormalFile:
    """Merge files from the temporary formal path into the save path.

    Files are matched by the key ``(parent directory name, file name)``.
    When a new file has a counterpart with the same key under the save path,
    the two are merged row-wise on ``TIME_STAMP_COLUMN``. Otherwise the new
    file is copied into the save path. The per-file work runs in a
    ``multiprocessing.Pool``.
    """

    # Column used to de-duplicate and order rows when two files are merged.
    TIME_STAMP_COLUMN = DataProcessing.TIME_STAMP_COLUMN

    def __init__(self, paths_and_table: PathsAndTable):
        """
        Initialise the merger.

        Args:
            paths_and_table: Object providing ``get_save_path()`` (destination
                directory) and ``get_tmp_formal_path()`` (source directory).
        """
        self.paths_and_table = paths_and_table
        # A Manager-backed list proxy, so that appends made inside pool worker
        # processes are visible to the parent process afterwards.
        self.updated_files = multiprocessing.Manager().list()

    def _merge_dataframes(self, exists_df: pd.DataFrame, now_df: pd.DataFrame) -> pd.DataFrame:
        """
        Concatenate two frames, de-duplicate on the timestamp and sort.

        Args:
            exists_df: Data already present in the save path.
            now_df: Newly produced data.

        Returns:
            The combined frame, sorted by ``TIME_STAMP_COLUMN`` with a fresh
            0..n-1 index.
        """
        combined_df = pd.concat([exists_df, now_df])
        # now_df comes after exists_df in the concat, so keep='last' lets the
        # new data win when both frames contain the same timestamp.
        combined_df = combined_df.drop_duplicates(
            subset=self.TIME_STAMP_COLUMN, keep='last'
        )
        return combined_df.sort_values(
            by=self.TIME_STAMP_COLUMN
        ).reset_index(drop=True)

    def _save_combined_file(self, file_path: str, key: Tuple[str, str],
                            exists_file_path: Optional[str]) -> None:
        """
        Persist one new file into the save path (runs inside a pool worker).

        Args:
            file_path: Path of the new file under the temporary formal path.
            key: ``(directory name, file name)`` identifying the file.
            exists_file_path: Path of the matching file already in the save
                path, or ``None`` if there is none.
        """
        has_exists = exists_file_path is not None

        if has_exists:
            # Merge with the existing file and overwrite it in place.
            exists_df = read_file_to_df(exists_file_path)
            now_df = read_file_to_df(file_path)
            combined_df = self._merge_dataframes(exists_df, now_df)
            # NOTE(review): the merged result is always written as CSV, whatever
            # the existing file's extension is. Confirm that read_excel_files
            # only yields CSV-compatible paths here.
            combined_df.to_csv(exists_file_path, encoding='utf-8', index=False)
            self.updated_files.append(exists_file_path)
        else:
            # No counterpart yet: copy the new file to <save_path>/<dir>/<file>.
            save_dir = str(os.path.join(
                self.paths_and_table.get_save_path(),
                key[0],
                key[1]
            ))
            copy_to_new(file_path, save_dir)
            self.updated_files.append(save_dir)

        status = "包含" if has_exists else "不包含"
        debug(f"{key[0]}/{key[1]} {status} 相同文件,保存成功")

    def _build_file_maps(self, base_path: str) -> Dict[Tuple[str, str], str]:
        """
        Index the files returned by ``read_excel_files`` under ``base_path``.

        Args:
            base_path: Directory to scan.

        Returns:
            A mapping ``(parent directory name, file name) -> full path``.
            If two files produce the same key, the one listed later wins.
        """
        files = read_excel_files(base_path)
        return {
            (os.path.basename(os.path.dirname(file_path)), os.path.basename(file_path)): file_path
            for file_path in files
        }

    def combine_and_save_formal_file(self) -> None:
        """Merge or copy every new file into the save path using a process pool."""
        exists_file_maps = self._build_file_maps(self.paths_and_table.get_save_path())
        new_file_maps = self._build_file_maps(self.paths_and_table.get_tmp_formal_path())

        # dict.get already yields None for keys with no existing counterpart,
        # so no separate key-intersection step is needed.
        process_args = [
            (file_path, key, exists_file_maps.get(key))
            for key, file_path in new_file_maps.items()
        ]
        if not process_args:
            # Nothing to do; avoid starting worker processes for an empty job.
            return

        cpu_count = get_available_cpu_count_with_percent(ParallelProcessing.CPU_USAGE_PERCENT)
        # Never spawn more workers than there are files, and always at least
        # one (multiprocessing.Pool rejects processes < 1).
        processes = max(1, min(cpu_count, ParallelProcessing.MAX_PROCESSES, len(process_args)))

        # The bound method is pickled for each task, so ``self`` (including
        # paths_and_table) must be picklable.
        with multiprocessing.Pool(processes) as pool:
            pool.starmap(self._save_combined_file, process_args)

    def run(self) -> List[str]:
        """
        Run the merge.

        Returns:
            The paths written or updated in the save path.
        """
        self.combine_and_save_formal_file()
        info(f"共处理了 {len(self.updated_files)} 个文件")
        return list(self.updated_files)