123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- import datetime
- from service.common_connect import plt
def update_timeout_wave_trans_data():
    """Flag long-running wave transfers as failed.

    Issues a single UPDATE against ``wave_data_transfer``: every row whose
    ``trans_sys_status`` is still 0 and for which
    ``TIMESTAMPDIFF(HOUR, transfer_start_time, NOW())`` exceeds 6 gets
    ``trans_sys_status = 2``, ``transfer_state = 2`` and a fixed timeout
    message written to ``err_info``.
    """
    # The err_info literal is a Chinese "run timed out / failed" message that
    # is stored verbatim in the database; do not alter it.
    timeout_sql = (
        "\n"
        "UPDATE wave_data_transfer\n"
        "SET trans_sys_status = 2,err_info='运行超时失败',transfer_state=2\n"
        "WHERE TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6\n"
        "AND trans_sys_status = 0\n"
    )
    plt.execute(timeout_sql)
def update_wave_trans_status_running(id, schedule_exec=True):
    """Reset one ``wave_data_transfer`` row before a new transfer run.

    Sets ``transfer_state`` and ``trans_sys_status`` to 0, stamps
    ``transfer_start_time`` with ``now()``, clears ``err_info``, zeroes
    ``engine_count`` / ``time_granularity`` and nulls the finish time, the
    data time range and the transferred row count.

    Args:
        id: Primary key of the row to reset.
        schedule_exec: When falsy the function does nothing.
    """
    if not schedule_exec:
        return

    reset_sql = (
        "\n"
        "update wave_data_transfer set transfer_state = 0,trans_sys_status = 0 ,"
        "transfer_start_time = now(),err_info='',\n"
        "engine_count =0,time_granularity=0,transfer_finish_time=null,\n"
        "data_min_time= null,data_max_time= null,transfer_data_count=null\n"
        "where id = %s\n"
    )
    # NOTE(review): the id is passed as a bare value here, whereas the sibling
    # helpers pass a tuple of parameters — confirm plt.execute accepts both.
    plt.execute(reset_sql, id)
def update_wave_trans_status_error(id, message="", save_db=True):
    """Mark one ``wave_data_transfer`` row as failed and store the error text.

    Sets ``transfer_state`` and ``trans_sys_status`` to 2, writes the
    (truncated) message to ``err_info`` and stamps ``transfer_finish_time``
    with ``now()``.

    Args:
        id: Primary key of the row to update.
        message: Error description. ``None`` is stored as an empty string and
            non-string objects (for example an exception instance) are
            converted with ``str()``; the result is cut to 200 characters.
        save_db: When falsy the function does nothing.
    """
    if not save_db:
        return

    # Normalise before truncating: previously ``len(message)`` raised
    # TypeError for None or for objects without a length, which hid the
    # original failure behind a second one.
    if message is None:
        message = ""
    elif not isinstance(message, (str, bytes)):
        message = str(message)

    error_sql = (
        "\n"
        "update wave_data_transfer set transfer_state = 2,trans_sys_status=2 ,"
        "err_info= %s,transfer_finish_time=now()\n"
        "where id = %s\n"
    )
    # Slicing is a no-op for short messages, so no explicit length check is
    # needed to enforce the 200-character cap.
    plt.execute(error_sql, (message[:200], id))
# Sentinel meaning "argument omitted". It lets the date defaults resolve to the
# current time on every call while an explicit ``None`` for ``min_date`` keeps
# its own meaning (skip the data-range columns).
_USE_CURRENT_TIME = object()


