plt_service.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/6/7
  3. # @Author : 魏志亮
  4. import datetime
  5. from service.common_connect import plt
  6. def update_timeout_trans_data():
  7. sql = """
  8. UPDATE data_transfer
  9. SET trans_sys_status = 2,err_info='运行超时失败',transfer_state=2
  10. WHERE
  11. (
  12. (transfer_type = 'second' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 24)
  13. OR
  14. (transfer_type = 'minute' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)
  15. OR
  16. (transfer_type = 'warn' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)
  17. OR
  18. (transfer_type = 'fault' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)
  19. )
  20. AND trans_sys_status = 0
  21. """
  22. plt.execute(sql)
  23. def update_trans_status_running(batch_no, trans_type, schedule_exec=True):
  24. if schedule_exec:
  25. exec_sql = """
  26. update data_transfer set transfer_state = 0,trans_sys_status = 0 ,transfer_start_time = now(),err_info='',
  27. engine_count =0,time_granularity=0,transfer_finish_time=null,
  28. data_min_time= null,data_max_time= null,transfer_data_count=null
  29. where batch_code = %s and transfer_type = %s
  30. """
  31. plt.execute(exec_sql, (batch_no, trans_type))
  32. def update_trans_status_error(batch_no, trans_type, message="", save_db=True):
  33. if save_db:
  34. exec_sql = """
  35. update data_transfer set transfer_state = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now()
  36. where batch_code = %s and transfer_type = %s
  37. """
  38. message = message if len(message) <= 200 else message[0:200]
  39. plt.execute(exec_sql, (message, batch_no, trans_type))
  40. def update_trans_status_success(batch_no, trans_type, wind_count=0, time_granularity=0,
  41. min_date=datetime.datetime.now(),
  42. max_date=datetime.datetime.now(),
  43. total_count=0, save_db=True):
  44. if save_db:
  45. if min_date is not None:
  46. exec_sql = """
  47. update data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress=100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now(),
  48. data_min_time= %s,data_max_time= %s,transfer_data_count=%s
  49. where batch_code = %s and transfer_type = %s
  50. """
  51. plt.execute(exec_sql, (wind_count, time_granularity, min_date, max_date, total_count, batch_no, trans_type))
  52. else:
  53. exec_sql = """
  54. update data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress = 100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now()
  55. where batch_code = %s and transfer_type = %s
  56. """
  57. plt.execute(exec_sql, (wind_count, time_granularity, batch_no, trans_type))
  58. def update_trans_transfer_progress(batch_no, trans_type, transfer_progress=0, save_db=True):
  59. if save_db:
  60. exec_sql = """
  61. update data_transfer set transfer_progress =%s where batch_code = %s and transfer_type = %s
  62. """
  63. plt.execute(exec_sql, (int(transfer_progress), batch_no, trans_type))
  64. # 获取执行的数据
  65. def get_batch_exec_data(run_count: int = 1) -> dict:
  66. query_running_sql = "select count(1) as count from data_transfer where trans_sys_status = 0"
  67. query_next_exec_sql = """
  68. SELECT
  69. t.*,a.field_name,b.batch_name
  70. FROM
  71. data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
  72. inner join wind_field_batch b on t.batch_code = b.batch_code
  73. WHERE
  74. t.trans_sys_status in (-1,1,2) and t.transfer_state = 0
  75. AND t.transfer_addr != ''
  76. ORDER BY
  77. t.update_time
  78. LIMIT 1
  79. """
  80. data = plt.execute(query_running_sql)
  81. now_count = int(data[0]['count'])
  82. if now_count >= run_count:
  83. return None
  84. else:
  85. data = plt.execute(query_next_exec_sql)
  86. if type(data) == tuple:
  87. return {}
  88. return data[0]
  89. def get_data_by_batch_no_and_type(batch_no, transfer_type):
  90. query_exec_sql = f"""
  91. SELECT
  92. t.*,a.field_name,b.batch_name
  93. FROM
  94. data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
  95. inner join wind_field_batch b on t.batch_code = b.batch_code
  96. WHERE
  97. t.trans_sys_status in (-1,1,2) and t.transfer_state = 2 and t.batch_code = '{batch_no}' and t.transfer_type = '{transfer_type}'
  98. AND t.transfer_addr != ''
  99. """
  100. data = plt.execute(query_exec_sql)
  101. if type(data) == tuple:
  102. return None
  103. return data[0]
  104. ## 合并多个batch_使用
  105. def get_hebing_data_by_batch_no_and_type(batch_no, transfer_type):
  106. query_exec_sql = f"""
  107. SELECT
  108. t.*,a.field_name,b.batch_name
  109. FROM
  110. data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
  111. inner join wind_field_batch b on t.batch_code = b.batch_code
  112. WHERE
  113. t.trans_sys_status = 1 and t.transfer_state = 1 and t.batch_code = '{batch_no}' and t.transfer_type = '{transfer_type}'
  114. AND t.transfer_addr != ''
  115. """
  116. data = plt.execute(query_exec_sql)
  117. if type(data) == tuple:
  118. return None
  119. return data[0]
  120. def get_all_wind(field_code, need_rated_param=True):
  121. query_sql = """
  122. SELECT t.engine_code,t.engine_name,t.rated_capacity,a.rated_cut_out_windspeed
  123. from wind_engine_group t LEFT JOIN wind_engine_mill a on t.mill_type_code = a.mill_type_code
  124. where t.field_code = %s and t.del_state = 0
  125. """
  126. dict_datas = plt.execute(query_sql, (field_code,))
  127. wind_result = dict()
  128. power_result = dict()
  129. for data in dict_datas:
  130. wind_result[str(data['engine_name'])] = str(data['engine_code'])
  131. if need_rated_param:
  132. power_result[str(data['engine_code'])] = (
  133. float(data['rated_capacity']), float(data['rated_cut_out_windspeed']))
  134. return wind_result, power_result
  135. def get_all_wind_company():
  136. query_sql = "SELECT t.field_name FROM wind_field t where t.del_state = 0"
  137. datas = plt.execute(query_sql)
  138. if datas:
  139. return [v for data in datas for k, v in data.items()]
  140. else:
  141. return ['吉山风电场', '和风元宝山', '唐龙三期风电场', '密马风电场', '招远风电场', '昌平坳风场', '昌西一风电场',
  142. '虹梯官风电场', '长清风电场']
  143. def get_base_wind_and_power(wind_turbine_number):
  144. query_sql = "SELECT rated_wind_speed,rated_capacity FROM wind_engine_group where engine_code = %s order by rated_wind_speed"
  145. dict_datas = plt.execute(query_sql, (wind_turbine_number,))
  146. if type(dict_datas) == tuple:
  147. return None
  148. return dict_datas
  149. if __name__ == '__main__':
  150. # print(get_batch_exec_data(run_count=1))
  151. #
  152. # print("**********************")
  153. # print(get_batch_exec_data(run_count=2))
  154. # print("**********************")
  155. print(get_data_by_batch_no_and_type("test_", "second"))
  156. # print(update_trans_status_success("test_唐龙-定时任务测试", "second", 10))
  157. begin = datetime.datetime.now()
  158. print(get_all_wind('WOF034900024'))