# -*- coding: utf-8 -*-
# @Time : 2024/6/7
# @Author : 魏志亮
"""Database helpers for the ``data_transfer`` table and related wind-farm tables.

Every statement is executed through ``plt.execute`` from
``service.common_connect``.  The status codes written by this module are:

* ``trans_sys_status`` / ``transfer_state`` = 0 -> written when a task starts
* ``trans_sys_status`` / ``transfer_state`` = 1 -> written on success
* ``trans_sys_status`` / ``transfer_state`` = 2 -> written on error / timeout
"""
import datetime
from typing import Optional

from service.common_connect import plt

# Sentinel meaning "use the current time, evaluated when the function is
# called".  A literal ``datetime.datetime.now()`` default would be evaluated
# only once, at import time, and then reused for every call.
_USE_CURRENT_TIME = object()


def _first_row(rows, default=None):
    """Return the first row of a query result, or *default* if there is none.

    The original code treated a ``tuple`` result as "no rows" (presumably the
    driver hands back an empty tuple for an empty result set -- confirm
    against ``plt.execute``).  That check is kept, and any other empty or
    ``None`` result is treated the same way instead of raising on ``[0]``.
    """
    if isinstance(rows, tuple) or not rows:
        return default
    return rows[0]


def update_timeout_trans_data():
    """Mark transfers that have been running too long as failed.

    Rows with ``trans_sys_status = 0`` are set to status 2 when their
    ``transfer_start_time`` is more than 24 hours old for type ``second`` or
    more than 6 hours old for types ``minute``, ``warn`` and ``fault``.
    """
    sql = """
        UPDATE data_transfer
        SET trans_sys_status = 2,err_info='运行超时失败',transfer_state=2
        WHERE (
            (transfer_type = 'second' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 24)
            OR (transfer_type = 'minute' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)
            OR (transfer_type = 'warn' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)
            OR (transfer_type = 'fault' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)
        )
        AND trans_sys_status = 0
    """
    plt.execute(sql)


def update_trans_status_running(batch_no, trans_type, schedule_exec=True):
    """Reset a transfer row to the "running" state.

    Sets both status columns to 0, stamps ``transfer_start_time`` with the
    database's ``now()`` and clears the result columns of any previous run.

    Args:
        batch_no: Value matched against ``batch_code``.
        trans_type: Value matched against ``transfer_type``.
        schedule_exec: When false, nothing is written.
    """
    if not schedule_exec:
        return
    exec_sql = """
        update data_transfer
        set transfer_state = 0,trans_sys_status = 0 ,transfer_start_time = now(),err_info='',
            engine_count =0,time_granularity=0,transfer_finish_time=null,
            data_min_time= null,data_max_time= null,transfer_data_count=null
        where batch_code = %s and transfer_type = %s
    """
    plt.execute(exec_sql, (batch_no, trans_type))


def update_trans_status_error(batch_no, trans_type, message="", save_db=True):
    """Record a failed transfer together with its error message.

    Args:
        batch_no: Value matched against ``batch_code``.
        trans_type: Value matched against ``transfer_type``.
        message: Error text stored in ``err_info``; truncated to 200
            characters.  ``None`` is stored as an empty string and non-string
            values (e.g. an exception object) are converted with ``str``.
        save_db: When false, nothing is written.
    """
    if not save_db:
        return
    exec_sql = """
        update data_transfer
        set transfer_state = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now()
        where batch_code = %s and transfer_type = %s
    """
    # Coerce before slicing so a None or exception argument cannot make the
    # error handler itself raise.
    text = "" if message is None else str(message)
    plt.execute(exec_sql, (text[:200], batch_no, trans_type))


