| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- # -*- coding: utf-8 -*-
- # @Time : 2024/5/17
- # @Author : 魏志亮
- import os
- import traceback
- import zipfile
- from typing import Tuple, Optional
- import rarfile
- from utils.file.trans_methods import detect_file_encoding, create_file_path
- from utils.log.trans_log import debug, error
def __support_gbk(zip_file: zipfile.ZipFile) -> zipfile.ZipFile:
    """Rewrite GBK-encoded member names of an opened zip archive in place.

    ``zipfile`` decodes member names as cp437 unless the entry carries the
    UTF-8 flag. For archives whose names were really written as GBK bytes this
    yields mojibake, so each such name is re-encoded to its raw bytes and
    decoded as GBK. Both ``ZipInfo.filename`` and the archive's
    ``NameToInfo`` lookup table are updated.

    Entries flagged as UTF-8 are already decoded correctly and are left
    untouched, as are names that cannot be round-tripped through cp437/GBK;
    one such entry no longer aborts the fix-up of the remaining names.

    Args:
        zip_file: An opened ``ZipFile`` object.

    Returns:
        The same ``ZipFile`` object, with corrected member names.
    """
    utf8_name_flag = 0x800  # general-purpose bit 11: name is stored as UTF-8
    name_to_info = zip_file.NameToInfo
    # Iterate over a snapshot because the mapping is mutated inside the loop.
    for name, info in list(name_to_info.items()):
        if info.flag_bits & utf8_name_flag:
            continue
        try:
            real_name = name.encode('cp437').decode('gbk')
        except (UnicodeEncodeError, UnicodeDecodeError):
            # Not a cp437-decoded GBK name; keep it as it is.
            continue
        if real_name != name:
            info.filename = real_name
            del name_to_info[name]
            name_to_info[real_name] = info
    return zip_file
def unzip(zip_filepath: str, dest_path: str) -> Tuple[bool, Optional[Exception]]:
    """Extract a zip archive and unpack the archives nested inside it.

    After the top-level extraction, ``dest_path`` is walked:

    * files ending in ``.csv.zip`` are renamed to ``.csv.gz`` (not extracted);
    * other ``.zip`` files are extracted recursively with ``unzip``;
    * ``.rar`` files are extracted with ``unrar``.

    A nested archive is deleted only after it was extracted successfully, so
    a failed nested extraction no longer loses the archive.

    Args:
        zip_filepath: Path of the zip file to extract.
        dest_path: Directory to extract into; it is created through
            ``create_file_path`` before extraction.

    Returns:
        ``(True, None)`` when the top-level archive was extracted, otherwise
        ``(False, exception)`` with the exception that stopped extraction.
        Failures of nested archives are logged by the nested call and do not
        change this result.
    """
    debug('开始读取文件:', zip_filepath)
    debug("解压到:", dest_path)
    # Make sure the destination exists before extracting.
    create_file_path(dest_path)
    try:
        # Guard against a detector that yields None for undetectable input.
        encoding = detect_file_encoding(zip_filepath) or ""
        if encoding.startswith("gb"):
            try:
                # Open inside ``with`` first so the handle is closed even if
                # the name fix-up raises.
                with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
                    __support_gbk(zip_ref)
                    zip_ref.extractall(dest_path)
            except Exception:
                # Best effort: fall back to a plain extraction.
                with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
                    zip_ref.extractall(dest_path)
        else:
            with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
                zip_ref.extractall(dest_path)
    except zipfile.BadZipFile as e:
        error(traceback.format_exc())
        error('不是zip文件:', zip_filepath)
        return False, e
    except Exception as e:
        error(traceback.format_exc())
        error('解压文件出错:', zip_filepath, str(e))
        return False, e

    # Walk the extracted tree and handle nested archives.
    debug('解压再次读取', dest_path)
    csv_zip_suffix = '.csv.zip'
    for root, _dirs, files in os.walk(dest_path):
        for file in files:
            file_path = os.path.join(root, file)
            if file_path.endswith(csv_zip_suffix):
                # Swap only the trailing suffix; str.replace would also
                # rewrite matching directory names earlier in the path.
                os.rename(file_path, file_path[:-len(csv_zip_suffix)] + '.csv.gz')
                continue
            if file_path.endswith('.zip'):
                extract_nested = unzip
            elif file_path.endswith('.rar'):
                extract_nested = unrar
            else:
                continue
            # NOTE(review): the nested target is built from dest_path, not
            # from root, so archives found in sub-directories are unpacked
            # directly under dest_path -- confirm this is intended.
            nested_dest = os.path.join(dest_path, get_desc_path(str(file)))
            nested_ok, _ = extract_nested(file_path, nested_dest)
            if nested_ok:
                # Remove the archive only once its content is safely on disk.
                os.remove(file_path)
    return True, None
def unrar(rar_file_path: str, dest_dir: str) -> Tuple[bool, Optional[Exception]]:
    """Extract a rar archive and unpack the archives nested inside it.

    After the top-level extraction, ``dest_dir`` is walked; nested ``.rar``
    files are extracted with ``unrar`` and nested ``.zip`` files with
    ``unzip``, each into a directory named after the archive path without its
    extension. A nested archive is deleted only after it was extracted
    successfully, so a failed nested extraction no longer loses the archive.

    Args:
        rar_file_path: Path of the rar file to extract.
        dest_dir: Directory to extract into; it is created through
            ``create_file_path`` before extraction.

    Returns:
        ``(True, None)`` when the top-level archive was extracted, otherwise
        ``(False, exception)`` with the exception that stopped extraction.
        Failures of nested archives are logged by the nested call and do not
        change this result.
    """
    debug('开始读取文件:', rar_file_path)
    debug("解压到:", dest_dir)
    # Make sure the destination exists before extracting.
    create_file_path(dest_dir)
    try:
        with rarfile.RarFile(rar_file_path) as rf:
            # Extract every member (files and directories) in one call.
            rf.extractall(dest_dir)
    except Exception as e:
        error(traceback.format_exc())
        error('不是rar文件:', rar_file_path)
        return False, e

    # Walk the extracted tree and handle nested archives.
    debug('解压再次读取', dest_dir)
    for root, _dirs, files in os.walk(dest_dir):
        for file in files:
            file_path = os.path.join(root, file)
            if file_path.endswith('.rar'):
                extract_nested = unrar
            elif file_path.endswith('.zip'):
                extract_nested = unzip
            else:
                continue
            nested_ok, _ = extract_nested(file_path, get_desc_path(file_path))
            if nested_ok:
                # Remove the archive only once its content is safely on disk.
                os.remove(file_path)
    return True, None
def get_desc_path(file_path: str) -> str:
    """Return ``file_path`` with its last extension removed.

    Only a dot in the final path component counts as an extension separator.
    A path whose last component has no dot is returned unchanged, instead of
    losing its last character or being cut at a dot in a directory name.

    Args:
        file_path: File path or bare file name.

    Returns:
        The path without the trailing ``.ext`` part.
    """
    dot_index = file_path.rfind(".")
    if dot_index == -1:
        return file_path
    # Position of the last path separator (either flavour on Windows).
    sep_index = file_path.rfind(os.sep)
    if os.altsep:
        sep_index = max(sep_index, file_path.rfind(os.altsep))
    if dot_index < sep_index:
        # The only dots belong to directory names, not to the file name.
        return file_path
    return file_path[:dot_index]
|