import multiprocessing
import os

import pandas as pd

from etl.common.PathsAndTable import PathsAndTable
from utils.file.trans_methods import read_excel_files, read_file_to_df, copy_to_new
from utils.log.trans_log import trans_print
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent


class CombineAndSaveFormalFile(object):
    """Merge newly produced formal files into the existing formal save directory.

    Files are matched by the pair ``(parent directory name, file name)``. When a
    new file has a counterpart already in the save directory, the two are merged
    on ``time_stamp`` (rows from the new file win) and written back over the
    existing file. Otherwise the new file is copied into the save directory.
    The per-file work is fanned out over a ``multiprocessing.Pool``.
    """

    def __init__(self, pathsAndTable: PathsAndTable):
        self.pathsAndTable = pathsAndTable
        # Manager-backed list, so that paths appended inside pool worker
        # processes are visible to the parent process.
        self.update_files = multiprocessing.Manager().list()

    @staticmethod
    def _index_by_parent_and_name(file_paths):
        """Map each path to a ``(parent directory name, file name)`` key.

        If two paths produce the same key, the later one in ``file_paths`` wins.
        """
        return {
            (os.path.basename(os.path.dirname(path)), os.path.basename(path)): path
            for path in file_paths
        }

    def combine_and_save(self, file_path, key, exists_file_path):
        """Merge or copy one new file into the save directory.

        :param file_path: path of the newly produced file.
        :param key: ``(parent directory name, file name)`` identifying the file.
        :param exists_file_path: path of the already-saved file with the same
            key, or a falsy value when there is none.

        The resulting path is appended to ``self.update_files``.
        """
        exists_same = bool(exists_file_path)
        if exists_same:
            exists_df = read_file_to_df(exists_file_path)
            now_df = read_file_to_df(file_path)
            # Existing rows go first and new rows last, so keep='last' below
            # keeps the new file's row whenever a time_stamp is duplicated.
            combined_df = pd.concat([exists_df, now_df])
            combined_df = combined_df.drop_duplicates(subset='time_stamp', keep='last')
            combined_df = combined_df.sort_values(by='time_stamp').reset_index(drop=True)
            # NOTE(review): this always writes CSV content, even if
            # exists_file_path has an Excel extension. Confirm that
            # read_excel_files only yields CSV files for these directories.
            combined_df.to_csv(exists_file_path, encoding='utf-8', index=False)
            self.update_files.append(exists_file_path)
        else:
            save_path = str(os.path.join(self.pathsAndTable.get_save_path(), key[0], key[1]))
            copy_to_new(file_path, save_path)
            self.update_files.append(save_path)

        trans_print(f"{key[0]}/{key[1]} {'包含' if exists_same else '不包含'} 相同文件,保存成功")

    def combine_and_save_formal_file(self):
        """Merge every file from the temporary formal directory into the save directory.

        Each new file is processed by ``combine_and_save`` in a worker process.
        """
        exists_file_maps = self._index_by_parent_and_name(
            read_excel_files(self.pathsAndTable.get_save_path()))
        new_file_maps = self._index_by_parent_and_name(
            read_excel_files(self.pathsAndTable.get_tmp_formal_path()))

        # dict.get returns None when no saved file shares the key. That is the
        # "no existing file" signal combine_and_save expects. It is an O(1)
        # lookup instead of a linear scan over a list of shared keys.
        tasks = [
            (file_path, key, exists_file_maps.get(key))
            for key, file_path in new_file_maps.items()
        ]

        split_count = get_available_cpu_count_with_percent(2 / 3)
        with multiprocessing.Pool(split_count) as pool:
            pool.starmap(self.combine_and_save, tasks)

    def run(self):
        """Run the merge and return a plain list of every path written or copied."""
        self.combine_and_save_formal_file()
        updated_files = list(self.update_files)
        trans_print(str(updated_files))
        return updated_files