def update_wave_trans_status_success(id, wind_count=0, time_granularity=0,
                                     min_date=_USE_CURRENT_TIME,
                                     max_date=_USE_CURRENT_TIME,
                                     total_count=0, save_db=True):
    """Mark one ``wave_data_transfer`` row as successfully transferred.

    Sets ``transfer_state`` and ``trans_sys_status`` to 1,
    ``transfer_progress`` to 100, clears ``err_info``, stores
    ``engine_count`` / ``time_granularity`` and stamps
    ``transfer_finish_time`` with ``now()``. Unless ``min_date`` is ``None``
    it also stores the data time range and the transferred row count.

    Args:
        id: Primary key of the row to update.
        wind_count: Value written to ``engine_count``.
        time_granularity: Value written to ``time_granularity``.
        min_date: Value for ``data_min_time``. Defaults to the current time
            at call time. Pass ``None`` to leave ``data_min_time``,
            ``data_max_time`` and ``transfer_data_count`` untouched.
        max_date: Value for ``data_max_time``. Defaults to the current time
            at call time.
        total_count: Value written to ``transfer_data_count``.
        save_db: When falsy the function does nothing.
    """
    if not save_db:
        return

    # The defaults used to be ``datetime.datetime.now()`` in the signature,
    # which Python evaluates once at import; every call that omitted the
    # dates therefore wrote the process start time. Resolve them here.
    if min_date is _USE_CURRENT_TIME or max_date is _USE_CURRENT_TIME:
        current_time = datetime.datetime.now()
        if min_date is _USE_CURRENT_TIME:
            min_date = current_time
        if max_date is _USE_CURRENT_TIME:
            max_date = current_time

    if min_date is None:
        # No data range available: update the status columns only.
        status_only_sql = (
            "\n"
            "update wave_data_transfer set transfer_state = 1,trans_sys_status = 1,"
            "transfer_progress = 100,err_info = '',engine_count =%s,"
            "time_granularity=%s,transfer_finish_time=now()\n"
            "where id = %s\n"
        )
        plt.execute(status_only_sql, (wind_count, time_granularity, id))
        return

    status_and_range_sql = (
        "\n"
        "update wave_data_transfer set transfer_state = 1,trans_sys_status = 1,"
        "transfer_progress=100,err_info = '',engine_count =%s,"
        "time_granularity=%s,transfer_finish_time=now(),\n"
        "data_min_time= %s,data_max_time= %s,transfer_data_count=%s\n"
        "where id = %s\n"
    )
    plt.execute(
        status_and_range_sql,
        (wind_count, time_granularity, min_date, max_date, total_count, id),
    )
def update_wave_trans_transfer_progress(id, transfer_progress=0, save_db=True):
    """Store the current progress value of one ``wave_data_transfer`` row.

    Args:
        id: Primary key of the row to update.
        transfer_progress: Progress value; converted with ``int()`` before it
            is written to ``transfer_progress``.
        save_db: When falsy the function does nothing (the progress value is
            not even converted).
    """
    if not save_db:
        return

    progress_sql = (
        "\n"
        "update wave_data_transfer set transfer_progress = %s where id = %s\n"
    )
    params = (int(transfer_progress), id)
    plt.execute(progress_sql, params)
def get_wave_exec_data(run_count: int = 1) -> "dict | None":
    """Fetch the next wave transfer record to execute.

    First counts the rows with ``trans_sys_status = 0``; if that count has
    reached ``run_count`` nothing is fetched. Otherwise selects the single
    oldest (by ``update_time``) ``wave_data_transfer`` row with
    ``trans_sys_status`` in (-1, 1, 2), ``transfer_state = 0`` and a
    non-empty ``transfer_addr``, joined with ``wind_field`` and
    ``wind_field_batch`` for the field and batch names.

    Args:
        run_count: Maximum number of running transfers allowed before no new
            one is handed out.

    Returns:
        ``None`` when the running count is at or above ``run_count``; an
        empty dict when no candidate row exists; otherwise the first row
        returned by ``plt.execute``.
    """
    # NOTE(review): this counts rows in ``data_transfer`` while every other
    # statement in this module targets ``wave_data_transfer`` — confirm
    # whether the shared limit is intentional or a copy-paste slip.
    query_running_sql = "select count(1) as count from data_transfer where trans_sys_status = 0"
    query_next_exec_sql = (
        "\n"
        "SELECT\n"
        "t.*,a.field_name,b.batch_name\n"
        "FROM\n"
        "wave_data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code\n"
        "inner join wind_field_batch b on t.batch_code = b.batch_code\n"
        "WHERE\n"
        "t.trans_sys_status in (-1,1,2) and t.transfer_state = 0\n"
        "AND t.transfer_addr != ''\n"
        "ORDER BY\n"
        "t.update_time\n"
        "LIMIT 1\n"
    )

    running_rows = plt.execute(query_running_sql)
    running_count = int(running_rows[0]['count'])
    if running_count >= run_count:
        return None

    candidates = plt.execute(query_next_exec_sql)
    # A tuple result is treated as "no rows", as before (presumably the
    # driver's empty result — verify against plt.execute). An empty list or
    # None is handled the same way instead of crashing on ``[0]``.
    if not candidates or isinstance(candidates, tuple):
        return {}
    return candidates[0]
|