import_data_service.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from db_server.base import import_db
  2. from utils.log.import_data_log import log_print
  3. db = import_db
  4. def get_exec_data(run_count=1):
  5. run_count_datas = db.execute('select count(1) as count from executor_history where exec_status = 0')
  6. log_print(run_count_datas)
  7. running_count = int(run_count_datas[0]['count'])
  8. if running_count < run_count:
  9. run_datas = db.execute('select * from executor_history where exec_status = -1 limit 1')
  10. log_print(run_datas)
  11. if run_datas:
  12. return run_datas[0]
  13. else:
  14. log_print("没有可执行的任务")
  15. else:
  16. log_print("当前已达执行上限")
  17. return None
  18. def begin_run(id, save_db=True):
  19. if save_db:
  20. db.execute(
  21. 'update executor_history set exec_status = 0,begin_time=now() where id = %s', (id,))
  22. def update_transfer_progress(id, number, process_count, now_count, save_db=True):
  23. number = round((now_count - 1) * 100 / process_count + number)
  24. log_print(f"{process_count}个任务,当前进度{now_count}, 当前比例:{number}")
  25. if save_db:
  26. db.execute('update executor_history set transfer_progress = %s where id = %s', (number, id))
  27. def run_success(id, save_db=True):
  28. if save_db:
  29. db.execute('update executor_history set exec_status = 1,end_time=now() where id = %s', (id,))
  30. def run_fail(id, error, save_db=True):
  31. if save_db:
  32. error = str(error)[0:200 if len(error) > 200 else len(error)]
  33. db.execute('update executor_history set exec_status = 2,err_info = %s,end_time=now() where id = %s',
  34. (error, id))
  35. def get_exec_group(id):
  36. return db.execute("select * from process_group where id = %s", (id,))
  37. def get_mapping_field(process_executor_id):
  38. return db.execute("""
  39. SELECT
  40. t.standardized_name,
  41. t.data_name,
  42. a.is_cut_col,
  43. a.is_index
  44. FROM
  45. mapping_field t
  46. INNER JOIN base_template_field a ON t.template_filed_id = a.id
  47. WHERE
  48. t.process_executor_id = %s
  49. """, (process_executor_id,))
  50. if __name__ == '__main__':
  51. get_exec_data(2)