# -*- coding: utf-8 -*-
# @Time : 2024/5/17
# @Author : 魏志亮
import traceback
import zipfile
from os import *

import rarfile

from utils.file.trans_methods import detect_file_encoding
from utils.log.trans_log import trans_print, logger
def __support_gbk(zip_file: zipfile.ZipFile):
    """Repair GBK member names of *zip_file* in place.

    ``zipfile`` decodes member names that do not carry the UTF-8 flag as
    cp437, so names that were stored as GBK bytes show up garbled. For each
    member this helper reverses the cp437 decoding, decodes the bytes as
    GBK, and re-registers the member in ``NameToInfo`` under the repaired
    name.

    Members are handled one by one: a member whose UTF-8 flag is set, or
    whose name cannot be round-tripped through cp437 -> GBK, keeps its
    current name instead of aborting the repair of the whole archive.

    Args:
        zip_file: Archive object whose ``NameToInfo`` mapping and member
            ``filename`` attributes are rewritten.

    Returns:
        The same ``zip_file`` object, so the call can be chained.
    """
    # General-purpose bit 11: the stored name is UTF-8 and was already
    # decoded correctly, so it must not be re-decoded.
    utf8_name_flag = 0x800

    name_to_info = zip_file.NameToInfo
    # Iterate over a snapshot because the mapping is mutated in the loop.
    for name, info in list(name_to_info.items()):
        if info.flag_bits & utf8_name_flag:
            continue
        try:
            real_name = name.encode('cp437').decode('gbk')
        except (UnicodeEncodeError, UnicodeDecodeError):
            # Not a cp437-decoded GBK name; leave this member untouched.
            continue
        if real_name != name:
            info.filename = real_name
            del name_to_info[name]
            name_to_info[real_name] = info
    return zip_file
def unzip(zip_filepath, dest_path):
    """Extract a zip archive into *dest_path* and unpack archives nested in it.

    When ``detect_file_encoding`` reports an encoding starting with ``"gb"``
    the member names are first repaired with ``__support_gbk``; if that
    attempt fails for any reason the archive is extracted again with the
    names as ``zipfile`` decoded them.

    After extraction the destination tree is walked once:

    * ``*.csv.zip`` files are not extracted, only renamed to ``*.csv.gz``
      (the file content is not converted here);
    * other ``*.zip`` files are extracted with ``unzip`` and then deleted;
    * ``*.rar`` files are extracted with ``unrar`` and then deleted.

    Nested archives are extracted to ``dest_path/<archive name without
    extension>``, i.e. directly under the top-level destination regardless
    of the sub-directory they were found in. The result of a nested
    extraction is not checked before the nested archive is deleted.

    Args:
        zip_filepath: Path of the zip archive to extract.
        dest_path: Directory the archive is extracted into.

    Returns:
        ``(True, '')`` on success, or ``(False, exc)`` where ``exc`` is the
        ``zipfile.BadZipFile`` raised while reading the archive. Other
        exceptions propagate to the caller.
    """
    trans_print('开始读取文件:', zip_filepath)
    trans_print("解压到:", dest_path)

    try:
        if detect_file_encoding(zip_filepath).startswith("gb"):
            try:
                # Open inside the ``with`` so the handle is closed even when
                # the name repair raises.
                with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
                    __support_gbk(zip_ref)
                    zip_ref.extractall(dest_path)
            except Exception:
                # Best-effort fallback: log why the GBK attempt failed and
                # extract with the names as decoded by zipfile.
                trans_print(traceback.format_exc())
                with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
                    zip_ref.extractall(dest_path)
        else:
            with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
                zip_ref.extractall(dest_path)
    except zipfile.BadZipFile as e:
        trans_print(traceback.format_exc())
        trans_print('不是zip文件:', zip_filepath)
        return False, e

    # Walk the extracted tree and handle nested archives.
    trans_print('解压再次读取', dest_path)
    csv_zip_suffix = '.csv.zip'
    for root, _dirs, files in walk(dest_path):
        for file in files:
            file_path = path.join(root, file)
            if file_path.endswith(csv_zip_suffix):
                # Swap only the trailing suffix; a plain str.replace would
                # also rewrite a matching directory name earlier in the path.
                rename(file_path, file_path[:-len(csv_zip_suffix)] + '.csv.gz')
            elif file_path.endswith('.zip'):
                unzip(file_path, dest_path + sep + get_desc_path(str(file)))
                # The nested archive is removed once it has been processed.
                remove(file_path)
            elif file_path.endswith('.rar'):
                unrar(file_path, dest_path + sep + get_desc_path(str(file)))
                remove(file_path)

    return True, ''
def unrar(rar_file_path, dest_dir):
    """Extract a rar archive into *dest_dir* and unpack archives nested in it.

    The destination directory is created when it does not exist. Every
    member reported by ``RarFile.infolist()`` is extracted, then the
    destination tree is walked once: each ``*.rar`` / ``*.zip`` file found
    is extracted with ``unrar`` / ``unzip`` into a sibling directory named
    after the archive without its extension, and the archive file is then
    deleted. The result of a nested extraction is not checked before the
    nested archive is deleted.

    Args:
        rar_file_path: Path of the rar archive to extract.
        dest_dir: Directory the archive is extracted into.

    Returns:
        ``(True, '')`` on success, or ``(False, exc)`` where ``exc`` is the
        exception raised while opening or extracting the archive.
    """
    trans_print('开始读取文件:', rar_file_path)
    dest_path = dest_dir
    trans_print("解压到:", dest_path)

    if not path.exists(dest_path):
        makedirs(dest_path)

    try:
        with rarfile.RarFile(rar_file_path) as rf:
            # Extract every member (files and directories) one by one.
            for member in rf.infolist():
                rf.extract(member, dest_path)
    except Exception as e:
        trans_print(traceback.format_exc())
        logger.exception(e)
        trans_print('不是rar文件:', rar_file_path)
        return False, e

    # Walk the extracted tree and handle nested archives. Uses trans_print
    # (not bare print) so the message goes through the same channel as the
    # rest of this module's output.
    trans_print('解压再次读取', dest_path)
    for root, _dirs, files in walk(dest_path):
        for file in files:
            file_path = path.join(root, file)
            if file_path.endswith('.rar'):
                unrar(file_path, get_desc_path(file_path))
                # The nested archive is removed once it has been processed.
                remove(file_path)
            elif file_path.endswith('.zip'):
                unzip(file_path, get_desc_path(file_path))
                remove(file_path)

    return True, ''
def get_desc_path(path):
    """Return *path* with the extension of its final component removed.

    Everything from the last ``.`` onwards is dropped, provided that dot
    belongs to the final path component. A path whose final component has
    no dot is returned unchanged, rather than losing its last character or
    being cut at a dot inside a directory name.

    Args:
        path: File name or path as a string; both ``/`` and ``\\`` are
            treated as component separators.

    Returns:
        The path without its trailing extension.
    """
    dot_index = path.rfind(".")
    last_separator = max(path.rfind("/"), path.rfind("\\"))
    # No dot at all (-1 <= -1) or the last dot sits in a directory name:
    # there is no extension to strip.
    if dot_index <= last_separator:
        return path
    return path[:dot_index]
|