def update_trans_status_success(batch_no, trans_type, wind_count=0, time_granularity=0,
                                min_date=_USE_CURRENT_TIME, max_date=_USE_CURRENT_TIME,
                                total_count=0, save_db=True):
    """Record a successfully finished transfer.

    Args:
        batch_no: Value matched against ``batch_code``.
        trans_type: Value matched against ``transfer_type``.
        wind_count: Stored in ``engine_count``.
        time_granularity: Stored in ``time_granularity``.
        min_date: Stored in ``data_min_time``.  When omitted, the current
            time at call time is used.  Passing ``None`` explicitly leaves
            ``data_min_time``, ``data_max_time`` and ``transfer_data_count``
            untouched.
        max_date: Stored in ``data_max_time``.  When omitted, the current
            time at call time is used.
        total_count: Stored in ``transfer_data_count``.
        save_db: When false, nothing is written.
    """
    if not save_db:
        return

    # Resolve the "now" defaults per call rather than once at import time.
    if min_date is _USE_CURRENT_TIME or max_date is _USE_CURRENT_TIME:
        current = datetime.datetime.now()
        if min_date is _USE_CURRENT_TIME:
            min_date = current
        if max_date is _USE_CURRENT_TIME:
            max_date = current

    if min_date is not None:
        exec_sql = """
            update data_transfer
            set transfer_state = 1,trans_sys_status = 1,transfer_progress=100,err_info = '',
                engine_count =%s,time_granularity=%s,transfer_finish_time=now(),
                data_min_time= %s,data_max_time= %s,transfer_data_count=%s
            where batch_code = %s and transfer_type = %s
        """
        params = (wind_count, time_granularity, min_date, max_date, total_count,
                  batch_no, trans_type)
    else:
        exec_sql = """
            update data_transfer
            set transfer_state = 1,trans_sys_status = 1,transfer_progress = 100,err_info = '',
                engine_count =%s,time_granularity=%s,transfer_finish_time=now()
            where batch_code = %s and transfer_type = %s
        """
        params = (wind_count, time_granularity, batch_no, trans_type)
    plt.execute(exec_sql, params)


def update_trans_transfer_progress(batch_no, trans_type, transfer_progress=0, save_db=True):
    """Store the progress value of a transfer.

    Args:
        batch_no: Value matched against ``batch_code``.
        trans_type: Value matched against ``transfer_type``.
        transfer_progress: Progress value; converted with ``int`` before
            being written to ``transfer_progress``.
        save_db: When false, nothing is written.
    """
    if not save_db:
        return
    exec_sql = """
        update data_transfer set transfer_progress =%s
        where batch_code = %s and transfer_type = %s
    """
    plt.execute(exec_sql, (int(transfer_progress), batch_no, trans_type))


def get_batch_exec_data(run_count: int = 1) -> Optional[dict]:
    """Fetch the next transfer row to execute.

    Args:
        run_count: Maximum number of rows allowed to have
            ``trans_sys_status = 0`` at the same time.

    Returns:
        ``None`` when ``run_count`` or more rows already have
        ``trans_sys_status = 0``; an empty dict when no candidate row was
        found; otherwise the candidate with the oldest ``update_time``.
    """
    query_running_sql = "select count(1) as count from data_transfer where trans_sys_status = 0"
    query_next_exec_sql = """
        SELECT t.*,a.field_name,b.batch_name
        FROM data_transfer t
        INNER JOIN wind_field a on t.field_code = a.field_code
        inner join wind_field_batch b on t.batch_code = b.batch_code
        WHERE t.trans_sys_status in (-1,1,2) and t.transfer_state = 0
        AND t.transfer_addr != ''
        ORDER BY t.update_time
        LIMIT 1
    """
    running = plt.execute(query_running_sql)
    if int(running[0]['count']) >= run_count:
        return None
    return _first_row(plt.execute(query_next_exec_sql), default={})


def get_data_by_batch_no_and_type(batch_no, transfer_type):
    """Look up a transfer row with ``transfer_state = 2`` by batch and type.

    Args:
        batch_no: Value matched against ``batch_code``.
        transfer_type: Value matched against ``transfer_type``.

    Returns:
        The first matching row, or ``None`` when there is none.
    """
    # Values are bound as parameters rather than interpolated into the SQL
    # text, matching the other queries in this module.
    query_exec_sql = """
        SELECT t.*,a.field_name,b.batch_name
        FROM data_transfer t
        INNER JOIN wind_field a on t.field_code = a.field_code
        inner join wind_field_batch b on t.batch_code = b.batch_code
        WHERE t.trans_sys_status in (-1,1,2) and t.transfer_state = 2
        and t.batch_code = %s and t.transfer_type = %s
        AND t.transfer_addr != ''
    """
    return _first_row(plt.execute(query_exec_sql, (batch_no, transfer_type)))


