# -*- coding: utf-8 -*-
# @Time    : 2024/6/11
# @Author  : 魏志亮
"""Re-run ETL transfer batches listed in ``tmp_file/rebuild_data.csv``.

Usage: run as a script with an optional first argument naming the environment
(default ``dev``); the configuration is read from
``./conf/etl_config_<env>.yaml``.

NOTE: ``get_exec_data`` and ``run`` rely on names (``trans_print``,
``get_batch_exec_data``, ``get_data_by_batch_no_and_type``, ``MinSecTrans``,
``FaultWarnTrans``) that are only imported inside the ``__main__`` block
below, so they work only when this file is executed as a script.
"""
import datetime
import os
import sys
import traceback

import pandas as pd

from utils.conf.read_conf import yaml_conf, read_conf


def get_exec_data(batch_no=None, read_type=None, run_count=1):
    """Fetch the batch description(s) to execute.

    :param batch_no: batch number; used together with ``read_type`` to look up
        one specific batch.
    :param read_type: transfer type of the batch to look up.
    :param run_count: forwarded to ``get_batch_exec_data`` when no specific
        batch is requested.
    :return: whatever the lookup service returned (non-empty).
    :raises ValueError: a specific batch was requested but not found.

    When no specific batch is requested and the service returns ``None`` or an
    empty result, a message is logged and the process exits with status 0.
    """
    if batch_no and read_type:
        data = get_data_by_batch_no_and_type(batch_no, read_type)
        if data is None:
            raise ValueError(f"未找到批次号:{batch_no},类型:{read_type}")
        return data

    data = get_batch_exec_data(run_count)
    if data is None:
        # Per the log message: another task is currently executing.
        trans_print("当前有任务在执行")
        sys.exit(0)
    if not data:
        # Per the log message: there is nothing to run.
        trans_print("当前无任务")
        sys.exit(0)

    return data


def run(data: dict = None, save_db=False):
    """Run the transfer process that matches ``data['transfer_type']``.

    :param data: batch description; must contain the key ``'transfer_type'``.
    :param save_db: forwarded unchanged to the transfer class.
    :raises KeyError: ``data`` has no ``'transfer_type'`` key.
    :raises Exception: the transfer type is not one of
        ``second`` / ``minute`` / ``fault`` / ``warn``.
    """
    # Avoid a mutable default argument shared between calls; an empty dict
    # still fails with KeyError below, exactly as before.
    if data is None:
        data = {}

    transfer_type = data['transfer_type']
    if transfer_type in ('second', 'minute'):
        exec_process = MinSecTrans(data=data, save_db=save_db)
    elif transfer_type in ('fault', 'warn'):
        exec_process = FaultWarnTrans(data=data, save_db=save_db)
    else:
        raise Exception("No exec process")

    exec_process.run()


if __name__ == '__main__':
    env = 'dev'
    if len(sys.argv) >= 2:
        env = sys.argv[1]

    conf_path = os.path.abspath(f"./conf/etl_config_{env}.yaml")
    os.environ['ETL_CONF'] = conf_path
    yaml_config = yaml_conf(conf_path)
    os.environ['env'] = env
    run_count = int(read_conf(yaml_config, "run_batch_count", 1))

    # These imports stay below the environment setup and at module scope:
    # run()/get_exec_data() look the names up as module globals.
    # NOTE(review): presumably these modules read ETL_CONF / env when they are
    # imported, which is why they are not at the top of the file - confirm.
    from utils.log.trans_log import trans_print
    from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
    from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
    from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
    from utils.file.trans_methods import read_file_to_df

    begin = datetime.datetime.now()
    df = read_file_to_df("tmp_file/rebuild_data.csv")

    # One (batch_code, batch_name, transfer_type, field_code, field_name, status)
    # tuple per processed batch.
    results = []
    for batch_code, batch_name, transfer_type, transfer_addr, field_code, field_name \
            in zip(df['batch_code'], df['batch_name'], df['transfer_type'], df['transfer_addr'],
                   df['field_code'], df['field_name']):
        batch_begin = datetime.datetime.now()
        trans_print("开始执行批次:", batch_code, batch_name, transfer_type, field_code, field_name)

        try:
            # Rewrite the download root inside the try so that a malformed
            # address (e.g. a non-string cell) fails only this batch.
            transfer_addr = transfer_addr.replace(r"/data/download/collection_data",
                                                  r"/data/download/datang_shangxian")
            trans_print("批次路径:", transfer_addr)

            # Build a fresh dict per batch so a transfer object that keeps a
            # reference to it never observes a later batch's values.
            batch_data = {
                'batch_code': batch_code,
                'batch_name': batch_name,
                'transfer_type': transfer_type,
                'transfer_addr': transfer_addr,
                'field_code': field_code,
                'field_name': field_name,
            }
            run(data=batch_data, save_db=True)
        except Exception:
            # Top-level boundary: record the failure, log the traceback and
            # carry on with the remaining batches.
            results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'error'))
            trans_print(traceback.format_exc())
        else:
            results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'success'))
        finally:
            now = datetime.datetime.now()
            trans_print("执行结束,耗时:", now - batch_begin, "总耗时:", now - begin)

    for result in results:
        trans_print(result)

    trans_print("执行结束,总耗时:", datetime.datetime.now() - begin)