- import multiprocessing
- import os
- from typing import Dict, List, Tuple, Optional
- import pandas as pd
- from conf.constants import DataProcessing, ParallelProcessing
- from etl.common.PathsAndTable import PathsAndTable
- from utils.file.trans_methods import read_excel_files, read_file_to_df, copy_to_new
- from utils.log.trans_log import info, debug
- from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
class CombineAndSaveFormalFile:
    """Merge newly processed files into the formal save directory.

    For each file produced in the temporary formal directory, either merge it
    with an already-saved file sharing the same (directory, name) key —
    deduplicating on the timestamp column, newest rows winning — or copy it
    over as a brand-new file. The per-file work is fanned out across a
    process pool.
    """

    # Name of the timestamp column used for dedup/sort (project-wide constant).
    TIME_STAMP_COLUMN = DataProcessing.TIME_STAMP_COLUMN

    def __init__(self, paths_and_table: PathsAndTable):
        """
        Args:
            paths_and_table: object providing the formal save path and the
                temporary formal path for this run.
        """
        self.paths_and_table = paths_and_table
        # Manager-backed list so worker processes can record which files
        # they created or updated.
        self.updated_files = multiprocessing.Manager().list()

    def _merge_dataframes(self, exists_df: pd.DataFrame, now_df: pd.DataFrame) -> pd.DataFrame:
        """Concatenate two frames, dedupe on the timestamp, and sort.

        Rows from ``now_df`` win on duplicate timestamps (``keep='last'``).

        Args:
            exists_df: previously saved data.
            now_df: newly produced data.

        Returns:
            The merged frame, sorted by timestamp with a fresh index.
        """
        combined_df = pd.concat([exists_df, now_df])
        # keep='last' prefers rows from now_df, i.e. the newest data.
        combined_df = combined_df.drop_duplicates(
            subset=self.TIME_STAMP_COLUMN,
            keep='last'
        )
        return combined_df.sort_values(
            by=self.TIME_STAMP_COLUMN
        ).reset_index(drop=True)

    def _save_combined_file(self, file_path: str, key: Tuple[str, str], exists_file_path: Optional[str]) -> None:
        """Merge one new file into its saved counterpart, or copy it as new.

        Runs inside a worker process; records the written path in
        ``self.updated_files``.

        Args:
            file_path: path of the newly produced file.
            key: (directory name, file name) identifying the file.
            exists_file_path: path of the already-saved counterpart, or
                ``None`` when no such file exists yet.
        """
        has_exists = exists_file_path is not None
        if has_exists:
            # Merge new data into the existing file, then overwrite it.
            exists_df = read_file_to_df(exists_file_path)
            now_df = read_file_to_df(file_path)
            combined_df = self._merge_dataframes(exists_df, now_df)
            combined_df.to_csv(exists_file_path, encoding='utf-8', index=False)
            self.updated_files.append(exists_file_path)
        else:
            # No counterpart yet: copy the new file into the save tree.
            # (os.path.join already returns str; no str() wrapper needed.)
            save_dir = os.path.join(
                self.paths_and_table.get_save_path(),
                key[0],
                key[1]
            )
            copy_to_new(file_path, save_dir)
            self.updated_files.append(save_dir)
        status = "包含" if has_exists else "不包含"
        debug(f"{key[0]}/{key[1]} {status} 相同文件,保存成功")

    def _build_file_maps(self, base_path: str) -> Dict[Tuple[str, str], str]:
        """Index every excel-style file under ``base_path``.

        Args:
            base_path: directory to scan (recursively, via read_excel_files).

        Returns:
            Mapping of (parent directory name, file name) -> full path.
        """
        files = read_excel_files(base_path)
        return {
            (os.path.basename(os.path.dirname(file_path)), os.path.basename(file_path)): file_path
            for file_path in files
        }

    def combine_and_save_formal_file(self) -> None:
        """Merge or copy every new file into the formal directory, in parallel."""
        # Index both the already-saved files and the freshly produced ones.
        exists_file_maps = self._build_file_maps(self.paths_and_table.get_save_path())
        new_file_maps = self._build_file_maps(self.paths_and_table.get_tmp_formal_path())
        # dict.get already yields None for keys with no existing counterpart,
        # so no separate key-intersection pass is needed.
        process_args = [
            (file_path, key, exists_file_maps.get(key))
            for key, file_path in new_file_maps.items()
        ]
        # Cap the pool size by the configured CPU-usage percentage and the
        # project-wide process ceiling.
        cpu_count = get_available_cpu_count_with_percent(ParallelProcessing.CPU_USAGE_PERCENT)
        cpu_count = min(cpu_count, ParallelProcessing.MAX_PROCESSES)
        with multiprocessing.Pool(cpu_count) as pool:
            pool.starmap(self._save_combined_file, process_args)

    def run(self) -> List[str]:
        """Execute the merge.

        Returns:
            List of file paths that were created or updated.
        """
        self.combine_and_save_formal_file()
        info(f"共处理了 {len(self.updated_files)} 个文件")
        return list(self.updated_files)