import multiprocessing
import traceback
from os import path

from etl.common.PathsAndTable import PathsAndTable
from service.plt_service import update_trans_transfer_progress
from service.trans_service import save_file_to_db
from utils.file.trans_methods import read_excel_files, split_array
from utils.log.trans_log import trans_print
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent


class SaveToDb(object):

    def __init__(self, pathsAndTable: PathsAndTable, batch_count=100000):
        self.pathsAndTable = pathsAndTable
        self.batch_count = batch_count

    def multiprocessing_to_save_db(self):
        # Start saving the transformed files to the database.
        self.pathsAndTable.delete_batch_db()
        all_saved_files = read_excel_files(self.pathsAndTable.get_save_path())
        # Derive one name per saved CSV (the file name without its .csv suffix).
        wind_names = [str(path.basename(i)).replace(".csv", "") for i in all_saved_files]
        self.pathsAndTable.create_batch_db(wind_names)
        # Use half of the available CPUs, but never more workers than files.
        split_count = get_available_cpu_count_with_percent(percent=1 / 2)
        split_count = min(split_count, len(all_saved_files))
        all_arrays = split_array(all_saved_files, split_count)
        try:
            for index, arr in enumerate(all_arrays):
                # One pool per chunk so progress can be reported between chunks.
                # Iterate over the current chunk `arr`, not the full file list,
                # otherwise every file would be saved once per chunk.
                with multiprocessing.Pool(split_count) as pool:
                    pool.starmap(save_file_to_db,
                                 [(self.pathsAndTable.get_table_name(), file, self.batch_count)
                                  for file in arr])
                # Map chunk completion onto the 70-99% range of overall progress.
                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                               round(70 + 29 * (index + 1) / len(all_arrays), 2),
                                               self.pathsAndTable.save_db)
        except Exception as e:
            trans_print(traceback.format_exc())
            message = "Error while saving to the database, system returned: " + str(e)
            raise ValueError(message)

    def run(self):
        if self.pathsAndTable.save_db:
            self.multiprocessing_to_save_db()
        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 99,
                                       self.pathsAndTable.save_db)
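
# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal sketch of how SaveToDb is expected to be driven once a
# PathsAndTable instance exists. The PathsAndTable constructor arguments are
# not visible in this file, so this sketch takes a ready-made instance as a
# parameter; `_example_usage` itself is a hypothetical helper, and the
# batch_count value is an arbitrary example. save_file_to_db is assumed to
# accept (table_name, file_path, batch_count), matching the starmap call above.
def _example_usage(paths_and_table: PathsAndTable) -> None:
    # batch_count controls how many rows save_file_to_db writes per batch.
    saver = SaveToDb(paths_and_table, batch_count=50000)
    # Saves every per-turbine CSV to the DB (if save_db is set),
    # then reports 99% transfer progress.
    saver.run()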