import datetime
import multiprocessing
import traceback

from etl.base.PathsAndTable import PathsAndTable
from service.plt_service import update_trans_transfer_progress
from service.trans_service import creat_table_and_add_partition, save_file_to_db
from utils.file.trans_methods import read_excel_files, split_array
from utils.log.trans_log import trans_print
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent


class SaveToDb(object):
    """Save the converted files of a batch into the database with a worker pool."""

    def __init__(self, pathsAndTable: PathsAndTable):
        self.pathsAndTable = pathsAndTable

    def multiprocessing_to_save_db(self):
        # Save the exported files into the database.
        self.pathsAndTable.delete_batch_db()
        trans_print("Start saving files to the database")
        all_saved_files = read_excel_files(self.pathsAndTable.get_save_path())
        creat_table_and_add_partition(self.pathsAndTable.get_table_name(), len(all_saved_files),
                                      self.pathsAndTable.read_type)
        # Use half of the available CPU cores, but never more workers than files.
        split_count = get_available_cpu_count_with_percent(percent=1 / 2)
        split_count = min(split_count, len(all_saved_files))
        all_arrays = split_array(all_saved_files, split_count)
        try:
            for index, arr in enumerate(all_arrays):
                with multiprocessing.Pool(split_count) as pool:
                    # Save only the files belonging to the current chunk,
                    # so each file is written to the database exactly once.
                    pool.starmap(save_file_to_db,
                                 [(self.pathsAndTable.get_table_name(), file) for file in arr])
                # Progress advances from 70% towards 99% as chunks complete.
                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                               round(70 + 29 * (index + 1) / len(all_arrays), 2),
                                               self.pathsAndTable.save_db)
        except Exception as e:
            trans_print(traceback.format_exc())
            message = "Error while saving to the database, system returned: " + str(e)
            raise ValueError(message)
        trans_print("Finished saving files to the database")

    def run(self):
        trans_print("Start saving to the database")
        begin = datetime.datetime.now()
        self.multiprocessing_to_save_db()
        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 99,
                                       self.pathsAndTable.save_db)
        trans_print("Saving to the database finished, elapsed:", datetime.datetime.now() - begin)