unzip.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/17
  3. # @Author : 魏志亮
  4. import os
  5. import zipfile
  6. import rarfile
  7. from utils.log.trans_log import trans_print, logger
  8. def __support_gbk(zip_file: zipfile.ZipFile):
  9. name_to_info = zip_file.NameToInfo
  10. # copy map first
  11. for name, info in name_to_info.copy().items():
  12. real_name = name.encode('cp437').decode('gbk')
  13. if real_name != name:
  14. info.filename = real_name
  15. del name_to_info[name]
  16. name_to_info[real_name] = info
  17. return zip_file
  18. def unzip(zip_filepath, dest_path):
  19. # 解压zip文件
  20. is_success = True
  21. trans_print('开始读取文件:', zip_filepath)
  22. trans_print("解压到:", dest_path)
  23. try:
  24. with __support_gbk(zipfile.ZipFile(zip_filepath, 'r')) as zip_ref:
  25. zip_ref.extractall(dest_path)
  26. except zipfile.BadZipFile as e:
  27. logger.exception(e)
  28. is_success = False
  29. message = str(e)
  30. trans_print('不是zip文件:', zip_filepath)
  31. return is_success, e
  32. # 遍历解压后的文件
  33. dest_path = dest_path
  34. print('解压再次读取', dest_path)
  35. if is_success:
  36. for root, dirs, files in os.walk(dest_path):
  37. for file in files:
  38. file_path = os.path.join(root, file)
  39. # 检查文件是否是zip文件
  40. if file_path.endswith('.zip'):
  41. if file_path.endswith('.csv.zip'):
  42. os.rename(file_path, file_path.replace(".csv.zip", ".csv.gz"))
  43. else:
  44. # 如果是,递归解压
  45. unzip(file_path, dest_path + os.sep + get_desc_path(str(file)))
  46. # 删除已解压的zip文件(可选)
  47. os.remove(file_path)
  48. # 检查文件是否是zip文件
  49. if file_path.endswith('.rar'):
  50. # 如果是,递归解压
  51. unrar(file_path, dest_path + os.sep + get_desc_path(str(file)))
  52. # 删除已解压的zip文件(可选)
  53. os.remove(file_path)
  54. return is_success, ''
  55. def unrar(rar_file_path, dest_dir):
  56. # 检查目标目录是否存在,如果不存在则创建
  57. # 解压zip文件
  58. is_success = True
  59. trans_print('开始读取文件:', rar_file_path)
  60. dest_path = dest_dir
  61. trans_print("解压到:", dest_path)
  62. if not os.path.exists(dest_path):
  63. os.makedirs(dest_path)
  64. try:
  65. # 打开RAR文件
  66. with rarfile.RarFile(rar_file_path) as rf:
  67. # 循环遍历RAR文件中的所有成员(文件和目录)
  68. for member in rf.infolist():
  69. # 解压文件到目标目录
  70. rf.extract(member, dest_path)
  71. except Exception as e:
  72. logger.exception(e)
  73. is_success = False
  74. message = str(e)
  75. trans_print('不是rar文件:', rar_file_path)
  76. return is_success, e
  77. # 遍历解压后的文件
  78. print('解压再次读取', dest_path)
  79. if is_success:
  80. for root, dirs, files in os.walk(dest_path):
  81. for file in files:
  82. file_path = os.path.join(root, file)
  83. # 检查文件是否是zip文件
  84. if file_path.endswith('.rar'):
  85. # 如果是,递归解压
  86. unrar(file_path, get_desc_path(file_path))
  87. # 删除已解压的zip文件(可选)
  88. os.remove(file_path)
  89. if file_path.endswith('.zip'):
  90. # 如果是,递归解压
  91. unzip(file_path, get_desc_path(file_path))
  92. # 删除已解压的zip文件(可选)
  93. os.remove(file_path)
  94. return is_success, ''
  95. def get_desc_path(path):
  96. return path[0:path.rfind(".")]