# -*- coding: utf-8 -*- # @Time : 2024/6/11 # @Author : 魏志亮 import os import sys from utils.conf.read_conf import yaml_conf, read_conf def get_exec_data(run_count=1): now_run_count = get_now_running_count() data = None if now_run_count >= run_count: info(f"当前有{now_run_count}个任务在执行") else: data = get_batch_exec_data() return data def run(save_db=True, run_count=1, yaml_config=None, step=0, end=999): update_timeout_trans_data() data = get_exec_data(run_count) if data is None: info("没有需要执行的任务") return exec_process = None if data["transfer_type"] in ["second", "minute"]: exec_process = MinSecTrans( data=data, save_db=save_db, yaml_config=yaml_config, step=step, end=end ) if data["transfer_type"] in ["fault", "warn"]: exec_process = FaultWarnTrans( data=data, save_db=save_db, yaml_config=yaml_config ) if data["transfer_type"] == "wave": exec_process = WaveTrans(data["id"], data["wind_farm_code"], data["read_dir"]) if data["transfer_type"] == "laser": exec_process = LaserTrans(data["id"], data["wind_farm_code"], data["read_dir"]) if exec_process is None: raise Exception("没有相应的执行器") exec_process.run() if __name__ == "__main__": env = "dev" if len(sys.argv) >= 2: env = sys.argv[1] if env.endswith(".yaml"): conf_path = env else: conf_path = os.path.abspath(f"./conf/etl_config_{env}.yaml") os.environ["ETL_CONF"] = conf_path yaml_config = yaml_conf(conf_path) os.environ["env"] = env run_count = int(read_conf(yaml_config, "run_batch_count", 1)) from utils.log.trans_log import info from service.trans_conf_service import ( update_timeout_trans_data, get_now_running_count, get_batch_exec_data, ) from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans from etl.wind_power.min_sec.MinSecTrans import MinSecTrans from etl.wind_power.laser.LaserTrans import LaserTrans from etl.wind_power.wave.WaveTrans import WaveTrans info("所有请求参数:", sys.argv, "env:", env, "最大可执行个数:", run_count) info("配置文件路径:", os.environ.get("ETL_CONF")) run(run_count=run_count, yaml_config=yaml_config, step=0)