SaveToDb.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. import multiprocessing
  2. import traceback
  3. from os import path
  4. from etl.common.PathsAndTable import PathsAndTable
  5. from service.plt_service import update_trans_transfer_progress
  6. from service.trans_service import save_file_to_db
  7. from utils.file.trans_methods import read_excel_files, split_array
  8. from utils.log.trans_log import trans_print
  9. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  10. class SaveToDb(object):
  11. def __init__(self, pathsAndTable: PathsAndTable, batch_count=100000):
  12. self.pathsAndTable = pathsAndTable
  13. self.batch_count = batch_count
  14. def mutiprocessing_to_save_db(self):
  15. # 开始保存到SQL文件
  16. self.pathsAndTable.delete_batch_db()
  17. all_saved_files = read_excel_files(self.pathsAndTable.get_save_path())
  18. wind_names = [str(path.basename(i)).replace(".csv", "") for i in all_saved_files]
  19. self.pathsAndTable.create_batch_db(wind_names)
  20. split_count = get_available_cpu_count_with_percent(percent=1 / 2)
  21. split_count = split_count if split_count <= len(all_saved_files) else len(all_saved_files)
  22. all_arrays = split_array(all_saved_files, split_count)
  23. try:
  24. for index, arr in enumerate(all_arrays):
  25. with multiprocessing.Pool(split_count) as pool:
  26. pool.starmap(save_file_to_db,
  27. [(self.pathsAndTable.get_table_name(), file, self.batch_count) for file in
  28. all_saved_files])
  29. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
  30. round(70 + 29 * (index + 1) / len(all_arrays), 2),
  31. self.pathsAndTable.save_db)
  32. except Exception as e:
  33. trans_print(traceback.format_exc())
  34. message = "保存到数据库错误,系统返回错误:" + str(e)
  35. raise ValueError(message)
  36. def run(self):
  37. if self.pathsAndTable.save_db:
  38. self.mutiprocessing_to_save_db()
  39. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 99,
  40. self.pathsAndTable.save_db)