12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- import multiprocessing
- import traceback
- from os import *
- from etl.common.PathsAndTable import PathsAndTable
- from service.trans_conf_service import update_trans_transfer_progress
- from utils.file.trans_methods import read_files, read_excel_files, copy_to_new, split_array
- from utils.log.trans_log import trans_print
- from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
- from utils.zip.unzip import unzip, unrar, get_desc_path
class UnzipAndRemove(object):
    """Stage the job's input files into the directory from ``get_excel_tmp_path()``.

    Every file under ``pathsAndTable.read_dir`` (or ``read_dir`` itself when it
    is a single file) is extracted (``zip`` / ``rar``), copied under a renamed
    ``csv.gz`` name (``csv.zip``), or copied unchanged. The per-file work is
    fanned out over ``multiprocessing.Pool`` workers. Transfer progress is
    reported through ``update_trans_transfer_progress``.
    """

    def __init__(self, pathsAndTable: PathsAndTable, filter_types=None):
        """
        :param pathsAndTable: paths, job id and ``save_db`` setting for this transfer.
        :param filter_types: stored on the instance; not read by any method of this class.
        """
        self.pathsAndTable = pathsAndTable
        self.filter_types = filter_types

    def get_and_remove(self, file):
        """Place one source file into the temporary directory.

        The destination is ``file`` with ``pathsAndTable.read_dir`` replaced by
        ``pathsAndTable.get_excel_tmp_path()``.

        :param file: path (str) of a file located under ``pathsAndTable.read_dir``.
        :return: True when the file was treated as an archive (zip/rar
            extraction, which also sets ``pathsAndTable.has_zip``); False otherwise.
            The return value exists so that callers running this method in worker
            processes can propagate ``has_zip`` back to the parent process.
        """
        to_path = self.pathsAndTable.get_excel_tmp_path()
        read_dir = self.pathsAndTable.read_dir
        file_name = str(file)

        if file_name.endswith("csv.zip"):
            # Not extracted: copied under a ".csv.gz" name instead.
            # NOTE(review): assumes downstream readers accept this payload under
            # the .gz extension — confirm.
            copy_to_new(file, file.replace(read_dir, to_path).replace("csv.zip", 'csv.gz'))
            return False

        if file_name.endswith("zip"):
            desc_path = file.replace(read_dir, to_path)
            unzip(file, get_desc_path(desc_path))
            self.pathsAndTable.has_zip = True
            return True

        if file_name.endswith("rar"):
            desc_path = file.replace(read_dir, to_path)
            is_success, e = unrar(file, get_desc_path(desc_path))
            self.pathsAndTable.has_zip = True
            if not is_success:
                # No exception is active at this point, so traceback.format_exc()
                # would only log "NoneType: None". Log the error unrar returned.
                # Failure stays best-effort (not raised), as before.
                trans_print("解压rar文件失败:", file, str(e))
            return True

        copy_to_new(file, file.replace(read_dir, to_path))
        return False

    def remove_file_to_tmp_path(self):
        """Stage every input file into the temporary directory in parallel.

        Files are split with ``split_array`` into chunks. Each chunk is processed
        by a process pool sized from at most two thirds of the available CPUs.
        Progress is reported after each chunk, rising from 5 to 20.

        :return: the result of ``read_excel_files`` on the temporary directory
            after staging.
        :raises ValueError: wrapping any error raised while listing or staging files.
        """
        try:
            # read_dir may point at a single file instead of a directory.
            if path.isfile(self.pathsAndTable.read_dir):
                all_files = [self.pathsAndTable.read_dir]
            else:
                all_files = read_files(self.pathsAndTable.read_dir)

            # Use at most two thirds of the system's CPUs.
            split_count = get_available_cpu_count_with_percent(2 / 3)
            all_arrays = split_array(all_files, split_count)
            for index, arr in enumerate(all_arrays):
                # multiprocessing.Pool rejects a process count of 0 (empty chunk).
                pool_count = max(1, min(split_count, len(arr)))
                with multiprocessing.Pool(pool_count) as pool:
                    archive_flags = pool.map(self.get_and_remove, arr)
                # get_and_remove runs in worker processes. Its has_zip assignment
                # only touches the worker's pickled copy of pathsAndTable, so
                # propagate the flag back into this process.
                if any(archive_flags):
                    self.pathsAndTable.has_zip = True
                update_trans_transfer_progress(self.pathsAndTable.id,
                                               round(5 + 15 * (index + 1) / len(all_arrays), 2),
                                               self.pathsAndTable.save_db)
            all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
            trans_print('读取文件数量:', len(all_files))
        except Exception as e:
            trans_print(traceback.format_exc())
            # str() guards against read_dir not being a str, which would raise a
            # TypeError here and mask the original error.
            message = "读取文件列表错误:" + str(self.pathsAndTable.read_dir) + ",系统返回错误:" + str(e)
            raise ValueError(message) from e
        return all_files

    def run(self):
        """Stage all input files, then mark transfer progress as 20."""
        self.remove_file_to_tmp_path()
        update_trans_transfer_progress(self.pathsAndTable.id, 20,
                                       self.pathsAndTable.save_db)
|