ImportParam.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import datetime
  2. import json
  3. import os.path
  4. from trans.ExecParam import ExecParam
  5. from service.import_data_service import get_exec_group, get_exec_data
  6. from trans.common.ReadAndSaveTmp import ReadAndSaveTmp
  7. from trans.common.UnzipAndRemove import UnzipAndRemove
  8. from utils.conf.read_conf import read_conf
  9. from utils.log.import_data_log import log_print
  10. class ImportParam(object):
  def __init__(self, data, env_conf, save_db=True):
      """Collect everything needed to run one executor-history import.

      The executor group is looked up with ``get_exec_group(data['id'])``;
      only the first returned row is used.

      Args:
          data: Mapping describing the executor-history record. The keys
              read here are ``'id'``, ``'name'``, ``'process_group_id'``,
              ``'out_put_dir'`` and ``'input_dirs'`` (a JSON document that
              decodes to an iterable of per-executor entries).
          env_conf: Configuration handle forwarded to ``read_conf`` to
              resolve ``"tmp_base_path"``.
              NOTE(review): ``"/tmp"`` is presumably the fallback used when
              the key is absent -- confirm against ``read_conf``.
          save_db: Stored unchanged on the instance as ``self.save_db``.

      Raises:
          Exception: If ``get_exec_group`` returns nothing for
              ``data['id']``. The same message is logged before raising.
      """
      groups = get_exec_group(data['id'])

      self.executor_history_id = data['id']
      self.executor_history_name = data['name']
      base_tmp_dir = read_conf(env_conf, "tmp_base_path", "/tmp")
      self.tmp_root = base_tmp_dir
      self.save_db = save_db

      # Bail out early when no executor group matches this ID.
      if not groups:
          message = f"通过ID: {data['id']}没有获取到执行器组"
          log_print(message)
          raise Exception(message)

      group = groups[0]
      self.process_group_name = group['name']
      self.join_type = group['join_type']
      self.process_group_id = data['process_group_id']

      # Each run gets its own scratch directory: <base>/<history id>_<history name>.
      run_dir_name = f'{self.executor_history_id}_{self.executor_history_name}'
      self.tmp_root = os.path.join(base_tmp_dir, run_dir_name)
      self.out_put_dir = data['out_put_dir']

      # The attribute spelling "execotor_list" is kept as-is because run()
      # reads it under that name.
      self.execotor_list = [
          ExecParam(entry, self.tmp_root)
          for entry in json.loads(data['input_dirs'])
      ]
  30. def run(self):
  31. total_begin = datetime.datetime.now()
  32. log_print(f'开始执行{self.process_group_name}')
  33. process_count = len(self.execotor_list)
  34. for index, exec_data in enumerate(self.execotor_list):
  35. log_print(f'开始执行{exec_data.process_executor}')
  36. begin = datetime.datetime.now()
  37. # 移动解压文件
  38. unzip_and_remove = UnzipAndRemove(self.executor_history_id, process_count, index,
  39. exec_data.path_param, self.save_db)
  40. unzip_and_remove.run()
  41. log_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin, "总耗时:",
  42. datetime.datetime.now() - total_begin)
  43. # 读取文件
  44. begin = datetime.datetime.now()
  45. read_and_save = ReadAndSaveTmp(self.executor_history_id, process_count, index,
  46. exec_data,
  47. self.out_put_dir if process_count == 1 else exec_data.path_param.get_process_tmp_path(),
  48. self.save_db)
  49. read_and_save.run()
  50. log_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  51. datetime.datetime.now() - total_begin)