SaveToDb.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. import datetime
  2. import multiprocessing
  3. import os
  4. import traceback
  5. from etl.base.PathsAndTable import PathsAndTable
  6. from service.plt_service import update_trans_transfer_progress
  7. from service.trans_service import save_file_to_db
  8. from utils.file.trans_methods import read_excel_files, split_array
  9. from utils.log.trans_log import trans_print
  10. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  class SaveToDb(object):
      """Save the files found under the save path into the batch database.

      All paths, the target table name and the batch identifiers used for
      progress reporting are taken from the wrapped ``PathsAndTable`` instance.
      """

      def __init__(self, pathsAndTable: PathsAndTable):
          """Keep the ``PathsAndTable`` describing what to read and where to write."""
          self.pathsAndTable = pathsAndTable

      def mutiprocessing_to_save_db(self):
          """Reload every saved file into the batch database using a process pool.

          Calls ``delete_batch_db()``, lists the files under ``get_save_path()``,
          calls ``create_batch_db()`` with names derived from those file names,
          then saves the files chunk by chunk with ``save_file_to_db`` in a
          process pool. After each chunk the transfer progress is updated,
          reaching 99 after the last chunk.

          Raises:
              ValueError: wrapping any exception raised while saving a chunk or
                  reporting progress. The full traceback is logged through
                  ``trans_print`` and the original exception is chained as the
                  cause.
          """
          # Start saving to the database; clear the previous batch DB state first.
          self.pathsAndTable.delete_batch_db()
          trans_print("开始保存到数据库文件")
          all_saved_files = read_excel_files(self.pathsAndTable.get_save_path())
          # Base file names with ".csv" removed; presumably one file per wind turbine.
          wind_names = [str(os.path.basename(i)).replace(".csv", "") for i in all_saved_files]
          self.pathsAndTable.create_batch_db(wind_names)

          if not all_saved_files:
              # Nothing to load; also avoids split_array(..., 0) and Pool(0).
              trans_print("结束保存到数据库文件")
              return

          # Half of the available CPUs, capped at the number of files, never below one.
          split_count = get_available_cpu_count_with_percent(percent=1 / 2)
          split_count = max(1, min(split_count, len(all_saved_files)))
          all_arrays = split_array(all_saved_files, split_count)
          chunk_total = len(all_arrays)
          try:
              table_name = self.pathsAndTable.get_table_name()
              # One pool reused for every chunk instead of re-spawning workers per chunk.
              with multiprocessing.Pool(split_count) as pool:
                  for index, arr in enumerate(all_arrays):
                      # Submit only this chunk's files so each file is saved exactly once.
                      pool.starmap(save_file_to_db, [(table_name, file) for file in arr])
                      # Progress advances from 70 towards 99 as chunks complete.
                      update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                                     round(70 + 29 * (index + 1) / chunk_total, 2),
                                                     self.pathsAndTable.save_db)
          except Exception as e:
              trans_print(traceback.format_exc())
              message = "保存到数据库错误,系统返回错误:" + str(e)
              # Same exception type as before for callers, with the root cause chained.
              raise ValueError(message) from e
          trans_print("结束保存到数据库文件")

      def run(self):
          """Run the save step, then report progress 99 and log the elapsed time."""
          trans_print("开始保存到数据库")
          begin = datetime.datetime.now()
          self.mutiprocessing_to_save_db()
          update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 99,
                                         self.pathsAndTable.save_db)
          trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin)