unzip.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/17
  3. # @Author : 魏志亮
  4. import os
  5. import traceback
  6. import zipfile
  7. from typing import Tuple, Optional
  8. import rarfile
  9. from utils.file.trans_methods import detect_file_encoding, create_file_path
  10. from utils.log.trans_log import debug, error
  11. def __support_gbk(zip_file: zipfile.ZipFile) -> zipfile.ZipFile:
  12. """
  13. 支持GBK编码的zip文件
  14. Args:
  15. zip_file: ZipFile对象
  16. Returns:
  17. 处理后的ZipFile对象
  18. """
  19. name_to_info = zip_file.NameToInfo
  20. # copy map first
  21. for name, info in name_to_info.copy().items():
  22. real_name = name.encode('cp437').decode('gbk')
  23. if real_name != name:
  24. info.filename = real_name
  25. del name_to_info[name]
  26. name_to_info[real_name] = info
  27. return zip_file
  28. def unzip(zip_filepath: str, dest_path: str) -> Tuple[bool, Optional[Exception]]:
  29. """
  30. 解压zip文件
  31. Args:
  32. zip_filepath: zip文件路径
  33. dest_path: 解压目标路径
  34. Returns:
  35. (是否成功, 错误信息)
  36. """
  37. # 解压zip文件
  38. is_success = True
  39. debug('开始读取文件:', zip_filepath)
  40. debug("解压到:", dest_path)
  41. # 确保目标路径存在
  42. create_file_path(dest_path)
  43. try:
  44. if detect_file_encoding(zip_filepath).startswith("gb"):
  45. try:
  46. with __support_gbk(zipfile.ZipFile(zip_filepath, 'r')) as zip_ref:
  47. zip_ref.extractall(dest_path)
  48. except Exception:
  49. with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
  50. zip_ref.extractall(dest_path)
  51. else:
  52. with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
  53. zip_ref.extractall(dest_path)
  54. except zipfile.BadZipFile as e:
  55. error(traceback.format_exc())
  56. is_success = False
  57. error('不是zip文件:', zip_filepath)
  58. return is_success, e
  59. except Exception as e:
  60. error(traceback.format_exc())
  61. is_success = False
  62. error('解压文件出错:', zip_filepath, str(e))
  63. return is_success, e
  64. # 遍历解压后的文件
  65. debug('解压再次读取', dest_path)
  66. if is_success:
  67. for root, dirs, files in os.walk(dest_path):
  68. for file in files:
  69. file_path = os.path.join(root, file)
  70. # 检查文件是否是zip文件
  71. if file_path.endswith('.zip'):
  72. if file_path.endswith('.csv.zip'):
  73. os.rename(file_path, file_path.replace(".csv.zip", ".csv.gz"))
  74. else:
  75. # 如果是,递归解压
  76. unzip(file_path, os.path.join(dest_path, get_desc_path(str(file))))
  77. # 删除已解压的zip文件
  78. os.remove(file_path)
  79. # 检查文件是否是rar文件
  80. elif file_path.endswith('.rar'):
  81. # 如果是,递归解压
  82. unrar(file_path, os.path.join(dest_path, get_desc_path(str(file))))
  83. # 删除已解压的rar文件
  84. os.remove(file_path)
  85. return is_success, None
  86. def unrar(rar_file_path: str, dest_dir: str) -> Tuple[bool, Optional[Exception]]:
  87. """
  88. 解压rar文件
  89. Args:
  90. rar_file_path: rar文件路径
  91. dest_dir: 解压目标目录
  92. Returns:
  93. (是否成功, 错误信息)
  94. """
  95. # 解压rar文件
  96. is_success = True
  97. debug('开始读取文件:', rar_file_path)
  98. dest_path = dest_dir
  99. debug("解压到:", dest_path)
  100. # 确保目标路径存在
  101. create_file_path(dest_path)
  102. try:
  103. # 打开RAR文件
  104. with rarfile.RarFile(rar_file_path) as rf:
  105. # 循环遍历RAR文件中的所有成员(文件和目录)
  106. for member in rf.infolist():
  107. # 解压文件到目标目录
  108. rf.extract(member, dest_path)
  109. except Exception as e:
  110. error(traceback.format_exc())
  111. is_success = False
  112. error('不是rar文件:', rar_file_path)
  113. return is_success, e
  114. # 遍历解压后的文件
  115. debug('解压再次读取', dest_path)
  116. if is_success:
  117. for root, dirs, files in os.walk(dest_path):
  118. for file in files:
  119. file_path = os.path.join(root, file)
  120. # 检查文件是否是rar文件
  121. if file_path.endswith('.rar'):
  122. # 如果是,递归解压
  123. unrar(file_path, get_desc_path(file_path))
  124. # 删除已解压的rar文件
  125. os.remove(file_path)
  126. elif file_path.endswith('.zip'):
  127. # 如果是,递归解压
  128. unzip(file_path, get_desc_path(file_path))
  129. # 删除已解压的zip文件
  130. os.remove(file_path)
  131. return is_success, None
  132. def get_desc_path(file_path: str) -> str:
  133. """
  134. 获取文件路径的描述路径(去除扩展名)
  135. Args:
  136. file_path: 文件路径
  137. Returns:
  138. 去除扩展名的路径
  139. """
  140. return file_path[0:file_path.rfind(".")]