# SaveToDb.py
  1. import multiprocessing
  2. import os.path
  3. import traceback
  4. from etl.common.PathsAndTable import PathsAndTable
  5. from service.trans_conf_service import update_trans_transfer_progress
  6. from service.trans_service import save_scada_file_to_db, save_file_to_db
  7. from utils.log.trans_log import info, error
  8. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  9. class SaveToDb(object):
  10. def __init__(self, pathsAndTable: PathsAndTable, update_files, batch_count=100000):
  11. self.pathsAndTable = pathsAndTable
  12. self.batch_count = batch_count
  13. self.update_files = update_files
  14. def mutiprocessing_to_save_db(self):
  15. # 开始保存到SQL文件
  16. all_saved_files = self.update_files
  17. # 映射到的文件保存到数据库
  18. all_saved_files = [i for i in all_saved_files if
  19. os.path.basename(i).split(".")[0] in self.pathsAndTable.wind_col_trans.keys()]
  20. if not all_saved_files:
  21. info("没有文件需要保存到数据库")
  22. return
  23. self.pathsAndTable.create_wind_farm_db()
  24. # 计算最佳进程数
  25. max_processes = get_available_cpu_count_with_percent(percent=2 / 3)
  26. max_processes = min(max_processes, len(all_saved_files), 10) # 限制最大进程数为10
  27. try:
  28. # 创建一个进程池处理所有文件
  29. with multiprocessing.Pool(max_processes) as pool:
  30. if self.pathsAndTable.read_type in ['minute', 'second']:
  31. # 准备参数
  32. params = [(self.pathsAndTable.get_table_name(), file,
  33. self.pathsAndTable.wind_col_trans[os.path.basename(file).split(".")[0]],
  34. os.path.basename(os.path.dirname(file)),
  35. self.batch_count, self.pathsAndTable.use_tidb) for file in all_saved_files]
  36. # 分批次处理并更新进度
  37. batch_size = max(1, len(params) // 10) # 最多10个批次
  38. for i in range(0, len(params), batch_size):
  39. batch_params = params[i:i + batch_size]
  40. pool.starmap(save_scada_file_to_db, batch_params)
  41. # 更新进度
  42. progress = 70 + 29 * (i + len(batch_params)) / len(params)
  43. update_trans_transfer_progress(self.pathsAndTable.id,
  44. round(progress, 2),
  45. self.pathsAndTable.save_db)
  46. else:
  47. # 准备参数
  48. params = [(self.pathsAndTable.get_table_name(), file, self.batch_count) for file in all_saved_files]
  49. # 分批次处理并更新进度
  50. batch_size = max(1, len(params) // 10) # 最多10个批次
  51. for i in range(0, len(params), batch_size):
  52. batch_params = params[i:i + batch_size]
  53. pool.starmap(save_file_to_db, batch_params)
  54. # 更新进度
  55. progress = 70 + 29 * (i + len(batch_params)) / len(params)
  56. update_trans_transfer_progress(self.pathsAndTable.id,
  57. round(progress, 2),
  58. self.pathsAndTable.save_db)
  59. except Exception as e:
  60. error(traceback.format_exc())
  61. message = "保存到数据库错误,系统返回错误:" + str(e)
  62. raise ValueError(message)
  63. def run(self):
  64. if self.pathsAndTable.save_db:
  65. self.mutiprocessing_to_save_db()
  66. update_trans_transfer_progress(self.pathsAndTable.id, 99,
  67. self.pathsAndTable.save_db)