UnzipAndRemove.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import multiprocessing
  2. import os
  3. import traceback
  4. import datetime
  5. from etl.base.PathsAndTable import PathsAndTable
  6. from service.plt_service import update_trans_transfer_progress
  7. from utils.file.trans_methods import read_files, read_excel_files, copy_to_new, split_array
  8. from utils.log.trans_log import trans_print
  9. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  10. from utils.zip.unzip import unzip, unrar, get_desc_path
  11. class UnzipAndRemove(object):
  12. def __init__(self, pathsAndTable: PathsAndTable):
  13. self.pathsAndTable = pathsAndTable
  14. def get_and_remove(self, file):
  15. to_path = self.pathsAndTable.get_excel_tmp_path()
  16. if str(file).endswith("zip"):
  17. if str(file).endswith("csv.zip"):
  18. copy_to_new(file, file.replace(self.pathsAndTable.read_path, to_path).replace("csv.zip", 'csv.gz'))
  19. else:
  20. desc_path = file.replace(self.pathsAndTable.read_path, to_path)
  21. is_success, e = unzip(file, get_desc_path(desc_path))
  22. self.pathsAndTable.has_zip = True
  23. if not is_success:
  24. # raise e
  25. pass
  26. elif str(file).endswith("rar"):
  27. desc_path = file.replace(self.pathsAndTable.read_path, to_path)
  28. is_success, e = unrar(file, get_desc_path(desc_path))
  29. self.pathsAndTable.has_zip = True
  30. if not is_success:
  31. trans_print(traceback.format_exc())
  32. pass
  33. else:
  34. copy_to_new(file, file.replace(self.pathsAndTable.read_path, to_path))
  35. def remove_file_to_tmp_path(self):
  36. # 读取文件
  37. try:
  38. if os.path.isfile(self.pathsAndTable.read_path):
  39. all_files = [self.pathsAndTable.read_path]
  40. else:
  41. all_files = read_files(self.pathsAndTable.read_path)
  42. # 最大取系统cpu的 三分之二
  43. split_count = get_available_cpu_count_with_percent(1/2)
  44. all_arrays = split_array(all_files, split_count)
  45. for index, arr in enumerate(all_arrays):
  46. with multiprocessing.Pool(self.pathsAndTable.multi_pool_count) as pool:
  47. pool.starmap(self.get_and_remove, [(i,) for i in arr])
  48. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
  49. round(5 + 15 * (index + 1) / len(all_arrays), 2),
  50. self.pathsAndTable.save_db)
  51. all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
  52. trans_print('读取文件数量:', len(all_files))
  53. except Exception as e:
  54. trans_print(traceback.format_exc())
  55. message = "读取文件列表错误:" + self.pathsAndTable.read_path + ",系统返回错误:" + str(e)
  56. raise ValueError(message)
  57. return all_files
  58. def run(self):
  59. trans_print("开始解压移动文件")
  60. begin = datetime.datetime.now()
  61. self.remove_file_to_tmp_path()
  62. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 20,
  63. self.pathsAndTable.save_db)
  64. trans_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin)