import multiprocessing
import os
import traceback

import pandas as pd

from service.import_data_service import update_transfer_progress
from trans.ExecParam import ExecParam
from utils.file.trans_methods import split_array, read_excel_files, read_file_to_df, find_header
from utils.log.import_data_log import log_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size, max_file_size_get_max_cpu_count


class Axis1DataImpl(object):
    """Horizontal (axis=1) merge: split source files into per-column CSVs grouped by the
    configured split columns, then concatenate the columns back together on the index columns."""

    def __init__(self, id, process_count, now_count, exec_param: ExecParam, save_db=True):
        self.id = id
        self.process_count = process_count
        self.now_count = now_count
        self.exec_param = exec_param
        self.save_db = save_db
        self.lock_map = dict()
        # for i in range(4):
        #     self.lock_map[i] = multiprocessing.Manager().Lock()
        self.lock = multiprocessing.Manager().Lock()
        self.field_dict = multiprocessing.Manager().dict()

    def get_lock(self, split_col_value, col_name):
        """Return (first_time, lock) for the given split value / column combination.

        first_time is True the first time the combination is seen, so the caller knows
        whether to create a new CSV (with header) or append to an existing one.
        """
        boolean_first_time = False
        if split_col_value:
            field_name = f'{split_col_value}_{col_name}'
        else:
            field_name = col_name

        exists_count = len(self.field_dict.keys())
        # if exists_count >= 4:
        #     return self.lock

        if field_name not in self.field_dict:
            boolean_first_time = True
            self.field_dict[field_name] = len(self.field_dict.keys()) + 1

        # return boolean_first_time, self.lock_map[self.field_dict[field_name]]
        return boolean_first_time, self.lock

    def read_and_save_file(self, file_path):
        """Read one source file, validate its columns, and write each value column to a
        per-split-value CSV under the merge temp directory."""
        if self.exec_param.has_header:
            header = find_header(file_path, self.exec_param.use_cols)
            if header is None:
                raise Exception(f"No column header found in file {os.path.basename(file_path)}")
        else:
            header = None

        df = read_file_to_df(file_path, header=header)
        col_map = {'file_name': 'file_name'}
        if 'sheet_name' in df.columns:
            col_map['sheet_name'] = 'sheet_name'

        for col in df.columns:
            if col in self.exec_param.use_cols and col not in ['file_name', 'sheet_name']:
                col_map[col] = self.exec_param.mapping_cols[col]

        df = df.rename(columns=col_map)

        # Raise an error if the data is missing any of the index columns
        for col in self.exec_param.index_cols:
            if col not in df.columns:
                log_print(f"{file_path} is missing index column {col}")
                raise Exception(f"{file_path} is missing index column {col}")

        if self.exec_param.split_cols:
            df['split_col'] = df[self.exec_param.split_cols].apply(
                lambda x: '_'.join([str(i).replace(' ', '_').replace(':', '_') for i in x.values]), axis=1)
        else:
            df['split_col'] = 'All'

        split_col = df['split_col'].unique()

        if len(split_col) >= 1000:
            log_print(f"{file_path} produces too many split files: 1000 or more")
            raise Exception(f"{file_path} produces too many split files: 1000 or more")

        general_fields = list(df.columns)
        general_fields.remove('split_col')
        general_fields.remove('file_name')
        if 'sheet_name' in general_fields:
            general_fields.remove('sheet_name')

        for col in self.exec_param.index_cols:
            general_fields.remove(col)

        for split_col_value in split_col:
            for col in general_fields:
                now_cols = [i for i in self.exec_param.index_cols]
                now_cols.append(col)

                now_df = df[df['split_col'] == split_col_value][now_cols]
                boolean_first_time, lock = self.get_lock(split_col_value, col)
                with lock:
                    path = os.path.join(self.exec_param.path_param.get_merge_tmp_path(), split_col_value)
                    os.makedirs(path, exist_ok=True)
                    if boolean_first_time:
                        now_df.to_csv(os.path.join(path, f'{col}.csv'), index=False, encoding='utf-8')
                    else:
                        now_df.to_csv(os.path.join(path, f'{col}.csv'), index=False, mode='a', header=False,
                                      encoding='utf-8')

    def read_merge_df_to_process(self, base_name):
        """Concatenate all per-column CSVs of one split value horizontally on the index
        columns and write the merged result to the process temp directory."""
        path = os.path.join(self.exec_param.path_param.get_merge_tmp_path(), base_name)
        all_files = os.listdir(path)
        dfs = [pd.read_csv(os.path.join(path, i), encoding='utf-8', index_col=self.exec_param.index_cols)
               for i in all_files]
        df = pd.concat(dfs, axis=1)
        df.reset_index(inplace=True)
        df.to_csv(os.path.join(self.exec_param.path_param.get_process_tmp_path(), base_name + '.csv'), index=False,
                  encoding='utf-8')

    def run(self):
        if len(self.exec_param.index_cols) == 0:
            log_print("Merging tables requires index columns to be configured")
            log_print(traceback.format_exc())
            raise Exception("Merging tables requires index columns to be configured")

        all_files = read_excel_files(self.exec_param.path_param.get_unzip_tmp_path())

        split_count = use_files_get_max_cpu_count(all_files)
        all_arrays = split_array(all_files, split_count)

        log_print("Start reading files for horizontal merge, total files:", len(all_files),
                  ", number of batches:", split_count)

        for index, now_array in enumerate(all_arrays):
            with multiprocessing.Pool(split_count) as pool:
                pool.map(self.read_and_save_file, now_array)
            update_transfer_progress(self.id, round(20 + 50 * (index + 1) / len(all_arrays)), self.process_count,
                                     self.now_count, self.save_db)

        all_dirs = os.listdir(self.exec_param.path_param.get_merge_tmp_path())
        dir_size = get_dir_size(os.path.join(self.exec_param.path_param.get_merge_tmp_path(), all_dirs[0]))
        pool_count = max_file_size_get_max_cpu_count(dir_size)
        pool_count = pool_count if pool_count <= len(all_files) else len(all_files)
        with multiprocessing.Pool(pool_count) as pool:
            pool.map(self.read_merge_df_to_process, all_dirs)
        update_transfer_progress(self.id, 80, self.process_count, self.now_count, self.save_db)
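

# ---------------------------------------------------------------------------
# Illustrative sketch only (not part of the pipeline above): a minimal,
# self-contained example of the merge idea Axis1DataImpl implements -- write
# each value column to its own CSV keyed by the index columns, then join the
# pieces back together with pd.concat(axis=1). The column names, sample data
# and temporary directory below are invented for demonstration purposes.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import tempfile

    index_cols = ['time']
    demo = pd.DataFrame({
        'time': ['2024-01-01 00:00', '2024-01-01 00:10'],
        'wind_speed': [5.2, 6.1],
        'power': [810.0, 920.5],
    })

    with tempfile.TemporaryDirectory() as tmp_dir:
        # Step 1: one CSV per value column, each carrying the index columns.
        for col in ['wind_speed', 'power']:
            demo[index_cols + [col]].to_csv(
                os.path.join(tmp_dir, f'{col}.csv'), index=False, encoding='utf-8')

        # Step 2: read them back indexed by the index columns and concat horizontally.
        parts = [pd.read_csv(os.path.join(tmp_dir, name), encoding='utf-8', index_col=index_cols)
                 for name in os.listdir(tmp_dir)]
        merged = pd.concat(parts, axis=1).reset_index()
        print(merged)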