"""Helpers for reading and updating rows of the ``executor_history`` table.

All database access goes through ``db.execute``. Status codes written by this
module: ``begin_run`` sets ``exec_status = 0``, ``run_success`` sets ``1`` and
``run_fail`` sets ``2``; ``get_exec_data`` picks rows whose status is ``-1``.
NOTE(review): presumably -1 = pending, 0 = running, 1 = succeeded, 2 = failed;
confirm against the schema documentation.
"""
from db_server.base import import_db
from utils.log.import_data_log import log_print

# Module-wide handle; every function below calls ``db.execute`` on it.
db = import_db

# Maximum number of characters of an error message stored in ``err_info``.
_MAX_ERR_INFO_LEN = 200


def get_exec_data(run_count=1):
    """Return one row with ``exec_status = -1`` if fewer than *run_count* rows
    currently have ``exec_status = 0``.

    Args:
        run_count: Upper bound (exclusive) on the number of rows with
            ``exec_status = 0`` for a new row to be handed out.

    Returns:
        The first row returned by the ``exec_status = -1`` query, or ``None``
        when the limit is reached or no such row exists.
    """
    run_count_datas = db.execute(
        'select count(1) as count from executor_history where exec_status = 0')
    log_print(run_count_datas)
    running_count = int(run_count_datas[0]['count'])

    if running_count >= run_count:
        log_print("当前已达执行上限")
        return None

    run_datas = db.execute(
        'select * from executor_history where exec_status = -1 limit 1')
    log_print(run_datas)
    if not run_datas:
        log_print("没有可执行的任务")
        return None
    return run_datas[0]


def begin_run(id, save_db=True):
    """Set ``exec_status = 0`` and ``begin_time = now()`` for row *id*.

    Does nothing when *save_db* is false.
    """
    if save_db:
        db.execute(
            'update executor_history set exec_status = 0,begin_time=now() where id = %s',
            (id,))


def update_transfer_progress(id, number, process_count, now_count, save_db=True):
    """Log and optionally store the overall transfer progress for row *id*.

    The stored value is ``round((now_count - 1) * 100 / process_count + number)``.
    NOTE(review): this looks like "percentage of fully finished tasks plus the
    contribution of the current task"; confirm what callers pass as *number*.

    Args:
        id: Primary key of the ``executor_history`` row.
        number: Value added to the percentage of completed tasks.
        process_count: Total number of tasks; must be non-zero, otherwise the
            division raises ``ZeroDivisionError``.
        now_count: 1-based index of the task currently being processed
            (the formula subtracts one from it).
        save_db: When false, only the log line is emitted.
    """
    number = round((now_count - 1) * 100 / process_count + number)
    log_print(f"{process_count}个任务,当前进度{now_count}, 当前比例:{number}")
    if save_db:
        db.execute(
            'update executor_history set transfer_progress = %s where id = %s',
            (number, id))


def run_success(id, save_db=True):
    """Set ``exec_status = 1`` and ``end_time = now()`` for row *id*.

    Does nothing when *save_db* is false.
    """
    if save_db:
        db.execute(
            'update executor_history set exec_status = 1,end_time=now() where id = %s',
            (id,))


def run_fail(id, error, save_db=True):
    """Set ``exec_status = 2``, store the error text and ``end_time = now()``.

    Args:
        id: Primary key of the ``executor_history`` row.
        error: Error to record; converted with ``str()`` so both exception
            instances and plain strings are accepted.
        save_db: When false, nothing is written.
    """
    if save_db:
        # Slicing clamps to the string length, so no explicit length check is
        # needed. The previous code called len() on the raw ``error`` object,
        # which raises TypeError for exception instances.
        error = str(error)[:_MAX_ERR_INFO_LEN]
        db.execute(
            'update executor_history set exec_status = 2,err_info = %s,end_time=now() where id = %s',
            (error, id))


def get_exec_group(id):
    """Return the result of selecting all ``process_group`` rows with this *id*."""
    return db.execute("select * from process_group where id = %s", (id,))


def get_mapping_field(process_executor_id):
    """Return mapping fields joined with their template field flags.

    Selects ``standardized_name``, ``data_name``, ``is_cut_col`` and
    ``is_index`` for every ``mapping_field`` row belonging to
    *process_executor_id*.
    """
    # Adjacent literals concatenate to exactly the original statement text.
    sql = (
        " SELECT t.standardized_name, t.data_name, a.is_cut_col, a.is_index"
        " FROM mapping_field t"
        " INNER JOIN base_template_field a ON t.template_filed_id = a.id"
        " WHERE t.process_executor_id = %s "
    )
    return db.execute(sql, (process_executor_id,))


if __name__ == '__main__':
    get_exec_data(2)