import multiprocessing
import os

import pandas as pd

from service.import_data_service import update_transfer_progress
from trans.ExecParam import ExecParam
from utils.file.trans_methods import split_array, read_excel_files, read_file_to_df, find_header
from utils.log.import_data_log import log_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size, max_file_size_get_max_cpu_count
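
# Axis1DataImpl performs a column-wise ("axis 1") merge of the imported data files:
# each input file is partitioned by the configured split columns, every
# (split value, field) pair is appended to its own temporary CSV under the merge
# directory, and the per-field CSVs are finally joined on the index columns into
# one CSV per split value in the process directory.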

class Axis1DataImpl(object):

    def __init__(self, id, process_count, now_count, exec_param: ExecParam, save_db=True):
        self.id = id
        self.process_count = process_count
        self.now_count = now_count
        self.exec_param = exec_param
        self.save_db = save_db
        # A single Manager serves all of the shared locks and the field dict; the
        # proxies keep a reference to it, and creating a new Manager per lock would
        # spawn roughly 1000 extra manager processes.
        manager = multiprocessing.Manager()
        self.lock_map = dict()
        for i in range(1000):
            self.lock_map[i] = manager.Lock()
        self.lock = manager.Lock()
        self.field_dict = manager.dict()

    def get_lock(self, split_col_value, col_name):
        boolean_first_time = False
        if split_col_value:
            field_name = f'{split_col_value}_{col_name}'
        else:
            field_name = col_name
        # Guard the check-and-register step so concurrent processes cannot both
        # treat the same field as seen for the first time.
        with self.lock:
            if field_name not in self.field_dict:
                boolean_first_time = True
                self.field_dict[field_name] = len(self.field_dict.keys())
            lock_index = self.field_dict[field_name]
        # Only 1000 dedicated locks exist; any further fields share the global lock.
        if lock_index >= 1000:
            return boolean_first_time, self.lock
        return boolean_first_time, self.lock_map[lock_index]

    def read_and_save_file(self, file_path):
        if self.exec_param.has_header:
            header = find_header(file_path, self.exec_param.use_cols)
            if header is None:
                raise Exception(f"No header row found in file {os.path.basename(file_path)}")
        else:
            header = None
        df = read_file_to_df(file_path, header=header)
        col_map = {'file_name': 'file_name'}
        if 'sheet_name' in df.columns:
            col_map['sheet_name'] = 'sheet_name'
        for col in df.columns:
            if col in self.exec_param.use_cols and col not in ['file_name', 'sheet_name']:
                col_map[col] = self.exec_param.mapping_cols[col]
        df = df.rename(columns=col_map)
        # Raise an error if the data is missing any of the index columns.
        for col in self.exec_param.index_cols:
            if col not in df.columns:
                log_print(f"{file_path} is missing index column {col}")
                raise Exception(f"{file_path} is missing index column {col}")
        if self.exec_param.split_cols:
            df['split_col'] = df[self.exec_param.split_cols].apply(
                lambda x: '_'.join([str(i).replace(' ', '_').replace(':', '_') for i in x.values]),
                axis=1)
        else:
            df['split_col'] = 'All'
        split_col = df['split_col'].unique()
        if len(split_col) >= 1000:
            log_print(f"{file_path} produces too many split values: 1000 or more")
            raise Exception(f"{file_path} produces too many split values: 1000 or more")
        general_fields = list(df.columns)
        general_fields.remove('split_col')
        for col in self.exec_param.index_cols:
            general_fields.remove(col)
        for split_col_value in split_col:
            for col in general_fields:
                # Copy the index columns so the shared exec_param list is not mutated.
                now_cols = list(self.exec_param.index_cols)
                now_cols.append(col)
                now_df = df[df['split_col'] == split_col_value][now_cols]
                boolean_first_time, lock = self.get_lock(split_col_value, col)
                with lock:
                    path = os.path.join(self.exec_param.path_param.get_merge_tmp_path(), split_col_value)
                    os.makedirs(path, exist_ok=True)
                    if boolean_first_time:
                        now_df.to_csv(os.path.join(path, f'{col}.csv'), index=False, encoding='utf-8')
                    else:
                        now_df.to_csv(os.path.join(path, f'{col}.csv'), index=False, mode='a', header=False,
                                      encoding='utf-8')

    def read_merge_df_to_process(self, base_name):
        merge_dir = os.path.join(self.exec_param.path_param.get_merge_tmp_path(), base_name)
        # os.listdir returns bare file names, so join them back onto the directory path.
        all_files = os.listdir(merge_dir)
        dfs = [pd.read_csv(os.path.join(merge_dir, f), encoding='utf-8', index_col=self.exec_param.index_cols)
               for f in all_files]
        # Join the per-field frames column-wise on the shared index columns,
        # keeping the field names as column headers.
        df = pd.concat(dfs, axis=1)
        df.reset_index(inplace=True)
        df.to_csv(os.path.join(self.exec_param.path_param.get_process_tmp_path(), base_name + '.csv'), index=False,
                  encoding='utf-8')

    def run(self):
        if len(self.exec_param.index_cols) == 0:
            log_print("Merging tables requires index columns to be configured")
            raise Exception("Merging tables requires index columns to be configured")
        all_files = read_excel_files(self.exec_param.path_param.get_unzip_tmp_path())
        split_count = use_files_get_max_cpu_count(all_files)
        all_arrays = split_array(all_files, split_count)
        log_print("Start reading files, total files:", len(all_files), ", batches:", split_count)
        for index, now_array in enumerate(all_arrays):
            with multiprocessing.Pool(split_count) as pool:
                # read_and_save_file takes a single path argument, so map (not starmap) is used.
                pool.map(self.read_and_save_file, now_array)
            update_transfer_progress(self.id, round(20 + 50 * (index + 1) / len(all_arrays)), self.process_count,
                                     self.now_count, self.save_db)
        all_dirs = os.listdir(self.exec_param.path_param.get_merge_tmp_path())
        dir_size = get_dir_size(os.path.join(self.exec_param.path_param.get_merge_tmp_path(), all_dirs[0]))
        pool_count = max_file_size_get_max_cpu_count(dir_size)
        # Never start more workers than there are split directories to merge.
        pool_count = min(pool_count, len(all_dirs))
        with multiprocessing.Pool(pool_count) as pool:
            pool.map(self.read_merge_df_to_process, all_dirs)
        update_transfer_progress(self.id, 80, self.process_count, self.now_count, self.save_db)
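
# Hypothetical usage sketch (not part of the original module): one way a caller might
# drive this pipeline. How ExecParam is actually constructed is an assumption; only the
# attributes referenced above (has_header, use_cols, mapping_cols, index_cols, split_cols,
# path_param) are known from this file, so the setup below is illustrative only.
#
# if __name__ == '__main__':
#     exec_param = ExecParam(...)  # populated elsewhere by the import service
#     impl = Axis1DataImpl(id=1, process_count=1, now_count=0, exec_param=exec_param, save_db=False)
#     impl.run()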