# -*- coding: utf-8 -*- # @Time : 2024/5/17 # @Author : 魏志亮 import os import traceback import zipfile from typing import Tuple, Optional import rarfile from utils.file.trans_methods import detect_file_encoding, create_file_path from utils.log.trans_log import debug, error def __support_gbk(zip_file: zipfile.ZipFile) -> zipfile.ZipFile: """ 支持GBK编码的zip文件 Args: zip_file: ZipFile对象 Returns: 处理后的ZipFile对象 """ name_to_info = zip_file.NameToInfo # copy map first for name, info in name_to_info.copy().items(): real_name = name.encode('cp437').decode('gbk') if real_name != name: info.filename = real_name del name_to_info[name] name_to_info[real_name] = info return zip_file def unzip(zip_filepath: str, dest_path: str) -> Tuple[bool, Optional[Exception]]: """ 解压zip文件 Args: zip_filepath: zip文件路径 dest_path: 解压目标路径 Returns: (是否成功, 错误信息) """ # 解压zip文件 is_success = True debug('开始读取文件:', zip_filepath) debug("解压到:", dest_path) # 确保目标路径存在 create_file_path(dest_path) try: if detect_file_encoding(zip_filepath).startswith("gb"): try: with __support_gbk(zipfile.ZipFile(zip_filepath, 'r')) as zip_ref: zip_ref.extractall(dest_path) except Exception: with zipfile.ZipFile(zip_filepath, 'r') as zip_ref: zip_ref.extractall(dest_path) else: with zipfile.ZipFile(zip_filepath, 'r') as zip_ref: zip_ref.extractall(dest_path) except zipfile.BadZipFile as e: error(traceback.format_exc()) is_success = False error('不是zip文件:', zip_filepath) return is_success, e except Exception as e: error(traceback.format_exc()) is_success = False error('解压文件出错:', zip_filepath, str(e)) return is_success, e # 遍历解压后的文件 debug('解压再次读取', dest_path) if is_success: for root, dirs, files in os.walk(dest_path): for file in files: file_path = os.path.join(root, file) # 检查文件是否是zip文件 if file_path.endswith('.zip'): if file_path.endswith('.csv.zip'): os.rename(file_path, file_path.replace(".csv.zip", ".csv.gz")) else: # 如果是,递归解压 unzip(file_path, os.path.join(dest_path, get_desc_path(str(file)))) # 删除已解压的zip文件 os.remove(file_path) # 检查文件是否是rar文件 elif file_path.endswith('.rar'): # 如果是,递归解压 unrar(file_path, os.path.join(dest_path, get_desc_path(str(file)))) # 删除已解压的rar文件 os.remove(file_path) return is_success, None def unrar(rar_file_path: str, dest_dir: str) -> Tuple[bool, Optional[Exception]]: """ 解压rar文件 Args: rar_file_path: rar文件路径 dest_dir: 解压目标目录 Returns: (是否成功, 错误信息) """ # 解压rar文件 is_success = True debug('开始读取文件:', rar_file_path) dest_path = dest_dir debug("解压到:", dest_path) # 确保目标路径存在 create_file_path(dest_path) try: # 打开RAR文件 with rarfile.RarFile(rar_file_path) as rf: # 循环遍历RAR文件中的所有成员(文件和目录) for member in rf.infolist(): # 解压文件到目标目录 rf.extract(member, dest_path) except Exception as e: error(traceback.format_exc()) is_success = False error('不是rar文件:', rar_file_path) return is_success, e # 遍历解压后的文件 debug('解压再次读取', dest_path) if is_success: for root, dirs, files in os.walk(dest_path): for file in files: file_path = os.path.join(root, file) # 检查文件是否是rar文件 if file_path.endswith('.rar'): # 如果是,递归解压 unrar(file_path, get_desc_path(file_path)) # 删除已解压的rar文件 os.remove(file_path) elif file_path.endswith('.zip'): # 如果是,递归解压 unzip(file_path, get_desc_path(file_path)) # 删除已解压的zip文件 os.remove(file_path) return is_success, None def get_desc_path(file_path: str) -> str: """ 获取文件路径的描述路径(去除扩展名) Args: file_path: 文件路径 Returns: 去除扩展名的路径 """ return file_path[0:file_path.rfind(".")]