import multiprocessing
import os

from trans.ExecParam import ExecParam
from utils.file.trans_methods import read_excel_files, split_array, find_header, read_file_to_df
from utils.log.import_data_log import log_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count

class Axis0DataImpl(object):
    """Vertically merges column data: reads the source files, splits rows by
    the configured split columns, and appends one CSV per split value."""

    def __init__(self, id, process_count, now_count, exec_param: ExecParam, save_db=True):
        self.id = id
        self.process_count = process_count
        self.now_count = now_count
        self.exec_param = exec_param
        self.save_db = save_db
        # self.lock_map = dict()
        # for i in range(1000):
        #     self.lock_map[i] = multiprocessing.Manager().Lock()
        # A single shared lock, plus a shared dict that records which
        # (split value, column) pairs have already been written once.
        self.lock = multiprocessing.Manager().Lock()
        self.field_dict = multiprocessing.Manager().dict()

    def get_lock(self, split_col_value, col_name):
        """Return (first_time, lock) for a split value / column pair.

        first_time is True only for the caller that registers the pair, so
        that caller writes the CSV header and later callers append.
        """
        if split_col_value:
            field_name = f'{split_col_value}_{col_name}'
        else:
            field_name = col_name
        # exists_count = len(self.field_dict.keys())
        # if exists_count >= 4:
        #     return self.lock
        # Guard the check-and-set with the shared lock so that two worker
        # processes cannot both see the same pair as new and both write a
        # header, truncating each other's output.
        with self.lock:
            boolean_first_time = False
            if field_name not in self.field_dict:
                boolean_first_time = True
                self.field_dict[field_name] = len(self.field_dict.keys()) + 1
        # return boolean_first_time, self.lock_map[self.field_dict[field_name]]
        return boolean_first_time, self.lock

    def read_and_save_file(self, file_path):
        # Locate the header row when the source files carry one.
        if self.exec_param.has_header:
            header = find_header(file_path, self.exec_param.use_cols)
            if header is None:
                raise Exception(f"No header row found in file {os.path.basename(file_path)}")
        else:
            header = None
        df = read_file_to_df(file_path, header=header, use_cols=self.exec_param.use_cols)
        # 'file_name' (and, for Excel sources, 'sheet_name') come from
        # read_file_to_df and keep their names through the rename below.
        # Rename data columns to their canonical names; 'columns=' is
        # required here, since a bare 'mapper=' would rename the index.
        df.rename(columns=self.exec_param.mapping_cols, errors='ignore', inplace=True)
        # Build the split key from the configured split columns, replacing
        # characters that are unsafe in file names.
        if self.exec_param.split_cols:
            df['split_col'] = df[self.exec_param.split_cols].apply(
                lambda x: '_'.join([str(i).replace(' ', '_').replace(':', '_') for i in x.values]),
                axis=1)
        else:
            df['split_col'] = 'All'
        split_col = df['split_col'].unique()
        if len(split_col) >= 1000:
            log_print(f"{file_path} produces too many split files: 1000 or more")
            raise Exception(f"{file_path} produces too many split files: 1000 or more")
        # Everything that is neither an index column nor bookkeeping
        # (split_col / file_name / sheet_name) is a data field.
        general_fields = list(df.columns)
        general_fields.remove('split_col')
        general_fields.remove('file_name')
        if 'sheet_name' in general_fields:
            general_fields.remove('sheet_name')
        for col in self.exec_param.index_cols:
            general_fields.remove(col)
        for split_col_value in split_col:
            for col in general_fields:
                # Copy the index columns rather than appending to the shared
                # list, which would grow it on every iteration.
                now_cols = list(self.exec_param.index_cols)
                now_cols.append(col)
                now_df = df[df['split_col'] == split_col_value][now_cols]
                boolean_first_time, lock = self.get_lock(split_col_value, col)
                with lock:
                    path = self.exec_param.path_param.get_process_tmp_path()
                    os.makedirs(path, exist_ok=True)
                    if boolean_first_time:
                        now_df.to_csv(os.path.join(path, f'{split_col_value}.csv'), index=False, encoding='utf-8')
                    else:
                        now_df.to_csv(os.path.join(path, f'{split_col_value}.csv'), index=False, mode='a',
                                      header=False, encoding='utf-8')

    def read_and_save_files(self, file_paths):
        # pool.map hands each worker one chunk of files, not a single file,
        # so iterate over the chunk here.
        for file_path in file_paths:
            self.read_and_save_file(file_path)

    def run(self):
        all_files = read_excel_files(self.exec_param.path_param.get_unzip_tmp_path())
        split_count = use_files_get_max_cpu_count(all_files)
        all_arrays = split_array(all_files, split_count)
        log_print("Start reading files for vertical merge. Total files:", len(all_files),
                  ", number of chunks:", split_count)
        with multiprocessing.Pool(split_count) as pool:
            pool.map(self.read_and_save_files, all_arrays)
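
# A minimal usage sketch, not part of the module. The ExecParam fields this
# class reads (has_header, use_cols, mapping_cols, split_cols, index_cols,
# path_param) are inferred from the code above; how ExecParam is actually
# constructed elsewhere in the project is an assumption, so the call below
# is left as a commented-out, hypothetical example.
#
# if __name__ == '__main__':
#     exec_param = ExecParam(...)  # hypothetical constructor arguments
#     impl = Axis0DataImpl(id=1, process_count=1, now_count=0,
#                          exec_param=exec_param, save_db=False)
#     impl.run()  # reads all files, then splits and appends per-value CSVs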