# -*- coding: utf-8 -*-
# @Time    : 2024/6/7
# @Author  : 魏志亮
"""Helpers that read and update rows of the ``data_transfer`` table.

All statements are executed through the shared ``plt`` connection pool.
"""
from typing import Optional

from utils.db.ConnectMysqlPool import ConnectMysqlPool

plt = ConnectMysqlPool("plt")

# Field names returned by get_all_wind_company() when the query yields no rows.
_FALLBACK_WIND_COMPANIES = (
    '吉山风电场',
    '和风元宝山',
    '唐龙三期风电场',
    '密马风电场',
    '招远风电场',
    '昌平坳风场',
    '昌西一风电场',
    '虹梯官风电场',
    '长清风电场',
)


def update_trans_status_running(batch_no, trans_type, schedule_exec=True) -> None:
    """Set ``trans_sys_status`` to 0 for the matching transfer row.

    Args:
        batch_no: Value matched against ``batch_code``.
        trans_type: Value matched against ``transfer_type``.
        schedule_exec: When false, nothing is written.
    """
    if not schedule_exec:
        return
    exec_sql = """
        update data_transfer
        set trans_sys_status = 0
        where batch_code = %s and transfer_type = %s
    """
    plt.execute(exec_sql, (batch_no, trans_type))


def update_trans_status_error(batch_no, trans_type, message="", save_db=True) -> None:
    """Record a failure on the matching transfer row.

    Sets ``transfer_state`` and ``trans_sys_status`` to 2, stores ``message``
    in ``err_info`` and stamps ``transfer_finish_time`` with ``now()``.

    Args:
        batch_no: Value matched against ``batch_code``.
        trans_type: Value matched against ``transfer_type``.
        message: Text written to ``err_info``.
        save_db: When false, nothing is written.
    """
    if not save_db:
        return
    exec_sql = """
        update data_transfer
        set transfer_state = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now()
        where batch_code = %s and transfer_type = %s
    """
    plt.execute(exec_sql, (message, batch_no, trans_type))


def update_trans_status_success(batch_no, trans_type, wind_count=0, time_granularity=0,
                                save_db=True) -> None:
    """Record a successful run on the matching transfer row.

    Sets ``transfer_state`` and ``trans_sys_status`` to 1, clears ``err_info``,
    stores the engine count and time granularity, and stamps
    ``transfer_finish_time`` with ``now()``.

    Args:
        batch_no: Value matched against ``batch_code``.
        trans_type: Value matched against ``transfer_type``.
        wind_count: Value written to ``engine_count``.
        time_granularity: Value written to ``time_granularity``.
        save_db: When false, nothing is written.
    """
    if not save_db:
        return
    exec_sql = """
        update data_transfer
        set transfer_state = 1,trans_sys_status = 1,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now()
        where batch_code = %s and transfer_type = %s
    """
    plt.execute(exec_sql, (wind_count, time_granularity, batch_no, trans_type))


def update_trans_transfer_progress(batch_no, trans_type, transfer_progress=0,
                                   save_db=True) -> None:
    """Store the current progress value on the matching transfer row.

    Note that the statement also refreshes ``transfer_finish_time`` on every
    progress update.

    Args:
        batch_no: Value matched against ``batch_code``.
        trans_type: Value matched against ``transfer_type``.
        transfer_progress: Value written to ``transfer_progress``.
        save_db: When false, nothing is written.
    """
    if not save_db:
        return
    exec_sql = """
        update data_transfer
        set transfer_progress =%s,transfer_finish_time=now()
        where batch_code = %s and transfer_type = %s
    """
    plt.execute(exec_sql, (transfer_progress, batch_no, trans_type))


def get_exec_data() -> Optional[dict]:
    """Fetch the next transfer row to execute.

    Returns:
        ``None`` when any row already has ``trans_sys_status = 0`` (a transfer
        is in progress), ``{}`` when no candidate row is found, otherwise the
        first candidate row ordered by ``update_time``.
    """
    query_running_sql = "select 1 from data_transfer where trans_sys_status = 0"
    query_next_exdc_sql = """
        SELECT t.*,a.field_name,b.batch_name
        FROM data_transfer t
        INNER JOIN wind_field a on t.field_code = a.field_code
        inner join wind_field_batch b on t.batch_code = b.batch_code
        WHERE (t.trans_sys_status = -1 or ( t.trans_sys_status = 2 and t.transfer_state = 0))
        AND t.transfer_addr != ''
        ORDER BY t.update_time
        LIMIT 1
    """
    if plt.execute(query_running_sql):
        return None

    data = plt.execute(query_next_exdc_sql)
    # A tuple result is treated as "no rows", as in the original check
    # (presumably the driver hands back an empty tuple in that case -- confirm
    # against ConnectMysqlPool). Empty or missing results are covered too so
    # that data[0] can never raise.
    if not data or isinstance(data, tuple):
        return {}
    return data[0]


def get_all_wind(field_code) -> dict:
    """Map engine names to engine codes for one wind field.

    Args:
        field_code: Value matched against ``wind_engine_group.field_code``.

    Returns:
        ``{engine_name: engine_code}`` (both as ``str``) for rows whose
        ``del_state`` is 0; empty when the query yields nothing.
    """
    query_sql = "select engine_code,engine_name from wind_engine_group where field_code = %s and del_state = 0"
    rows = plt.execute(query_sql, (field_code,)) or ()
    return {str(row['engine_name']): str(row['engine_code']) for row in rows}


def get_all_wind_company() -> list:
    """List the names of all wind fields whose ``del_state`` is 0.

    Returns:
        The ``field_name`` values from ``wind_field``; when the query yields
        no rows, a fresh copy of the built-in fallback list.
    """
    query_sql = "SELECT t.field_name FROM wind_field t where t.del_state = 0"
    datas = plt.execute(query_sql)
    # The query result used to be overwritten with an empty list right here,
    # which made this branch unreachable and always returned the fallback.
    if datas:
        return [name for row in datas for name in row.values()]
    return list(_FALLBACK_WIND_COMPANIES)


if __name__ == '__main__':
    print(get_all_wind('WOF01000002'))
    print(get_all_wind_company())
    print(get_exec_data())
    # print(update_trans_status_success("test_唐龙-定时任务测试", "second", 10))