# -*- coding: utf-8 -*- # @Time : 2024/5/16 # @Author : 魏志亮 import datetime import logging import sys from os import * from conf.constants import Log from utils.conf.read_conf import read_conf, yaml_conf def set_trance_id(trace_id): """设置当前线程的链路ID""" environ['trace_id'] = trace_id class ContextFilter(logging.Filter): """一个自定义的日志过滤器,用于在日志记录中添加链路ID""" def filter(self, record): record.trace_id = '' if 'trace_id' in environ.keys(): record.trace_id = environ['trace_id'] return True # 初始化日志配置 def init_logger(): """初始化日志配置""" logger = logging.getLogger("etl_tools") logger.setLevel(logging.DEBUG) # 设置为DEBUG以捕获所有级别的日志 # 清除已有的处理器 if logger.handlers: logger.handlers.clear() formatter = logging.Formatter("%(asctime)s-%(levelname)s-%(trace_id)s: %(message)s") # 控制台处理器 stout_handle = logging.StreamHandler(sys.stdout) stout_handle.setFormatter(formatter) # 根据环境设置日志级别 env = environ.get('env', 'dev') stout_handle.setLevel(logging.INFO) stout_handle.addFilter(ContextFilter()) logger.addHandler(stout_handle) # 文件处理器 try: config_path = environ.get('ETL_CONF') if config_path: config = yaml_conf(config_path) log_path_dir = read_conf(config, 'log_path_dir', Log.DEFAULT_LOG_PATH) else: log_path_dir = Log.DEFAULT_LOG_PATH log_path = log_path_dir + sep + Log.LOG_FILE_PREFIX + (environ['env'] if 'env' in environ else 'dev') file_path = path.join(log_path) if not path.exists(file_path): makedirs(file_path, exist_ok=True) # 普通日志文件(INFO及以上) file_name = file_path + sep + str(datetime.date.today()) + '.log' file_handler = logging.FileHandler(file_name, encoding='utf-8') file_handler.setFormatter(formatter) file_handler.setLevel(logging.INFO) file_handler.addFilter(ContextFilter()) logger.addHandler(file_handler) # 错误日志文件(ERROR及以上) error_file_name = file_path + sep + str(datetime.date.today()) + '.error.log' error_file_handler = logging.FileHandler(error_file_name, encoding='utf-8') error_file_handler.setFormatter(formatter) error_file_handler.setLevel(logging.ERROR) error_file_handler.addFilter(ContextFilter()) logger.addHandler(error_file_handler) except Exception as e: # 如果日志文件创建失败,只使用控制台日志 pass return logger # 初始化日志记录器 logger = init_logger() def trans_print(*args, level: str = 'info'): """ 打印日志 Args: *args: 日志内容 level: 日志级别,可选值: 'debug', 'info', 'warning', 'error' """ message = " ".join([str(a) for a in args]) if level == 'debug': logger.debug(message) elif level == 'info': logger.info(message) elif level == 'warning': logger.warning(message) elif level == 'error': logger.error(message) else: logger.info(message) def debug(*args): """打印调试日志""" trans_print(*args, level='debug') def info(*args): """打印信息日志""" trans_print(*args, level='info') def warning(*args): """打印警告日志""" trans_print(*args, level='warning') def error(*args): """打印错误日志""" trans_print(*args, level='error')