123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- from db_server.base import import_db
- from utils.log.import_data_log import log_print
- db = import_db
- def get_exec_data(run_count=1):
- run_count_datas = db.execute('select count(1) as count from executor_history where exec_status = 0')
- log_print(run_count_datas)
- running_count = int(run_count_datas[0]['count'])
- if running_count < run_count:
- run_datas = db.execute('select * from executor_history where exec_status = -1 limit 1')
- log_print(run_datas)
- if run_datas:
- return run_datas[0]
- else:
- log_print("没有可执行的任务")
- else:
- log_print("当前已达执行上限")
- return None
- def begin_run(id, save_db=True):
- if save_db:
- db.execute(
- 'update executor_history set exec_status = 0,begin_time=now() where id = %s', (id,))
- def update_transfer_progress(id, number, process_count, now_count, save_db=True):
- number = round((now_count - 1) * 100 / process_count + number)
- log_print(f"{process_count}个任务,当前进度{now_count}, 当前比例:{number}")
- if save_db:
- db.execute('update executor_history set transfer_progress = %s where id = %s', (number, id))
- def run_success(id, save_db=True):
- if save_db:
- db.execute('update executor_history set exec_status = 1,end_time=now() where id = %s', (id,))
- def run_fail(id, error, save_db=True):
- if save_db:
- error = str(error)[0:200 if len(error) > 200 else len(error)]
- db.execute('update executor_history set exec_status = 2,err_info = %s,end_time=now() where id = %s',
- (error, id))
- def get_exec_group(id):
- return db.execute("select * from process_group where id = %s", (id,))
- def get_mapping_field(process_executor_id):
- return db.execute("""
- SELECT
- t.standardized_name,
- t.data_name,
- a.is_cut_col,
- a.is_index
- FROM
- mapping_field t
- INNER JOIN base_template_field a ON t.template_filed_id = a.id
- WHERE
- t.process_executor_id = %s
- """, (process_executor_id,))
- if __name__ == '__main__':
- get_exec_data(2)
|