import datetime import json import os.path from trans.ExecParam import ExecParam from service.import_data_service import get_exec_group, get_exec_data from trans.common.ReadAndSaveTmp import ReadAndSaveTmp from trans.common.UnzipAndRemove import UnzipAndRemove from utils.conf.read_conf import read_conf from utils.log.import_data_log import log_print class ImportParam(object): def __init__(self, data, env_conf, save_db=True): results = get_exec_group(data['id']) self.executor_history_id = data['id'] self.executor_history_name = data['name'] self.tmp_root = read_conf(env_conf, "tmp_base_path", "/tmp") self.save_db = save_db if results: result = results[0] self.process_group_name = result['name'] self.join_type = result['join_type'] self.process_group_id = data['process_group_id'] self.tmp_root = os.path.join(self.tmp_root, f'{self.executor_history_id}_{self.executor_history_name}') self.out_put_dir = data['out_put_dir'] self.execotor_list = list() for exec_data in json.loads(data['input_dirs']): self.execotor_list.append(ExecParam(exec_data, self.tmp_root)) else: log_print(f"通过ID: {data['id']}没有获取到执行器组") raise Exception(f"通过ID: {data['id']}没有获取到执行器组") def run(self): total_begin = datetime.datetime.now() log_print(f'开始执行{self.process_group_name}') process_count = len(self.execotor_list) for index, exec_data in enumerate(self.execotor_list): log_print(f'开始执行{exec_data.process_executor}') begin = datetime.datetime.now() # 移动解压文件 unzip_and_remove = UnzipAndRemove(self.executor_history_id, process_count, index, exec_data.path_param, self.save_db) unzip_and_remove.run() log_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - total_begin) # 读取文件 begin = datetime.datetime.now() read_and_save = ReadAndSaveTmp(self.executor_history_id, process_count, index, exec_data, self.out_put_dir if process_count == 1 else exec_data.path_param.get_process_tmp_path(), self.save_db) read_and_save.run() log_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - total_begin)