import multiprocessing
import os
import traceback

from service.import_data_service import update_transfer_progress
from trans.common.PathParam import PathParam
from utils.file.trans_methods import copy_to_new, read_files, split_array, read_excel_files
from utils.log.import_data_log import log_print
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
from utils.zip.unzip import unzip, unrar, get_desc_path


class UnzipAndRemove(object):
    """Stage the input files of one transfer job into the unzip temp directory.

    Archives (``*.zip`` / ``*.rar``) are extracted, everything else is copied,
    and the job's transfer progress is reported after every processed batch.
    """

    def __init__(self, id, process_count, now_count, path_param: PathParam, save_db=True):
        """Store the job settings.

        Args:
            id: Job identifier forwarded to ``update_transfer_progress``.
            process_count: Forwarded unchanged to ``update_transfer_progress``.
            now_count: Forwarded unchanged to ``update_transfer_progress``.
            path_param: Supplies ``read_dir`` (the input file or directory) and
                ``get_unzip_tmp_path()`` (the staging directory).
            save_db: Forwarded unchanged to ``update_transfer_progress``.
        """
        self.id = id
        self.process_count = process_count
        self.now_count = now_count
        self.path_param = path_param
        self.save_db = save_db

    def _to_tmp_path(self, file):
        """Return ``file`` with the read_dir part replaced by the unzip temp path."""
        return file.replace(self.path_param.read_dir, self.path_param.get_unzip_tmp_path())

    def get_and_remove(self, file):
        """Copy or extract a single input file into the unzip temp directory.

        * ``*csv.zip`` is copied as-is, with ``csv.zip`` renamed to ``csv.gz``.
        * any other ``*zip`` is extracted with ``unzip``.
        * ``*rar`` is extracted with ``unrar``.
        * everything else is copied unchanged.

        Extraction is best-effort: a failed archive is logged and skipped so
        that one broken file does not abort the rest of the batch.

        Args:
            file: Path of the input file, located under ``path_param.read_dir``.
        """
        target = self._to_tmp_path(file)
        name = str(file)

        if name.endswith("csv.zip"):
            # NOTE(review): the archive is only copied under a .gz name, not
            # re-compressed — presumably the downstream reader copes; confirm.
            copy_to_new(file, target.replace("csv.zip", 'csv.gz'))
        elif name.endswith("zip"):
            is_success, e = unzip(file, get_desc_path(target))
            if not is_success:
                # Previously swallowed silently; log it but keep going.
                log_print(f"unzip failed for {file}:", e)
        elif name.endswith("rar"):
            is_success, e = unrar(file, get_desc_path(target))
            if not is_success:
                # traceback.format_exc() is useless here (no exception is being
                # handled), so log the error object returned by unrar instead.
                log_print(f"unrar failed for {file}:", e)
        else:
            copy_to_new(file, target)

    def remove_file_to_tmp_path(self):
        """Stage every input file and return the files found in the temp directory.

        The inputs are split into batches that are processed by one shared
        worker pool; after each batch the progress is moved linearly from 5
        towards 20.

        Returns:
            The result of ``read_excel_files`` on the unzip temp directory.

        Raises:
            ValueError: If listing or staging the files fails; the original
                exception is attached as ``__cause__``.
        """
        try:
            # read_dir may point at a single file instead of a directory.
            if os.path.isfile(self.path_param.read_dir):
                all_files = [self.path_param.read_dir]
            else:
                all_files = read_files(self.path_param.read_dir)

            # Worker budget requested with a 1/2 ratio of the CPUs
            # (helper semantics assumed from its name — confirm).
            split_count = get_available_cpu_count_with_percent(1 / 2)
            all_arrays = split_array(all_files, split_count)
            total_batches = len(all_arrays)

            if total_batches:
                # Never more workers than files, and never fewer than one:
                # multiprocessing.Pool rejects a size of 0.
                pool_count = max(1, min(split_count, len(all_files)))

                # One pool for all batches instead of spawning a new set of
                # worker processes for every batch.
                with multiprocessing.Pool(pool_count) as pool:
                    for index, arr in enumerate(all_arrays):
                        pool.map(self.get_and_remove, arr)
                        update_transfer_progress(
                            self.id,
                            round(5 + 15 * (index + 1) / total_batches),
                            self.process_count,
                            self.now_count,
                            self.save_db,
                        )

            all_files = read_excel_files(self.path_param.get_unzip_tmp_path())
            log_print('读取文件数量:', len(all_files))
        except Exception as e:
            log_print(traceback.format_exc())
            message = "读取文件列表错误:" + self.path_param.read_dir + ",系统返回错误:" + str(e)
            raise ValueError(message) from e

        return all_files

    def run(self):
        """Stage all input files, then report the progress as 20."""
        self.remove_file_to_tmp_path()
        update_transfer_progress(self.id, 20, self.process_count, self.now_count, self.save_db)


if __name__ == '__main__':
    path_param = PathParam(r'd://data//test_import', 'ceshi001', r'd://data//data.zip')
    tt = UnzipAndRemove(1, 2, 1, path_param, save_db=False)
    tt.run()