SaveToDb.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import datetime
  2. import multiprocessing
  3. import traceback
  4. from etl.base.PathsAndTable import PathsAndTable
  5. from service.plt_service import update_trans_transfer_progress
  6. from service.trans_service import creat_table_and_add_partition, save_file_to_db
  7. from utils.file.trans_methods import read_excel_files, split_array
  8. from utils.log.trans_log import trans_print
  9. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  10. class SaveToDb(object):
  11. def __init__(self, pathsAndTable: PathsAndTable):
  12. self.pathsAndTable = pathsAndTable
  13. def mutiprocessing_to_save_db(self):
  14. # 开始保存到SQL文件
  15. self.pathsAndTable.delete_batch_db()
  16. trans_print("开始保存到数据库文件")
  17. all_saved_files = read_excel_files(self.pathsAndTable.get_save_path())
  18. creat_table_and_add_partition(self.pathsAndTable.get_table_name(), len(all_saved_files),
  19. self.pathsAndTable.read_type)
  20. split_count = get_available_cpu_count_with_percent(percent=1 / 2)
  21. split_count = split_count if split_count <= len(all_saved_files) else len(all_saved_files)
  22. all_arrays = split_array(all_saved_files, split_count)
  23. try:
  24. for index, arr in enumerate(all_arrays):
  25. with multiprocessing.Pool(split_count) as pool:
  26. pool.starmap(save_file_to_db, [(self.pathsAndTable.get_table_name(), file,) for file in
  27. all_saved_files])
  28. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
  29. round(70 + 29 * (index + 1) / len(all_arrays), 2),
  30. self.pathsAndTable.save_db)
  31. except Exception as e:
  32. trans_print(traceback.format_exc())
  33. message = "保存到数据库错误,系统返回错误:" + str(e)
  34. raise ValueError(message)
  35. trans_print("结束保存到数据库文件")
  36. def run(self):
  37. trans_print("开始保存到数据库")
  38. begin = datetime.datetime.now()
  39. self.mutiprocessing_to_save_db()
  40. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 99,
  41. self.pathsAndTable.save_db)
  42. trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin)