wave_service.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. import datetime
  2. from service.common_connect import plt
  3. def update_timeout_wave_trans_data():
  4. sql = """
  5. UPDATE wave_data_transfer
  6. SET trans_sys_status = 2,err_info='运行超时失败',transfer_state=2
  7. WHERE TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6
  8. AND trans_sys_status = 0
  9. """
  10. plt.execute(sql)
  11. def update_wave_trans_status_running(id, schedule_exec=True):
  12. if schedule_exec:
  13. exec_sql = """
  14. update wave_data_transfer set transfer_state = 0,trans_sys_status = 0 ,transfer_start_time = now(),err_info='',
  15. engine_count =0,time_granularity=0,transfer_finish_time=null,
  16. data_min_time= null,data_max_time= null,transfer_data_count=null
  17. where id = %s
  18. """
  19. plt.execute(exec_sql, id)
  20. def update_wave_trans_status_error(id, message="", save_db=True):
  21. if save_db:
  22. exec_sql = """
  23. update wave_data_transfer set transfer_state = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now()
  24. where id = %s
  25. """
  26. message = message if len(message) <= 200 else message[0:200]
  27. plt.execute(exec_sql, (message, id))
  28. def update_wave_trans_status_success(id, wind_count=0, time_granularity=0,
  29. min_date=datetime.datetime.now(),
  30. max_date=datetime.datetime.now(),
  31. total_count=0, save_db=True):
  32. if save_db:
  33. if min_date is not None:
  34. exec_sql = """
  35. update wave_data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress=100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now(),
  36. data_min_time= %s,data_max_time= %s,transfer_data_count=%s
  37. where id = %s
  38. """
  39. plt.execute(exec_sql, (wind_count, time_granularity, min_date, max_date, total_count, id))
  40. else:
  41. exec_sql = """
  42. update wave_data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress = 100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now()
  43. where id = %s
  44. """
  45. plt.execute(exec_sql, (wind_count, time_granularity, id))
  46. def update_wave_trans_transfer_progress(id, transfer_progress=0, save_db=True):
  47. if save_db:
  48. exec_sql = """
  49. update wave_data_transfer set transfer_progress = %s where id = %s
  50. """
  51. plt.execute(exec_sql, (int(transfer_progress), id))
  52. # 获取执行的数据
  53. def get_wave_exec_data(run_count: int = 1) -> dict:
  54. query_running_sql = "select count(1) as count from data_transfer where trans_sys_status = 0"
  55. query_next_exec_sql = """
  56. SELECT
  57. t.*,a.field_name,b.batch_name
  58. FROM
  59. wave_data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
  60. inner join wind_field_batch b on t.batch_code = b.batch_code
  61. WHERE
  62. t.trans_sys_status in (-1,1,2) and t.transfer_state = 0
  63. AND t.transfer_addr != ''
  64. ORDER BY
  65. t.update_time
  66. LIMIT 1
  67. """
  68. data = plt.execute(query_running_sql)
  69. now_count = int(data[0]['count'])
  70. if now_count >= run_count:
  71. return None
  72. else:
  73. data = plt.execute(query_next_exec_sql)
  74. if type(data) == tuple:
  75. return {}
  76. return data[0]