import multiprocessing
import os

from trans.ExecParam import ExecParam
from utils.file.trans_methods import read_excel_files, split_array, find_header, read_file_to_df
from utils.log.import_data_log import log_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count


class Axis0DataImpl(object):

    def __init__(self, id, process_count, now_count, exec_param: ExecParam, save_db=True):
        self.id = id
        self.process_count = process_count
        self.now_count = now_count
        self.exec_param = exec_param
        self.save_db = save_db
        # One Manager serves both shared objects; calling multiprocessing.Manager()
        # twice would spawn two manager server processes for no benefit.
        # (An earlier design kept a map of 1000 per-field locks; it was dropped
        # in favor of a single global lock.)
        manager = multiprocessing.Manager()
        self.lock = manager.Lock()
        self.field_dict = manager.dict()

    def get_lock(self, split_col_value, col_name):
        # Returns (boolean_first_time, lock). boolean_first_time is True the
        # first time this (split value, column) pair is seen, so the caller
        # knows whether to write the CSV header or append without one.
        if split_col_value:
            field_name = f'{split_col_value}_{col_name}'
        else:
            field_name = col_name
        # Guard the check-and-set with the shared lock so two workers cannot
        # both observe the same pair as "new".
        with self.lock:
            boolean_first_time = field_name not in self.field_dict
            if boolean_first_time:
                self.field_dict[field_name] = len(self.field_dict.keys()) + 1
        # All writers share the single global lock.
        return boolean_first_time, self.lock

    def read_and_save_file(self, file_path):
        if self.exec_param.has_header:
            header = find_header(file_path, self.exec_param.use_cols)
            if header is None:
                raise Exception(f"No header row found in file {os.path.basename(file_path)}")
        else:
            header = None
        df = read_file_to_df(file_path, header=header, use_cols=self.exec_param.use_cols)
        # Rename columns (pass the mapping via columns=; a bare mapper= without
        # axis=1 would rename the index instead).
        df.rename(columns=self.exec_param.mapping_cols, errors='ignore', inplace=True)
        if self.exec_param.split_cols:
            df['split_col'] = df[self.exec_param.split_cols].apply(
                lambda x: '_'.join([str(i).replace(' ', '_').replace(':', '_') for i in x.values]), axis=1)
        else:
            df['split_col'] = 'All'
        split_col = df['split_col'].unique()
        if len(split_col) >= 1000:
            message = f"{file_path} yields too many split files (1000 or more)"
            log_print(message)
            raise Exception(message)
        general_fields = list(df.columns)
        general_fields.remove('split_col')
        general_fields.remove('file_name')
        if 'sheet_name' in general_fields:
            general_fields.remove('sheet_name')
        for col in self.exec_param.index_cols:
            general_fields.remove(col)
        for split_col_value in split_col:
            for col in general_fields:
                # Copy index_cols: appending to it directly would mutate the
                # shared list and grow it on every iteration.
                now_cols = list(self.exec_param.index_cols)
                now_cols.append(col)
                now_df = df[df['split_col'] == split_col_value][now_cols]
                boolean_first_time, lock = self.get_lock(split_col_value, col)
                with lock:
                    path = self.exec_param.path_param.get_process_tmp_path()
                    os.makedirs(path, exist_ok=True)
                    csv_path = os.path.join(path, f'{split_col_value}.csv')
                    if boolean_first_time:
                        now_df.to_csv(csv_path, index=False, encoding='utf-8')
                    else:
                        # Append without repeating the header.
                        now_df.to_csv(csv_path, index=False, mode='a', header=False,
                                      encoding='utf-8')

    def read_and_save_files(self, file_paths):
        # Pool.map below hands each worker a chunk of files, so this wrapper
        # walks one chunk; passing the chunks straight to read_and_save_file
        # would hand it a list where it expects a single path.
        for file_path in file_paths:
            self.read_and_save_file(file_path)

    def run(self):
        all_files = read_excel_files(self.exec_param.path_param.get_unzip_tmp_path())
        split_count = use_files_get_max_cpu_count(all_files)
        all_arrays = split_array(all_files, split_count)
        log_print("Start reading files for vertical merge, total files:", len(all_files),
                  ", chunk count:", split_count)
        with multiprocessing.Pool(split_count) as pool:
            pool.map(self.read_and_save_files, all_arrays)
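
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). How an ExecParam is built is project
# configuration and is left as a placeholder here; this class only relies on
# the attributes referenced above (has_header, use_cols, mapping_cols,
# split_cols, index_cols, path_param).
#
#     exec_param = ...  # configured ExecParam instance
#     importer = Axis0DataImpl(id=1, process_count=1, now_count=0,
#                              exec_param=exec_param, save_db=False)
#     importer.run()
# ---------------------------------------------------------------------------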