# -*- coding: utf-8 -*- # @Time : 2024/6/6 # @Author : 魏志亮 from apscheduler.executors.pool import ThreadPoolExecutor from flask import Flask, request from flask_apscheduler import APScheduler from flask_restx import Api, Resource, fields from schedule_service import run_local from flask_executor import Executor app = Flask(__name__) executor = Executor(app) api = Api(app, version='1.0', title='Transfer API', description='Transfer API') localExec = api.model('LocalExecModal', { 'step': fields.Integer(default=0, description='开始步骤 0:验证删除临时文件 1:复制文件到临时文件夹 2:整理文件到临时文件 3:保存到正式文件 4:保存到数据库 '), 'end': fields.Integer(default=3, description='结束步骤 0:验证删除临时文件 1:复制文件到临时文件夹 2:整理文件到临时文件 3:保存到正式文件 4:保存到数据库 '), 'batch_no': fields.String(default='批次号', description='批次号'), 'transfer_type': fields.String(default='minute', description='传输类型'), 'transfer_file_addr': fields.String(default='/test', description='传输文件地址'), 'field_name': fields.String(default='风场名称', description='风场名称'), 'field_code': fields.String(default="风场编号", description="风场编号"), 'save_db': fields.Boolean(default=False, description='是否保存到数据库') }) @api.route('/local_exce') class LocalExec(Resource): @api.expect(localExec) def post(self): def local_exec(): try: localExec = request.get_json() run_local(localExec['step'], localExec['end'], localExec['batch_no'], localExec['transfer_type'], localExec['transfer_file_addr'], localExec['field_name'], localExec['field_code'], localExec['save_db']) except Exception as e: print(e) executor.submit(local_exec) return {'status': 200, 'message': '正在执行'} class Config(object): JOBS = [ { 'id': 'job1', 'func': 'schedule_service:run_schedule', 'args': (0, 4), 'trigger': 'interval', 'seconds': 60 } ] SCHEDULER_EXECUTORS = {'default': ThreadPoolExecutor(6)} # 调度器开关开启 SCHEDULER_API_ENABLED = True # 设置容错时间为 2min # coalesce积攒得任务跑几次,在时间允许得范围内 True:默认最后一次,False:在时间允许范围内全部提交 # max_instances 同时允许并发的最大并发量 # misfire_grace_time 如果重启任务在这个时间范围内,就能继续重启 SCHEDULER_JOB_DEFAULTS = {'coalesce': True, 'max_instances': 2, 'misfire_grace_time': 60} # 配置时区 SCHEDULER_TIMEZONE = 'Asia/Shanghai' @app.teardown_appcontext def shutdown_scheduler(exception): if scheduler.running: scheduler.shutdown() if __name__ == '__main__': app.config.from_object(Config()) scheduler = APScheduler() scheduler.init_app(app) scheduler.start() app.run(host='0.0.0.0', port=8088)