UnzipAndRemove.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. import multiprocessing
  2. import os
  3. import traceback
  4. from service.import_data_service import update_transfer_progress
  5. from trans.common.PathParam import PathParam
  6. from utils.file.trans_methods import copy_to_new, read_files, split_array, read_excel_files
  7. from utils.log.import_data_log import log_print
  8. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  9. from utils.zip.unzip import unzip, unrar, get_desc_path
  class UnzipAndRemove(object):
      """Stage raw input files into the unzip temp directory.

      Every file found under ``path_param.read_dir`` is either extracted
      (``*zip`` / ``*rar`` archives) or copied to the path obtained by swapping
      ``read_dir`` for ``path_param.get_unzip_tmp_path()``. The work is spread
      over a ``multiprocessing.Pool`` and progress is reported through
      ``update_transfer_progress``.
      """

      def __init__(self, id, process_count, now_count, path_param: PathParam, save_db=True):
          """Store the task context.

          Args:
              id: task identifier forwarded to ``update_transfer_progress``
                  (shadows the builtin, kept for caller compatibility).
              process_count: forwarded unchanged to ``update_transfer_progress``.
              now_count: forwarded unchanged to ``update_transfer_progress``.
              path_param: supplies ``read_dir`` and ``get_unzip_tmp_path()``.
              save_db: forwarded to ``update_transfer_progress``; presumably
                  controls whether progress is persisted (TODO confirm).
          """
          self.id = id
          self.process_count = process_count
          self.now_count = now_count
          self.path_param = path_param
          self.save_db = save_db

      def get_and_remove(self, file):
          """Extract or copy a single input file into the unzip temp directory.

          The destination is ``file`` with its leading ``read_dir`` replaced by
          the temp path:

          * ``*csv.zip`` is passed to ``copy_to_new`` under a ``*csv.gz`` name;
          * any other ``*zip`` is extracted with ``unzip``;
          * ``*rar`` is extracted with ``unrar``;
          * everything else is passed to ``copy_to_new`` unchanged.

          Extraction failures are logged and skipped (best effort), so one
          broken archive does not abort the whole batch. This method is the
          worker function mapped over the pool in ``remove_file_to_tmp_path``.
          """
          file_path = str(file)
          to_path = self.path_param.get_unzip_tmp_path()
          # Swap only the first (leading) occurrence of read_dir; replacing every
          # occurrence would corrupt paths in which that string repeats deeper down.
          dest_path = file_path.replace(self.path_param.read_dir, to_path, 1)

          if file_path.endswith("csv.zip"):
              # NOTE(review): the archive is copied, not extracted, and only renamed
              # to .csv.gz -- presumably these files are really gzip data; confirm.
              copy_to_new(file_path, dest_path.replace("csv.zip", 'csv.gz'))
          elif file_path.endswith("zip"):
              is_success, e = unzip(file_path, get_desc_path(dest_path))
              if not is_success:
                  # Best effort: record the failure instead of silently ignoring it.
                  log_print('解压失败:', file_path, e)
          elif file_path.endswith("rar"):
              is_success, e = unrar(file_path, get_desc_path(dest_path))
              if not is_success:
                  # traceback.format_exc() is empty outside an except block, so log
                  # the error object returned by unrar instead.
                  log_print('解压失败:', file_path, e)
          else:
              copy_to_new(file_path, dest_path)

      def remove_file_to_tmp_path(self):
          """Stage every input file in the unzip temp directory and list the result.

          ``read_dir`` may be a single file or a directory (expanded with
          ``read_files``). The file list is split into chunks by ``split_array``;
          each chunk is processed on a shared process pool, and after each chunk
          progress ``round(5 + 15 * done_chunks / total_chunks)`` is reported,
          reaching 20 after the last chunk.

          Returns:
              Whatever ``read_excel_files`` returns for the unzip temp path.

          Raises:
              ValueError: on any failure; the message names ``read_dir`` and the
                  original exception is chained as ``__cause__``.
          """
          try:
              if os.path.isfile(self.path_param.read_dir):
                  all_files = [self.path_param.read_dir]
              else:
                  all_files = read_files(self.path_param.read_dir)

              # Request half of the available CPUs (argument 1/2); the previous
              # comment claimed two thirds, which did not match the code.
              split_count = get_available_cpu_count_with_percent(1 / 2)
              all_arrays = split_array(all_files, split_count)

              if all_arrays:
                  # No more workers than files, but at least one:
                  # multiprocessing.Pool(0) raises ValueError.
                  pool_count = max(1, min(split_count, len(all_files)))
                  # One pool for all chunks instead of re-spawning workers per chunk.
                  with multiprocessing.Pool(pool_count) as pool:
                      for index, arr in enumerate(all_arrays):
                          pool.map(self.get_and_remove, arr)
                          update_transfer_progress(self.id, round(5 + 15 * (index + 1) / len(all_arrays)),
                                                   self.process_count, self.now_count, self.save_db)

              all_files = read_excel_files(self.path_param.get_unzip_tmp_path())
              log_print('读取文件数量:', len(all_files))
          except Exception as e:
              log_print(traceback.format_exc())
              message = "读取文件列表错误:" + self.path_param.read_dir + ",系统返回错误:" + str(e)
              raise ValueError(message) from e
          return all_files

      def run(self):
          """Stage all input files, then report progress 20."""
          self.remove_file_to_tmp_path()
          update_transfer_progress(self.id, 20, self.process_count, self.now_count, self.save_db)
  if __name__ == '__main__':
      # Manual smoke run against local sample paths; save_db=False is passed
      # through to every progress update.
      demo_paths = PathParam(r'd://data//test_import', 'ceshi001', r'd://data//data.zip')
      job = UnzipAndRemove(id=1, process_count=2, now_count=1, path_param=demo_paths, save_db=False)
      job.run()