unzip.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/17
  3. # @Author : 魏志亮
  4. import traceback
  5. import zipfile
  6. from os import *
  7. import rarfile
  8. from utils.file.trans_methods import detect_file_encoding
  9. from utils.log.trans_log import trans_print, logger
  10. def __support_gbk(zip_file: zipfile.ZipFile):
  11. name_to_info = zip_file.NameToInfo
  12. # copy map first
  13. for name, info in name_to_info.copy().items():
  14. real_name = name.encode('cp437').decode('gbk')
  15. if real_name != name:
  16. info.filename = real_name
  17. del name_to_info[name]
  18. name_to_info[real_name] = info
  19. return zip_file
  20. def unzip(zip_filepath, dest_path):
  21. # 解压zip文件
  22. is_success = True
  23. trans_print('开始读取文件:', zip_filepath)
  24. trans_print("解压到:", dest_path)
  25. try:
  26. if detect_file_encoding(zip_filepath).startswith("gb"):
  27. try:
  28. with __support_gbk(zipfile.ZipFile(zip_filepath, 'r')) as zip_ref:
  29. zip_ref.extractall(dest_path)
  30. except:
  31. with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
  32. zip_ref.extractall(dest_path)
  33. else:
  34. with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
  35. zip_ref.extractall(dest_path)
  36. except zipfile.BadZipFile as e:
  37. trans_print(traceback.format_exc())
  38. is_success = False
  39. trans_print('不是zip文件:', zip_filepath)
  40. return is_success, e
  41. # 遍历解压后的文件
  42. dest_path = dest_path
  43. trans_print('解压再次读取', dest_path)
  44. if is_success:
  45. for root, dirs, files in walk(dest_path):
  46. for file in files:
  47. file_path = path.join(root, file)
  48. # 检查文件是否是zip文件
  49. if file_path.endswith('.zip'):
  50. if file_path.endswith('.csv.zip'):
  51. rename(file_path, file_path.replace(".csv.zip", ".csv.gz"))
  52. else:
  53. # 如果是,递归解压
  54. unzip(file_path, dest_path + sep + get_desc_path(str(file)))
  55. # 删除已解压的zip文件(可选)
  56. remove(file_path)
  57. # 检查文件是否是zip文件
  58. if file_path.endswith('.rar'):
  59. # 如果是,递归解压
  60. unrar(file_path, dest_path + sep + get_desc_path(str(file)))
  61. # 删除已解压的zip文件(可选)
  62. remove(file_path)
  63. return is_success, ''
  64. def unrar(rar_file_path, dest_dir):
  65. # 检查目标目录是否存在,如果不存在则创建
  66. # 解压zip文件
  67. is_success = True
  68. trans_print('开始读取文件:', rar_file_path)
  69. dest_path = dest_dir
  70. trans_print("解压到:", dest_path)
  71. if not path.exists(dest_path):
  72. makedirs(dest_path)
  73. try:
  74. # 打开RAR文件
  75. with rarfile.RarFile(rar_file_path) as rf:
  76. # 循环遍历RAR文件中的所有成员(文件和目录)
  77. for member in rf.infolist():
  78. # 解压文件到目标目录
  79. rf.extract(member, dest_path)
  80. except Exception as e:
  81. trans_print(traceback.format_exc())
  82. logger.exception(e)
  83. is_success = False
  84. trans_print('不是rar文件:', rar_file_path)
  85. return is_success, e
  86. # 遍历解压后的文件
  87. print('解压再次读取', dest_path)
  88. if is_success:
  89. for root, dirs, files in walk(dest_path):
  90. for file in files:
  91. file_path = path.join(root, file)
  92. # 检查文件是否是zip文件
  93. if file_path.endswith('.rar'):
  94. # 如果是,递归解压
  95. unrar(file_path, get_desc_path(file_path))
  96. # 删除已解压的zip文件(可选)
  97. remove(file_path)
  98. if file_path.endswith('.zip'):
  99. # 如果是,递归解压
  100. unzip(file_path, get_desc_path(file_path))
  101. # 删除已解压的zip文件(可选)
  102. remove(file_path)
  103. return is_success, ''
  104. def get_desc_path(path):
  105. return path[0:path.rfind(".")]