import multiprocessing
import os
import traceback
from typing import List, Optional

from conf.constants import ParallelProcessing
from etl.common.PathsAndTable import PathsAndTable
from service.trans_conf_service import update_trans_transfer_progress
from utils.file.trans_methods import read_files, read_excel_files, copy_to_new, split_array
from utils.log.trans_log import info, error
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
from utils.zip.unzip import unzip, unrar, get_desc_path


class UnzipAndRemove(object):
    """Unpack archives and copy plain files into the temporary working path."""

    def __init__(self, pathsAndTable: PathsAndTable, filter_types: Optional[List[str]] = None):
        """
        Store the collaborators used by the staging step.

        Args:
            pathsAndTable: Object providing ``read_dir``, ``get_excel_tmp_path()``,
                ``id``, ``save_db`` and the ``has_zip`` flag.
            filter_types: Optional list of file-type filters. It is only stored
                here; no method of this class reads it.
        """
        self.pathsAndTable = pathsAndTable
        self.filter_types = filter_types

    @staticmethod
    def _is_archive(file) -> bool:
        """Return True when ``get_and_remove`` would extract *file* (zip/rar, but not csv.zip)."""
        lowered = str(file).lower()
        if lowered.endswith("csv.zip"):
            # csv.zip files are copied (renamed to csv.gz), not extracted.
            return False
        return lowered.endswith(("zip", "rar"))

    def get_and_remove(self, file: str) -> None:
        """
        Extract or copy a single file into the temporary path.

        The destination is derived by replacing ``read_dir`` inside *file* with
        the path returned by ``get_excel_tmp_path()``.

        * ``*csv.zip`` -> copied, with ``csv.zip`` renamed to ``csv.gz``
        * other ``*zip`` -> extracted with ``unzip``; ``has_zip`` is set
        * ``*rar``       -> extracted with ``unrar``; ``has_zip`` is set
        * anything else  -> copied unchanged

        Args:
            file: Path of the source file.
        """
        to_path = self.pathsAndTable.get_excel_tmp_path()
        file_lower = str(file).lower()
        # NOTE(review): when read_dir is itself a single file, this yields
        # to_path with no file name appended -- confirm copy_to_new handles it.
        target = file.replace(self.pathsAndTable.read_dir, to_path)

        if file_lower.endswith("csv.zip"):
            # NOTE(review): the suffix test is case-insensitive but this replace
            # is case-sensitive, so "X.CSV.ZIP" keeps its name -- confirm intent.
            copy_to_new(file, target.replace("csv.zip", 'csv.gz'))
        elif file_lower.endswith("zip"):
            unzip(file, get_desc_path(target))
            self.pathsAndTable.has_zip = True
        elif file_lower.endswith("rar"):
            is_success, e = unrar(file, get_desc_path(target))
            if not is_success:
                # Previously the failure was silently discarded; keep the
                # best-effort behaviour but make it visible in the log.
                error(f"解压rar文件失败:{file},系统返回错误:{e}")
            self.pathsAndTable.has_zip = True
        else:
            copy_to_new(file, target)

    def remove_file_to_tmp_path(self) -> List[str]:
        """
        Stage every input file into the temporary path using worker processes.

        Files are split into chunks; each chunk is processed by its own
        ``multiprocessing.Pool`` and the transfer progress is updated after
        every chunk (moving from 5 towards 20).

        Returns:
            The files found in the temporary path by ``read_excel_files``.

        Raises:
            ValueError: If listing, staging or re-reading the files fails. The
                original exception is attached as ``__cause__``.
        """
        try:
            if os.path.isfile(self.pathsAndTable.read_dir):
                all_files = [self.pathsAndTable.read_dir]
            else:
                all_files = read_files(self.pathsAndTable.read_dir)

            # Worker budget: the helper is asked for 2/3 of the CPUs, then the
            # result is capped by the project-wide maximum process count.
            split_count = get_available_cpu_count_with_percent(2 / 3)
            split_count = min(split_count, ParallelProcessing.MAX_PROCESSES)
            all_arrays = split_array(all_files, split_count)

            for index, arr in enumerate(all_arrays):
                # Pool(0) raises ValueError, so skip empty chunks and never ask
                # for fewer than one worker.
                if len(arr) > 0:
                    pool_count = max(1, min(split_count, len(arr)))
                    with multiprocessing.Pool(pool_count) as pool:
                        pool.map(self.get_and_remove, arr)
                    # Workers operate on a pickled copy of ``self``, so the
                    # has_zip flag they set never reaches this process. Set it
                    # here from the same file-name rules.
                    if any(self._is_archive(f) for f in arr):
                        self.pathsAndTable.has_zip = True
                update_trans_transfer_progress(self.pathsAndTable.id,
                                               round(5 + 15 * (index + 1) / len(all_arrays), 2),
                                               self.pathsAndTable.save_db)

            all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
            info('读取文件数量:', len(all_files))
        except Exception as e:
            error(traceback.format_exc())
            message = f"读取文件列表错误:{self.pathsAndTable.read_dir},系统返回错误:{e}"
            raise ValueError(message) from e
        return all_files

    def run(self) -> None:
        """Stage all files into the temporary path, then set the transfer progress to 20."""
        self.remove_file_to_tmp_path()
        update_trans_transfer_progress(self.pathsAndTable.id, 20, self.pathsAndTable.save_db)