# ImportParam.py

import json
import multiprocessing
import os.path
import shutil
from datetime import datetime

import pandas as pd

from service.import_data_service import get_exec_group, run_success, run_begin
from trans.ExecParam import ExecParam
from trans.common.ReadAndSaveTmp import ReadAndSaveTmp
from trans.common.UnzipAndRemove import UnzipAndRemove
from utils.conf.read_conf import read_conf
from utils.file.trans_methods import read_excel_files
from utils.log.import_data_log import log_print


class ImportParam(object):

    def __init__(self, data, env_conf, save_db=True):
        results = get_exec_group(data['id'])
        self.executor_history_id = data['id']
        self.executor_history_name = data['name']
        self.tmp_root = read_conf(env_conf, "tmp_base_path", "/tmp")
        self.save_db = save_db
        if results:
            result = results[0]
            self.process_group_name = result['name']
            self.join_type = result['join_type']
            self.join_type_strs = ([int(i) for i in result['join_type_str'].split(",")]
                                   if result['join_type_str'] else [])
            self.process_group_id = data['process_group_id']
            self.tmp_root = os.path.join(self.tmp_root, f'{self.executor_history_id}_{self.executor_history_name}')
            self.out_put_dir = data['out_put_dir']
            self.executor_list = [ExecParam(exec_data, self.tmp_root) for exec_data in json.loads(data['input_dirs'])]
        else:
            log_print(f"No executor group found for ID: {data['id']}")
            raise Exception(f"No executor group found for ID: {data['id']}")
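
    # Merge semantics (descriptive note): for a given output base_name,
    # file_dicts maps executor index -> staged temp file. With join_type 0 or 1
    # every executor's file is concatenated along that one axis; otherwise
    # join_type_strs[i] names the axis used to join file i with file i + 1.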
    def save_result_dir(self, base_name, file_dicts, index_cols, axis=0, join_type_strs=None):
        log_print(f"Starting executor merge: {base_name}")
        begin = datetime.now()
        join_type_strs = join_type_strs or []
        df = pd.DataFrame()
        if axis in [0, 1]:
            # Uniform join type: concatenate every executor's file along one axis.
            if index_cols:
                dfs = [pd.read_csv(file, encoding='utf-8', index_col=index_cols) for file in file_dicts.values()]
            else:
                dfs = [pd.read_csv(file, encoding='utf-8') for file in file_dicts.values()]
            df = pd.concat(dfs, axis=axis)
        elif len(set(join_type_strs)) == 1:
            # All pairwise join types agree, so one concat along that axis suffices.
            if index_cols:
                dfs = [pd.read_csv(file, encoding='utf-8', index_col=index_cols) for file in file_dicts.values()]
            else:
                dfs = [pd.read_csv(file, encoding='utf-8') for file in file_dicts.values()]
            df = pd.concat(dfs, axis=join_type_strs[0])
        else:
            # Mixed join types: join file i with file i + 1 along join_type_strs[i].
            index_col = index_cols if index_cols else None
            for join_type_index, join_type in enumerate(join_type_strs):
                df1 = pd.DataFrame()
                df2 = pd.DataFrame()
                if join_type_index in file_dicts:
                    df1 = pd.read_csv(file_dicts[join_type_index], encoding='utf-8', index_col=index_col)
                if join_type_index + 1 in file_dicts:
                    df2 = pd.read_csv(file_dicts[join_type_index + 1], encoding='utf-8', index_col=index_col)
                df = pd.concat([df, df1, df2], axis=join_type)
        if index_cols:
            df.reset_index(inplace=True)
            df.drop_duplicates(subset=index_cols, inplace=True)
            df.sort_values(by=index_cols, inplace=True)
        df.to_csv(os.path.join(self.out_put_dir, base_name), encoding='utf-8', index=False)
        log_print(f"Merge finished: {base_name}, elapsed:", datetime.now() - begin)
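
    # Note: save_result_dir is fanned out via multiprocessing.Pool.starmap in
    # run() below, so the instance (including its ExecParam objects) must be
    # picklable by the worker processes.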
    def run(self):
        total_begin = datetime.now()
        log_print(f"Starting {self.process_group_name}")
        log_print(f"Cleaning temp directory {self.tmp_root}")
        shutil.rmtree(self.tmp_root, ignore_errors=True)
        log_print(f"Temp directory {self.tmp_root} cleaned")
        run_begin(self.executor_history_id, self.save_db)
        process_count = len(self.executor_list)
        for index, exec_data in enumerate(self.executor_list):
            log_print(f"Starting {exec_data.process_executor}")
            begin = datetime.now()
            # Unzip and move the input files.
            unzip_and_remove = UnzipAndRemove(self.executor_history_id, process_count, index,
                                              exec_data.path_param, self.save_db)
            unzip_and_remove.run()
            log_print("Unzip/move finished, elapsed:", datetime.now() - begin,
                      "total elapsed:", datetime.now() - total_begin)
            # Read the files and stage them as temp files.
            begin = datetime.now()
            read_and_save = ReadAndSaveTmp(self.executor_history_id, process_count, index, exec_data, self.save_db)
            read_and_save.run()
            log_print(f"{exec_data.process_executor} saved data to temp files, elapsed:", datetime.now() - begin,
                      "total elapsed:", datetime.now() - total_begin)
        log_print(f"All {process_count} executors saved data to temp files, total elapsed:",
                  datetime.now() - total_begin)
        log_print("Starting executor merge")
        begin = datetime.now()
        if self.join_type in [0, 1]:
            # A uniform join type applies to every executor pair.
            self.join_type_strs = [self.join_type] * len(self.executor_list)
        # Group the staged temp files of all executors by base file name.
        file_map = dict()
        for index, exec_data in enumerate(self.executor_list):
            all_files = read_excel_files(exec_data.path_param.get_process_tmp_path())
            for file in all_files:
                base_name = os.path.basename(file)
                if base_name in file_map:
                    file_map[base_name][index] = file
                else:
                    file_map[base_name] = {index: file}
        # Merge each base_name group in parallel.
        with multiprocessing.Pool(6) as pool:
            pool.starmap(self.save_result_dir,
                         [(base_name, file_dicts, self.executor_list[0].index_cols, self.join_type,
                           self.join_type_strs) for base_name, file_dicts in file_map.items()])
        log_print("Merge finished, elapsed:", datetime.now() - begin, ", total elapsed:", datetime.now() - total_begin)
        run_success(self.executor_history_id)
        log_print(f"All {process_count} executors merged, total elapsed:", datetime.now() - total_begin)
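

# Minimal usage sketch (illustrative only): the `data` dict mirrors the keys
# read in __init__, but every value below is a placeholder, and `env_conf`
# would normally come from the project's own config loading. The actual run()
# call is left commented out because the ExecParam config schema inside
# 'input_dirs' is defined elsewhere.
if __name__ == '__main__':
    example_data = {
        'id': 1,                       # executor history id (placeholder)
        'name': 'example_run',         # executor history name (placeholder)
        'process_group_id': 10,        # placeholder
        'out_put_dir': '/data/output', # placeholder output directory
        'input_dirs': '[]',            # JSON-encoded list of ExecParam configs
    }
    env_conf = {}  # placeholder; normally parsed from the environment config
    # ImportParam(example_data, env_conf, save_db=False).run()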