UnzipAndRemove.py 3.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import multiprocessing
  2. import os
  3. import traceback
  4. from typing import List, Optional
  5. from conf.constants import ParallelProcessing
  6. from etl.common.PathsAndTable import PathsAndTable
  7. from service.trans_conf_service import update_trans_transfer_progress
  8. from utils.file.trans_methods import read_files, read_excel_files, copy_to_new, split_array
  9. from utils.log.trans_log import info, error
  10. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  11. from utils.zip.unzip import unzip, unrar, get_desc_path
  12. class UnzipAndRemove(object):
  13. """解压缩并移动文件类"""
  14. def __init__(self, pathsAndTable: PathsAndTable, filter_types: Optional[List[str]] = None):
  15. """
  16. 初始化解压缩并移动文件类
  17. Args:
  18. pathsAndTable: 路径和表对象
  19. filter_types: 文件类型过滤器
  20. """
  21. self.pathsAndTable = pathsAndTable
  22. self.filter_types = filter_types
  23. def get_and_remove(self, file: str) -> None:
  24. """
  25. 解压缩或移动文件到临时路径
  26. Args:
  27. file: 文件路径
  28. """
  29. to_path = self.pathsAndTable.get_excel_tmp_path()
  30. file_lower = str(file).lower()
  31. if file_lower.endswith("zip"):
  32. if file_lower.endswith("csv.zip"):
  33. copy_to_new(file, file.replace(self.pathsAndTable.read_dir, to_path).replace("csv.zip", 'csv.gz'))
  34. else:
  35. desc_path = file.replace(self.pathsAndTable.read_dir, to_path)
  36. unzip(file, get_desc_path(desc_path))
  37. self.pathsAndTable.has_zip = True
  38. elif file_lower.endswith("rar"):
  39. desc_path = file.replace(self.pathsAndTable.read_dir, to_path)
  40. is_success, e = unrar(file, get_desc_path(desc_path))
  41. self.pathsAndTable.has_zip = True
  42. else:
  43. copy_to_new(file, file.replace(self.pathsAndTable.read_dir, to_path))
  44. def remove_file_to_tmp_path(self) -> List[str]:
  45. """
  46. 将文件移动到临时路径
  47. Returns:
  48. 处理后的文件列表
  49. """
  50. # 读取文件
  51. try:
  52. if os.path.isfile(self.pathsAndTable.read_dir):
  53. all_files = [self.pathsAndTable.read_dir]
  54. else:
  55. all_files = read_files(self.pathsAndTable.read_dir)
  56. # 最大取系统cpu的 三分之二
  57. split_count = get_available_cpu_count_with_percent(2 / 3)
  58. # 限制最大进程数
  59. split_count = min(split_count, ParallelProcessing.MAX_PROCESSES)
  60. all_arrays = split_array(all_files, split_count)
  61. for index, arr in enumerate(all_arrays):
  62. pool_count = min(split_count, len(arr))
  63. with multiprocessing.Pool(pool_count) as pool:
  64. pool.starmap(self.get_and_remove, [(i,) for i in arr])
  65. update_trans_transfer_progress(self.pathsAndTable.id,
  66. round(5 + 15 * (index + 1) / len(all_arrays), 2),
  67. self.pathsAndTable.save_db)
  68. all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
  69. info('读取文件数量:', len(all_files))
  70. except Exception as e:
  71. error(traceback.format_exc())
  72. message = "读取文件列表错误:" + self.pathsAndTable.read_dir + ",系统返回错误:" + str(e)
  73. raise ValueError(message)
  74. return all_files
  75. def run(self) -> None:
  76. """
  77. 运行解压缩和移动文件流程
  78. """
  79. self.remove_file_to_tmp_path()
  80. update_trans_transfer_progress(self.pathsAndTable.id, 20,
  81. self.pathsAndTable.save_db)