def get_hebing_data_by_batch_no_and_type(batch_no, transfer_type):
    """Look up a successfully finished transfer row by batch and type.

    Used when merging multiple batches.  Only rows with
    ``trans_sys_status = 1`` and ``transfer_state = 1`` are considered.

    Args:
        batch_no: Value matched against ``batch_code``.
        transfer_type: Value matched against ``transfer_type``.

    Returns:
        The first matching row, or ``None`` when there is none.
    """
    # Values are bound as parameters rather than interpolated into the SQL
    # text, matching the other queries in this module.
    query_exec_sql = """
        SELECT t.*,a.field_name,b.batch_name
        FROM data_transfer t
        INNER JOIN wind_field a on t.field_code = a.field_code
        inner join wind_field_batch b on t.batch_code = b.batch_code
        WHERE t.trans_sys_status = 1 and t.transfer_state = 1
        and t.batch_code = %s and t.transfer_type = %s
        AND t.transfer_addr != ''
    """
    return _first_row(plt.execute(query_exec_sql, (batch_no, transfer_type)))


def get_all_wind(field_code, need_rated_param=True):
    """Load the non-deleted engines of a wind field.

    Args:
        field_code: Value matched against ``wind_engine_group.field_code``.
        need_rated_param: When false, the second mapping is left empty.

    Returns:
        A pair ``(wind_result, power_result)``: ``wind_result`` maps engine
        name to engine code (both as ``str``); ``power_result`` maps engine
        code to ``(rated_capacity, rated_cut_out_windspeed)`` as floats.
    """
    query_sql = """
        SELECT t.engine_code,t.engine_name,t.rated_capacity,a.rated_cut_out_windspeed
        from wind_engine_group t
        LEFT JOIN wind_engine_mill a on t.mill_type_code = a.mill_type_code
        where t.field_code = %s and t.del_state = 0
    """
    rows = plt.execute(query_sql, (field_code,))
    wind_result = {}
    power_result = {}
    for row in rows:
        engine_code = str(row['engine_code'])
        wind_result[str(row['engine_name'])] = engine_code
        if need_rated_param:
            # NOTE(review): float() raises if either value is NULL, which the
            # LEFT JOIN permits for rated_cut_out_windspeed -- confirm that
            # every engine has a matching mill type.
            power_result[engine_code] = (
                float(row['rated_capacity']),
                float(row['rated_cut_out_windspeed']),
            )
    return wind_result, power_result


def get_all_wind_company():
    """Return the names of all non-deleted wind fields.

    Falls back to a fixed list of names when the query yields nothing.
    """
    query_sql = "SELECT t.field_name FROM wind_field t where t.del_state = 0"
    rows = plt.execute(query_sql)
    if rows:
        return [value for row in rows for value in row.values()]
    return ['吉山风电场', '和风元宝山', '唐龙三期风电场', '密马风电场', '招远风电场',
            '昌平坳风场', '昌西一风电场', '虹梯官风电场', '长清风电场']


def get_base_wind_and_power(wind_turbine_number):
    """Return the rated wind speed and capacity rows of one engine.

    Args:
        wind_turbine_number: Value matched against ``engine_code``.

    Returns:
        The query result ordered by ``rated_wind_speed``, or ``None`` when
        the result is a tuple (treated as "no rows" throughout this module).
    """
    query_sql = "SELECT rated_wind_speed,rated_capacity FROM wind_engine_group where engine_code = %s order by rated_wind_speed"
    rows = plt.execute(query_sql, (wind_turbine_number,))
    if isinstance(rows, tuple):
        return None
    return rows


if __name__ == '__main__':
    # Manual smoke test; requires a reachable database.
    print(get_data_by_batch_no_and_type("test_", "second"))
    begin = datetime.datetime.now()
    print(get_all_wind('WOF034900024'))