1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- import datetime
- import json
- import os.path
- from trans.ExecParam import ExecParam
- from service.import_data_service import get_exec_group, get_exec_data
- from trans.common.ReadAndSaveTmp import ReadAndSaveTmp
- from trans.common.UnzipAndRemove import UnzipAndRemove
- from utils.conf.read_conf import read_conf
- from utils.log.import_data_log import log_print
class ImportParam(object):
    """Settings for one executor-history record plus the driver that runs it.

    The constructor looks up the executor group for the record, derives a
    per-record temporary directory and builds one ``ExecParam`` per entry of
    the record's ``input_dirs``. ``run`` then processes those entries in
    order.
    """

    def __init__(self, data, env_conf, save_db=True):
        """Build the run parameters from an executor-history record.

        Args:
            data: Mapping read with the keys ``id``, ``name``,
                ``process_group_id``, ``out_put_dir`` and ``input_dirs``
                (``input_dirs`` is parsed with ``json.loads``).
            env_conf: Configuration object handed to ``read_conf`` to look up
                ``tmp_base_path`` (``"/tmp"`` is passed as the fallback).
            save_db: Flag forwarded unchanged to the per-step workers in
                ``run``.

        Raises:
            Exception: If ``get_exec_group`` returns nothing for ``data['id']``.
        """
        groups = get_exec_group(data['id'])

        self.executor_history_id = data['id']
        self.executor_history_name = data['name']
        self.tmp_root = read_conf(env_conf, "tmp_base_path", "/tmp")
        self.save_db = save_db

        # Guard clause: without an executor group there is nothing to run.
        if not groups:
            message = f"通过ID: {data['id']}没有获取到执行器组"
            log_print(message)
            raise Exception(message)

        # Only the first returned group is used.
        group = groups[0]
        self.process_group_name = group['name']
        self.join_type = group['join_type']
        self.process_group_id = data['process_group_id']

        # Give every history record its own sub-directory under the tmp base.
        record_dir = f'{self.executor_history_id}_{self.executor_history_name}'
        self.tmp_root = os.path.join(self.tmp_root, record_dir)
        self.out_put_dir = data['out_put_dir']

        # NOTE: the attribute name is misspelled, but it is part of the
        # object's public surface, so it is kept as-is.
        self.execotor_list = [
            ExecParam(entry, self.tmp_root)
            for entry in json.loads(data['input_dirs'])
        ]

    def run(self):
        """Run every executor in order: unzip/move first, then read and save.

        For each entry of ``execotor_list`` an ``UnzipAndRemove`` step is run,
        followed by a ``ReadAndSaveTmp`` step. Elapsed time per step and since
        the start of the whole run is logged after each step.
        """
        now = datetime.datetime.now
        total_begin = now()
        log_print(f'开始执行{self.process_group_name}')

        process_count = len(self.execotor_list)
        for index, exec_data in enumerate(self.execotor_list):
            log_print(f'开始执行{exec_data.process_executor}')

            # Move and unzip the files.
            begin = now()
            mover = UnzipAndRemove(
                self.executor_history_id,
                process_count,
                index,
                exec_data.path_param,
                self.save_db,
            )
            mover.run()
            log_print("解压移动文件结束：耗时：", now() - begin, "总耗时：",
                      now() - total_begin)

            # Read the files.
            begin = now()
            # A single executor writes straight to the final output directory;
            # otherwise each executor writes to its own process tmp path.
            if process_count == 1:
                target_dir = self.out_put_dir
            else:
                target_dir = exec_data.path_param.get_process_tmp_path()
            reader = ReadAndSaveTmp(
                self.executor_history_id,
                process_count,
                index,
                exec_data,
                target_dir,
                self.save_db,
            )
            reader.run()
            log_print("保存数据到临时文件结束，耗时：", now() - begin, "总耗时：",
                      now() - total_begin)
|