# CombineAndSaveFormalFile.py
import multiprocessing
import os
from typing import Dict, List, Optional, Tuple

import pandas as pd

from conf.constants import DataProcessing, ParallelProcessing
from etl.common.PathsAndTable import PathsAndTable
from utils.file.trans_methods import copy_to_new, read_excel_files, read_file_to_df
from utils.log.trans_log import debug, info
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  10. class CombineAndSaveFormalFile:
  11. """合并并保存正式文件"""
  12. # 常量定义
  13. TIME_STAMP_COLUMN = DataProcessing.TIME_STAMP_COLUMN
  14. def __init__(self, paths_and_table: PathsAndTable):
  15. """
  16. 初始化合并器
  17. Args:
  18. paths_and_table: 路径和表信息对象
  19. """
  20. self.paths_and_table = paths_and_table
  21. self.updated_files = multiprocessing.Manager().list()
  22. def _merge_dataframes(self, exists_df: pd.DataFrame, now_df: pd.DataFrame) -> pd.DataFrame:
  23. """
  24. 合并两个数据框并去重排序
  25. Args:
  26. exists_df: 已存在的数据框
  27. now_df: 当前的数据框
  28. Returns:
  29. 合并后的数据框
  30. """
  31. combined_df = pd.concat([exists_df, now_df])
  32. # 去重,保留最新的数据
  33. combined_df = combined_df.drop_duplicates(
  34. subset=self.TIME_STAMP_COLUMN,
  35. keep='last'
  36. )
  37. # 按时间戳排序
  38. return combined_df.sort_values(
  39. by=self.TIME_STAMP_COLUMN
  40. ).reset_index(drop=True)
  41. def _save_combined_file(self, file_path: str, key: Tuple[str, str], exists_file_path: Optional[str]) -> None:
  42. """
  43. 保存合并后的文件
  44. Args:
  45. file_path: 新文件路径
  46. key: 文件键值 (目录名, 文件名)
  47. exists_file_path: 已存在的文件路径,如果为None则表示不存在
  48. """
  49. has_exists = exists_file_path is not None
  50. if has_exists:
  51. # 合并并保存
  52. exists_df = read_file_to_df(exists_file_path)
  53. now_df = read_file_to_df(file_path)
  54. combined_df = self._merge_dataframes(exists_df, now_df)
  55. combined_df.to_csv(exists_file_path, encoding='utf-8', index=False)
  56. self.updated_files.append(exists_file_path)
  57. else:
  58. # 复制新文件
  59. save_dir = str(os.path.join(
  60. self.paths_and_table.get_save_path(),
  61. key[0],
  62. key[1]
  63. ))
  64. copy_to_new(file_path, save_dir)
  65. self.updated_files.append(save_dir)
  66. # 记录日志
  67. status = "包含" if has_exists else "不包含"
  68. debug(f"{key[0]}/{key[1]} {status} 相同文件,保存成功")
  69. def _build_file_maps(self, base_path: str) -> Dict[Tuple[str, str], str]:
  70. """
  71. 构建文件映射字典
  72. Args:
  73. base_path: 基础路径
  74. Returns:
  75. 文件路径映射字典,键为(目录名, 文件名),值为完整路径
  76. """
  77. files = read_excel_files(base_path)
  78. return {
  79. (os.path.basename(os.path.dirname(file_path)), os.path.basename(file_path)): file_path
  80. for file_path in files
  81. }
  82. def combine_and_save_formal_file(self) -> None:
  83. """合并并保存正式文件的主方法"""
  84. # 构建已存在文件和新文件的映射
  85. exists_file_maps = self._build_file_maps(self.paths_and_table.get_save_path())
  86. new_file_maps = self._build_file_maps(self.paths_and_table.get_tmp_formal_path())
  87. # 找出相同键的文件
  88. same_keys = set(exists_file_maps.keys()) & set(new_file_maps.keys())
  89. # 准备并行处理参数
  90. process_args = [
  91. (
  92. file_path,
  93. key,
  94. exists_file_maps.get(key) if key in same_keys else None
  95. )
  96. for key, file_path in new_file_maps.items()
  97. ]
  98. # 使用并行处理
  99. cpu_count = get_available_cpu_count_with_percent(ParallelProcessing.CPU_USAGE_PERCENT)
  100. cpu_count = min(cpu_count, ParallelProcessing.MAX_PROCESSES)
  101. with multiprocessing.Pool(cpu_count) as pool:
  102. pool.starmap(self._save_combined_file, process_args)
  103. def run(self) -> List[str]:
  104. """
  105. 执行合并操作
  106. Returns:
  107. 更新后的文件路径列表
  108. """
  109. self.combine_and_save_formal_file()
  110. info(f"共处理了 {len(self.updated_files)} 个文件")
  111. return list(self.updated_files)