plt_service.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/6/7
  3. # @Author : 魏志亮
  4. from utils.db.ConnectMysqlPool import ConnectMysqlPool
  5. plt = ConnectMysqlPool("plt")
  6. def update_trans_status_running(batch_no, trans_type, schedule_exec=True):
  7. if schedule_exec:
  8. exec_sql = """
  9. update data_transfer set trans_sys_status = 0
  10. where batch_code = %s and transfer_type = %s
  11. """
  12. plt.execute(exec_sql, (batch_no, trans_type))
  13. def update_trans_status_error(batch_no, trans_type, message="", save_db=True):
  14. if save_db:
  15. exec_sql = """
  16. update data_transfer set transfer_state = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now()
  17. where batch_code = %s and transfer_type = %s
  18. """
  19. plt.execute(exec_sql, (message, batch_no, trans_type))
  20. def update_trans_status_success(batch_no, trans_type, wind_count=0, time_granularity=0, save_db=True):
  21. if save_db:
  22. exec_sql = """
  23. update data_transfer set transfer_state = 1,trans_sys_status = 1,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now()
  24. where batch_code = %s and transfer_type = %s
  25. """
  26. plt.execute(exec_sql, (wind_count, time_granularity, batch_no, trans_type))
  27. def update_trans_transfer_progress(batch_no, trans_type, transfer_progress=0, save_db=True):
  28. if save_db:
  29. exec_sql = """
  30. update data_transfer set transfer_progress =%s where batch_code = %s and transfer_type = %s
  31. """
  32. plt.execute(exec_sql, (transfer_progress, batch_no, trans_type))
  33. # 获取执行的数据
  34. def get_exec_data() -> dict:
  35. query_running_sql = "select 1 from data_transfer where trans_sys_status = 0"
  36. query_next_exdc_sql = """
  37. SELECT
  38. t.*,a.field_name,b.batch_name
  39. FROM
  40. data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
  41. inner join wind_field_batch b on t.batch_code = b.batch_code
  42. WHERE
  43. (t.trans_sys_status = -1 or ( t.trans_sys_status = 2 and t.transfer_state = 0))
  44. AND t.transfer_addr != ''
  45. ORDER BY
  46. t.update_time
  47. LIMIT 1
  48. """
  49. data = plt.execute(query_running_sql)
  50. if data:
  51. return None
  52. else:
  53. data = plt.execute(query_next_exdc_sql)
  54. if type(data) == tuple:
  55. return {}
  56. return data[0]
  57. def get_all_wind(field_code):
  58. query_sql = "select engine_code,engine_name from wind_engine_group where field_code = %s and del_state = 0"
  59. dict_datas = plt.execute(query_sql, (field_code,))
  60. result = dict()
  61. for data in dict_datas:
  62. result[str(data['engine_name'])] = str(data['engine_code'])
  63. return result
  64. def get_all_wind_company():
  65. query_sql = "SELECT t.field_name FROM wind_field t where t.del_state = 0"
  66. datas = plt.execute(query_sql)
  67. datas = []
  68. if datas:
  69. return [v for data in datas for k, v in data.items()]
  70. else:
  71. return ['吉山风电场', '和风元宝山', '唐龙三期风电场', '密马风电场', '招远风电场', '昌平坳风场', '昌西一风电场', '虹梯官风电场', '长清风电场']
  72. if __name__ == '__main__':
  73. print(get_all_wind('WOF01000002'))
  74. print(get_all_wind_company())
  75. print(get_exec_data())
  76. # print(update_trans_status_success("test_唐龙-定时任务测试", "second", 10))