# CombineAndSaveFormalFile.py
  1. import multiprocessing
  2. import os
  3. import pandas as pd
  4. from etl.common.PathsAndTable import PathsAndTable
  5. from utils.file.trans_methods import read_excel_files, read_file_to_df, copy_to_new
  6. from utils.log.trans_log import trans_print
  7. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  8. class CombineAndSaveFormalFile(object):
  9. def __init__(self, pathsAndTable: PathsAndTable):
  10. self.pathsAndTable = pathsAndTable
  11. self.update_files = multiprocessing.Manager().list()
  12. def combine_and_save(self, file_path, key, exists_file_path):
  13. exists_same = False
  14. if exists_file_path:
  15. exists_same = True
  16. exists_df = read_file_to_df(exists_file_path)
  17. now_df = read_file_to_df(file_path)
  18. # 合并两个 DataFrame
  19. combined_df = pd.concat([exists_df, now_df])
  20. # 去重,保留 now_df 的值
  21. combined_df = combined_df.drop_duplicates(subset='time_stamp', keep='last')
  22. # 按 time_stamp 排序
  23. combined_df = combined_df.sort_values(by='time_stamp').reset_index(drop=True)
  24. combined_df.to_csv(exists_file_path, encoding='utf-8', index=False)
  25. self.update_files.append(exists_file_path)
  26. else:
  27. save_path = str(os.path.join(self.pathsAndTable.get_save_path(), key[0], key[1]))
  28. copy_to_new(file_path, save_path)
  29. self.update_files.append(save_path)
  30. trans_print(f"{key[0]}/{key[1]} {'包含' if exists_same else '不包含'} 相同文件,保存成功")
  31. def combine_and_save_formal_file(self):
  32. exists_files = read_excel_files(self.pathsAndTable.get_save_path())
  33. exists_file_maps = dict()
  34. for file_path in exists_files:
  35. name = (os.path.basename(os.path.dirname(file_path)), os.path.basename(file_path))
  36. exists_file_maps[name] = file_path
  37. new_files = read_excel_files(self.pathsAndTable.get_tmp_formal_path())
  38. new_file_maps = dict()
  39. for file_path in new_files:
  40. name = (os.path.basename(os.path.dirname(file_path)), os.path.basename(file_path))
  41. new_file_maps[name] = file_path
  42. same_keys = list(set(exists_file_maps.keys()).intersection(new_file_maps.keys()))
  43. split_count = get_available_cpu_count_with_percent(2 / 3)
  44. with multiprocessing.Pool(split_count) as pool:
  45. pool.starmap(self.combine_and_save,
  46. [(file_path, key, exists_file_maps[key] if key in same_keys else None) for key, file_path in
  47. new_file_maps.items()])
  48. def run(self):
  49. self.combine_and_save_formal_file()
  50. print(self.update_files)
  51. return list(self.update_files)