import multiprocessing
import os

import pandas as pd

from etl.common.PathsAndTable import PathsAndTable
from utils.file.trans_methods import read_excel_files, read_file_to_df, copy_to_new
from utils.log.trans_log import trans_print
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent


class CombineAndSaveFormalFile(object):

    def __init__(self, pathsAndTable: PathsAndTable):
        self.pathsAndTable = pathsAndTable
        # Manager-backed list so file paths appended in worker processes are visible here.
        self.update_files = multiprocessing.Manager().list()

    def combine_and_save(self, file_path, key, exists_file_path):
        exists_same = False
        if exists_file_path:
            exists_same = True
            exists_df = read_file_to_df(exists_file_path)
            now_df = read_file_to_df(file_path)
            # Merge the existing and the new DataFrame.
            combined_df = pd.concat([exists_df, now_df])
            # Deduplicate on time_stamp, keeping the rows from now_df.
            combined_df = combined_df.drop_duplicates(subset='time_stamp', keep='last')
            # Sort by time_stamp.
            combined_df = combined_df.sort_values(by='time_stamp').reset_index(drop=True)
            combined_df.to_csv(exists_file_path, encoding='utf-8', index=False)
            self.update_files.append(exists_file_path)
        else:
            save_path = str(os.path.join(self.pathsAndTable.get_save_path(), key[0], key[1]))
            copy_to_new(file_path, save_path)
            self.update_files.append(save_path)
        trans_print(f"{key[0]}/{key[1]} {'had a' if exists_same else 'had no'} matching existing file, saved successfully")

    def combine_and_save_formal_file(self):
        # Index the files already in the save directory by (parent directory name, file name).
        exists_files = read_excel_files(self.pathsAndTable.get_save_path())
        exists_file_maps = dict()
        for file_path in exists_files:
            name = (os.path.basename(os.path.dirname(file_path)), os.path.basename(file_path))
            exists_file_maps[name] = file_path
        # Index the freshly generated files the same way.
        new_files = read_excel_files(self.pathsAndTable.get_tmp_formal_path())
        new_file_maps = dict()
        for file_path in new_files:
            name = (os.path.basename(os.path.dirname(file_path)), os.path.basename(file_path))
            new_file_maps[name] = file_path
        # Keys present on both sides need a merge; the rest are plain copies.
        same_keys = set(exists_file_maps.keys()).intersection(new_file_maps.keys())
        split_count = get_available_cpu_count_with_percent(2 / 3)
        with multiprocessing.Pool(split_count) as pool:
            pool.starmap(self.combine_and_save,
                         [(file_path, key, exists_file_maps[key] if key in same_keys else None)
                          for key, file_path in new_file_maps.items()])

    def run(self):
        self.combine_and_save_formal_file()
        print(self.update_files)
        return list(self.update_files)
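

# A minimal sketch of the merge semantics used in combine_and_save: because
# pd.concat places now_df last and drop_duplicates uses keep='last', rows from
# the new file win on duplicate time_stamp values. The frames below are
# hypothetical illustration data, not from the project.
if __name__ == '__main__':
    _old = pd.DataFrame({'time_stamp': [1, 2, 3], 'value': [10, 20, 30]})
    _new = pd.DataFrame({'time_stamp': [3, 4], 'value': [99, 40]})
    _merged = (pd.concat([_old, _new])
               .drop_duplicates(subset='time_stamp', keep='last')
               .sort_values(by='time_stamp')
               .reset_index(drop=True))
    print(_merged)  # time_stamp 3 keeps value 99, taken from _new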
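
# Hypothetical usage sketch: how PathsAndTable is constructed is not shown in
# this file, so the placeholder below is an assumption, not the real API. The
# __main__ guard matters because, on spawn-based platforms, multiprocessing
# re-imports this module in its worker processes.
if __name__ == '__main__':
    paths_and_table = PathsAndTable(...)  # placeholder: construct per the project's ETL config
    updated_files = CombineAndSaveFormalFile(paths_and_table).run()
    trans_print(f"updated {len(updated_files)} files")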