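"""Merge-and-import pipeline: for each configured executor, unzips/moves its input
files and dumps them to temporary CSVs, then concatenates the same-named temporary
files from all executors into the configured output directory."""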
import json
import multiprocessing
import os.path
import shutil
from datetime import datetime

import pandas as pd

from service.import_data_service import get_exec_group, run_begin, run_success
from trans.ExecParam import ExecParam
from trans.common.ReadAndSaveTmp import ReadAndSaveTmp
from trans.common.UnzipAndRemove import UnzipAndRemove
from utils.conf.read_conf import read_conf
from utils.file.trans_methods import read_excel_files
from utils.log.import_data_log import log_print
class ImportParam(object):

    def __init__(self, data, env_conf, save_db=True):
        results = get_exec_group(data['id'])
        self.executor_history_id = data['id']
        self.executor_history_name = data['name']
        self.tmp_root = read_conf(env_conf, "tmp_base_path", "/tmp")
        self.save_db = save_db
        if results:
            result = results[0]
            self.process_group_name = result['name']
            self.join_type = result['join_type']
            # Per-pair join types, e.g. "0,1" -> [0, 1]; empty when not configured
            self.join_type_strs = [int(i) for i in result['join_type_str'].split(",")] if result['join_type_str'] else []
            self.process_group_id = data['process_group_id']
            self.tmp_root = os.path.join(self.tmp_root, f'{self.executor_history_id}_{self.executor_history_name}')
            self.out_put_dir = data['out_put_dir']
            self.execotor_list = list()
            for exec_data in json.loads(data['input_dirs']):
                self.execotor_list.append(ExecParam(exec_data, self.tmp_root))
        else:
            log_print(f"No executor group found for ID: {data['id']}")
            raise Exception(f"No executor group found for ID: {data['id']}")
    def save_result_dir(self, base_name, file_dicts, index_cols, axis=0, join_type_strs=None):
        log_print(f"Starting executor merge: {base_name}")
        begin = datetime.now()
        join_type_strs = join_type_strs or []
        df = pd.DataFrame()
        if axis in [0, 1]:
            # One global join type: read every executor's temp file and concat along that axis
            if index_cols:
                dfs = [pd.read_csv(file, encoding='utf-8', index_col=index_cols) for index, file in file_dicts.items()]
            else:
                dfs = [pd.read_csv(file, encoding='utf-8') for index, file in file_dicts.items()]
            df = pd.concat(dfs, axis=axis)
        else:
            if len(self.execotor_list) == 1:
                # Only one executor: nothing to join pairwise, just concatenate its files
                if index_cols:
                    dfs = [pd.read_csv(file, encoding='utf-8', index_col=index_cols) for index, file in file_dicts.items()]
                else:
                    dfs = [pd.read_csv(file, encoding='utf-8') for index, file in file_dicts.items()]
                df = pd.concat(dfs, axis=0)
            else:
                # Mixed join types: join consecutive executors' files pairwise,
                # using the configured per-pair join type as the concat axis
                if index_cols:
                    for join_type_index, join_type in enumerate(join_type_strs):
                        df1 = pd.DataFrame()
                        df2 = pd.DataFrame()
                        if join_type_index in file_dicts.keys():
                            df1 = pd.read_csv(file_dicts[join_type_index], encoding='utf-8', index_col=index_cols)
                        if join_type_index + 1 in file_dicts.keys():
                            df2 = pd.read_csv(file_dicts[join_type_index + 1], encoding='utf-8', index_col=index_cols)
                        df = pd.concat([df, df1, df2], axis=join_type)
                else:
                    for join_type_index, join_type in enumerate(join_type_strs):
                        df1 = pd.DataFrame()
                        df2 = pd.DataFrame()
                        if join_type_index in file_dicts.keys():
                            df1 = pd.read_csv(file_dicts[join_type_index], encoding='utf-8')
                        if join_type_index + 1 in file_dicts.keys():
                            df2 = pd.read_csv(file_dicts[join_type_index + 1], encoding='utf-8')
                        df = pd.concat([df, df1, df2], axis=join_type)
        if index_cols:
            df.reset_index(inplace=True)
            df.drop_duplicates(subset=index_cols, inplace=True)
            df.sort_values(by=index_cols, inplace=True)
        df.to_csv(os.path.join(self.out_put_dir, base_name), encoding='utf-8', index=False)
        log_print(f"Merge finished: {base_name}, elapsed:", datetime.now() - begin)
    def run(self):
        total_begin = datetime.now()
        log_print(f'Starting execution of {self.process_group_name}')
        log_print(f'Cleaning temporary directory {self.tmp_root}')
        shutil.rmtree(self.tmp_root, ignore_errors=True)
        log_print(f'Temporary directory {self.tmp_root} cleaned')
        run_begin(self.executor_history_id, self.save_db)
        process_count = len(self.execotor_list)
        for index, exec_data in enumerate(self.execotor_list):
            log_print(f'Starting executor {exec_data.process_executor}')
            begin = datetime.now()
            # Unzip and move the input files into the executor's temp path
            unzip_and_remove = UnzipAndRemove(self.executor_history_id, process_count, index,
                                              exec_data.path_param, self.save_db)
            unzip_and_remove.run()
            log_print("Unzip/move finished, elapsed:", datetime.now() - begin, "total elapsed:", datetime.now() - total_begin)
            # Read the files and save them to temporary files
            begin = datetime.now()
            read_and_save = ReadAndSaveTmp(self.executor_history_id, process_count, index, exec_data, self.save_db)
            read_and_save.run()
            log_print(f"{exec_data.process_executor} finished saving data to temporary files, elapsed:", datetime.now() - begin,
                      "total elapsed:", datetime.now() - total_begin)
        log_print(f"All {process_count} executors finished saving data to temporary files, total elapsed:", datetime.now() - total_begin)
        log_print("Starting executor merge")
        begin = datetime.now()
        if self.join_type in [0, 1]:
            # A single global join type applies to every executor
            self.join_type_strs = [self.join_type] * len(self.execotor_list)
        # Group temp files by base name: {base_name: {executor_index: file_path}}
        file_map = dict()
        for index, exec_data in enumerate(self.execotor_list):
            all_files = read_excel_files(exec_data.path_param.get_process_tmp_path())
            for file in all_files:
                base_name = os.path.basename(file)
                if base_name in file_map:
                    file_map[base_name][index] = file
                else:
                    file_map[base_name] = {index: file}
        # Merge each group of same-named files in parallel
        with multiprocessing.Pool(6) as pool:
            pool.starmap(self.save_result_dir,
                         [(base_name, file_dicts, self.execotor_list[0].index_cols, self.join_type,
                           self.join_type_strs) for base_name, file_dicts in file_map.items()])
        log_print("Merge finished, elapsed:", datetime.now() - begin, "total elapsed:", datetime.now() - total_begin)
        run_success(self.executor_history_id)
        log_print(f"All {process_count} executors merged, total elapsed:", datetime.now() - total_begin)