plt_service.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/6/7
  3. # @Author : 魏志亮
  4. import datetime
  5. from utils.db.ConnectMysql import ConnectMysql
  6. plt = ConnectMysql("plt")
  7. def update_timeout_trans_data():
  8. sql = """
  9. UPDATE data_transfer
  10. SET trans_sys_status = 2,err_info='运行超时失败',transfer_state=2
  11. WHERE
  12. (
  13. (transfer_type = 'second' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 24)
  14. OR
  15. (transfer_type = 'minute' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)
  16. )
  17. AND trans_sys_status = 0
  18. """
  19. plt.execute(sql)
  20. def update_trans_status_running(batch_no, trans_type, schedule_exec=True):
  21. if schedule_exec:
  22. exec_sql = """
  23. update data_transfer set trans_sys_status = 0 ,transfer_start_time = now()
  24. where batch_code = %s and transfer_type = %s
  25. """
  26. plt.execute(exec_sql, (batch_no, trans_type))
  27. def update_trans_status_error(batch_no, trans_type, message="", save_db=True):
  28. if save_db:
  29. exec_sql = """
  30. update data_transfer set transfer_state = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now()
  31. where batch_code = %s and transfer_type = %s
  32. """
  33. message = message if len(message) <= 200 else message[0:200]
  34. plt.execute(exec_sql, (message, batch_no, trans_type))
  35. def update_trans_status_success(batch_no, trans_type, wind_count=0, time_granularity=0,
  36. min_date=datetime.datetime.now(),
  37. max_date=datetime.datetime.now(),
  38. total_count=0, save_db=True):
  39. if save_db:
  40. if min_date is not None:
  41. exec_sql = """
  42. update data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress=100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now(),
  43. data_min_time= %s,data_max_time= %s,transfer_data_count=%s
  44. where batch_code = %s and transfer_type = %s
  45. """
  46. plt.execute(exec_sql, (wind_count, time_granularity, min_date, max_date, total_count, batch_no, trans_type))
  47. else:
  48. exec_sql = """
  49. update data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress = 100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now()
  50. where batch_code = %s and transfer_type = %s
  51. """
  52. plt.execute(exec_sql, (wind_count, time_granularity, batch_no, trans_type))
  53. def update_trans_transfer_progress(batch_no, trans_type, transfer_progress=0, save_db=True):
  54. if save_db:
  55. exec_sql = """
  56. update data_transfer set transfer_progress =%s where batch_code = %s and transfer_type = %s
  57. """
  58. plt.execute(exec_sql, (transfer_progress, batch_no, trans_type))
  59. # 获取执行的数据
  60. def get_exec_data(run_count: int = 1) -> dict:
  61. query_running_sql = "select count(1) as count from data_transfer where trans_sys_status = 0"
  62. query_next_exdc_sql = """
  63. SELECT
  64. t.*,a.field_name,b.batch_name
  65. FROM
  66. data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
  67. inner join wind_field_batch b on t.batch_code = b.batch_code
  68. WHERE
  69. ((t.trans_sys_status = -1 and t.transfer_state = 0) or ( t.trans_sys_status in (1,2) and t.transfer_state = 0))
  70. AND t.transfer_addr != ''
  71. ORDER BY
  72. t.update_time
  73. LIMIT 1
  74. """
  75. data = plt.execute(query_running_sql)
  76. now_count = int(data[0]['count'])
  77. if now_count >= run_count:
  78. return None
  79. else:
  80. data = plt.execute(query_next_exdc_sql)
  81. if type(data) == tuple:
  82. return {}
  83. return data[0]
  84. def get_all_wind(field_code):
  85. query_sql = "select engine_code,engine_name from wind_engine_group where field_code = %s and del_state = 0"
  86. dict_datas = plt.execute(query_sql, (field_code,))
  87. result = dict()
  88. for data in dict_datas:
  89. result[str(data['engine_name'])] = str(data['engine_code'])
  90. return result
  91. def get_all_wind_company():
  92. query_sql = "SELECT t.field_name FROM wind_field t where t.del_state = 0"
  93. datas = plt.execute(query_sql)
  94. if datas:
  95. return [v for data in datas for k, v in data.items()]
  96. else:
  97. return ['吉山风电场', '和风元宝山', '唐龙三期风电场', '密马风电场', '招远风电场', '昌平坳风场', '昌西一风电场', '虹梯官风电场', '长清风电场']
  98. if __name__ == '__main__':
  99. print(get_exec_data(run_count=1))
  100. print("**********************")
  101. print(get_exec_data(run_count=2))
  102. # print(update_trans_status_success("test_唐龙-定时任务测试", "second", 10))