# -*- coding: utf-8 -*- # @Time : 2024/5/17 # @Author : 魏志亮 import traceback import zipfile from os import * import rarfile from utils.file.trans_methods import detect_file_encoding from utils.log.trans_log import trans_print, logger def __support_gbk(zip_file: zipfile.ZipFile): name_to_info = zip_file.NameToInfo # copy map first for name, info in name_to_info.copy().items(): real_name = name.encode('cp437').decode('gbk') if real_name != name: info.filename = real_name del name_to_info[name] name_to_info[real_name] = info return zip_file def unzip(zip_filepath, dest_path): # 解压zip文件 is_success = True trans_print('开始读取文件:', zip_filepath) trans_print("解压到:", dest_path) try: if detect_file_encoding(zip_filepath).startswith("gb"): try: with __support_gbk(zipfile.ZipFile(zip_filepath, 'r')) as zip_ref: zip_ref.extractall(dest_path) except: with zipfile.ZipFile(zip_filepath, 'r') as zip_ref: zip_ref.extractall(dest_path) else: with zipfile.ZipFile(zip_filepath, 'r') as zip_ref: zip_ref.extractall(dest_path) except zipfile.BadZipFile as e: trans_print(traceback.format_exc()) is_success = False trans_print('不是zip文件:', zip_filepath) return is_success, e # 遍历解压后的文件 dest_path = dest_path trans_print('解压再次读取', dest_path) if is_success: for root, dirs, files in walk(dest_path): for file in files: file_path = path.join(root, file) # 检查文件是否是zip文件 if file_path.endswith('.zip'): if file_path.endswith('.csv.zip'): rename(file_path, file_path.replace(".csv.zip", ".csv.gz")) else: # 如果是,递归解压 unzip(file_path, dest_path + sep + get_desc_path(str(file))) # 删除已解压的zip文件(可选) remove(file_path) # 检查文件是否是zip文件 if file_path.endswith('.rar'): # 如果是,递归解压 unrar(file_path, dest_path + sep + get_desc_path(str(file))) # 删除已解压的zip文件(可选) remove(file_path) return is_success, '' def unrar(rar_file_path, dest_dir): # 检查目标目录是否存在,如果不存在则创建 # 解压zip文件 is_success = True trans_print('开始读取文件:', rar_file_path) dest_path = dest_dir trans_print("解压到:", dest_path) if not path.exists(dest_path): makedirs(dest_path) try: # 打开RAR文件 with rarfile.RarFile(rar_file_path) as rf: # 循环遍历RAR文件中的所有成员(文件和目录) for member in rf.infolist(): # 解压文件到目标目录 rf.extract(member, dest_path) except Exception as e: trans_print(traceback.format_exc()) logger.exception(e) is_success = False trans_print('不是rar文件:', rar_file_path) return is_success, e # 遍历解压后的文件 print('解压再次读取', dest_path) if is_success: for root, dirs, files in walk(dest_path): for file in files: file_path = path.join(root, file) # 检查文件是否是zip文件 if file_path.endswith('.rar'): # 如果是,递归解压 unrar(file_path, get_desc_path(file_path)) # 删除已解压的zip文件(可选) remove(file_path) if file_path.endswith('.zip'): # 如果是,递归解压 unzip(file_path, get_desc_path(file_path)) # 删除已解压的zip文件(可选) remove(file_path) return is_success, '' def get_desc_path(path): return path[0:path.rfind(".")]