SaveToDb.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. import multiprocessing
  2. import os.path
  3. import traceback
  4. from etl.common.PathsAndTable import PathsAndTable
  5. from service.trans_conf_service import update_trans_transfer_progress
  6. from service.trans_service import save_partation_file_to_db, save_file_to_db
  7. from utils.file.trans_methods import split_array
  8. from utils.log.trans_log import trans_print
  9. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  10. class SaveToDb(object):
  11. def __init__(self, pathsAndTable: PathsAndTable, update_files, batch_count=100000):
  12. self.pathsAndTable = pathsAndTable
  13. self.batch_count = batch_count
  14. self.update_files = update_files
  15. def mutiprocessing_to_save_db(self):
  16. # 开始保存到SQL文件
  17. all_saved_files = self.update_files
  18. # 映射到的文件保存到数据库
  19. all_saved_files = [i for i in all_saved_files if
  20. os.path.basename(i).split(".")[0] in self.pathsAndTable.wind_col_trans.keys()]
  21. self.pathsAndTable.create_wind_farm_db()
  22. split_count = get_available_cpu_count_with_percent(percent=2 / 3)
  23. split_count = split_count if split_count <= len(all_saved_files) else len(all_saved_files)
  24. all_arrays = split_array(all_saved_files, split_count)
  25. try:
  26. for index, arr in enumerate(all_arrays):
  27. with multiprocessing.Pool(split_count) as pool:
  28. if self.pathsAndTable.read_type in ['minute', 'second']:
  29. pool.starmap(save_partation_file_to_db,
  30. [(self.pathsAndTable.get_table_name(), file,
  31. self.pathsAndTable.wind_col_trans[os.path.basename(file).split(".")[0]],
  32. os.path.basename(os.path.dirname(file)),
  33. self.batch_count) for file in arr])
  34. else:
  35. pool.starmap(save_file_to_db,
  36. [(self.pathsAndTable.get_table_name(), file, self.batch_count) for file in arr])
  37. update_trans_transfer_progress(self.pathsAndTable.id,
  38. round(70 + 29 * (index + 1) / len(all_arrays), 2),
  39. self.pathsAndTable.save_db)
  40. except Exception as e:
  41. trans_print(traceback.format_exc())
  42. message = "保存到数据库错误,系统返回错误:" + str(e)
  43. raise ValueError(message)
  44. def run(self):
  45. if self.pathsAndTable.save_db:
  46. self.mutiprocessing_to_save_db()
  47. update_trans_transfer_progress(self.pathsAndTable.id, 99,
  48. self.pathsAndTable.save_db)