| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- # -*- coding: utf-8 -*-
- # @Time : 2024/5/16
- # @Author : 魏志亮
- import datetime
- import logging
- import sys
- from os import *
- from conf.constants import Log
- from utils.conf.read_conf import read_conf, yaml_conf
def set_trance_id(trace_id):
    """Store the trace ID that subsequent log records are tagged with.

    The value is written to the ``trace_id`` key of the process environment,
    so it is visible to the whole process rather than being scoped to the
    calling thread.

    Args:
        trace_id: Identifier to record. Environment values must be strings,
            so any non-string value is converted with ``str()``. Passing
            ``None`` removes a previously stored ID.
    """
    if trace_id is None:
        # Clearing is more useful than storing the literal text "None".
        environ.pop('trace_id', None)
        return
    environ['trace_id'] = trace_id if isinstance(trace_id, str) else str(trace_id)
class ContextFilter(logging.Filter):
    """Logging filter that stamps every record with the current trace ID.

    The ID is read from the ``trace_id`` key of the process environment.
    When the key is absent the record receives an empty string, so a
    ``%(trace_id)s`` placeholder in a format string always resolves.
    """

    def filter(self, record):
        """Attach ``trace_id`` to *record* and let the record through.

        Args:
            record: The ``logging.LogRecord`` being processed.

        Returns:
            Always ``True``; this filter never suppresses a record.
        """
        # One lookup with a default, instead of a membership test followed
        # by a second indexed read of the same key.
        record.trace_id = environ.get('trace_id', '')
        return True
- # 初始化日志配置
def _add_file_handler(logger, file_name, level, formatter):
    """Attach a UTF-8 file handler writing to *file_name* at *level* to *logger*."""
    handler = logging.FileHandler(file_name, encoding='utf-8')
    handler.setFormatter(formatter)
    handler.setLevel(level)
    handler.addFilter(ContextFilter())
    logger.addHandler(handler)


def init_logger():
    """Configure and return the ``etl_tools`` logger.

    A console handler (stdout, INFO and above) is always attached. Two file
    handlers are attached on a best-effort basis: ``<date>.log`` for INFO and
    above and ``<date>.error.log`` for ERROR and above. Both live in the
    directory ``<log_path_dir>/<Log.LOG_FILE_PREFIX><env>``, where ``env``
    comes from the ``env`` environment variable (default ``dev``) and
    ``log_path_dir`` is read from the YAML file named by ``ETL_CONF`` when
    that variable is set, otherwise ``Log.DEFAULT_LOG_PATH``.

    The date in the file names is fixed when this function runs; the files
    do not roll over while the process keeps running.

    Returns:
        The configured ``logging.Logger``. If the file handlers cannot be
        set up, a warning is logged and the logger keeps whatever handlers
        were attached up to that point (at least the console handler).
    """
    logger = logging.getLogger("etl_tools")
    # The logger itself lets everything through; each handler applies its
    # own threshold.
    logger.setLevel(logging.DEBUG)

    # Detach and close handlers left over from an earlier call, so that
    # re-initialising neither duplicates output nor leaks open log files.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()

    formatter = logging.Formatter("%(asctime)s-%(levelname)s-%(trace_id)s: %(message)s")

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    console_handler.setLevel(logging.INFO)
    console_handler.addFilter(ContextFilter())
    logger.addHandler(console_handler)

    # The environment name only selects the log directory.
    env = environ.get('env', 'dev')

    try:
        config_path = environ.get('ETL_CONF')
        if config_path:
            config = yaml_conf(config_path)
            log_path_dir = read_conf(config, 'log_path_dir', Log.DEFAULT_LOG_PATH)
        else:
            log_path_dir = Log.DEFAULT_LOG_PATH

        log_dir = log_path_dir + sep + Log.LOG_FILE_PREFIX + env
        makedirs(log_dir, exist_ok=True)

        # Read the date once so both files are guaranteed to share it.
        today = str(datetime.date.today())
        _add_file_handler(logger, path.join(log_dir, today + '.log'), logging.INFO, formatter)
        _add_file_handler(logger, path.join(log_dir, today + '.error.log'), logging.ERROR, formatter)
    except Exception as e:
        # File logging is optional: report why it is unavailable instead of
        # hiding the failure, then carry on with the handlers already attached.
        logger.warning("File logging disabled, falling back to console only: %s", e)
    return logger
# Module-level logger used by the helper functions below; created once when
# this module is imported.
logger = init_logger()
def trans_print(*args, level: str = 'info'):
    """Join *args* into one message and log it on the module logger.

    Args:
        *args: Values to log; each is converted with ``str()`` and the
            results are joined with single spaces.
        level: One of ``'debug'``, ``'info'``, ``'warning'`` or ``'error'``.
            Any other value is logged at INFO level.
    """
    text = " ".join(map(str, args))
    recognised = ('debug', 'info', 'warning', 'error')
    # Unknown levels fall back to 'info'.
    method_name = next((name for name in recognised if level == name), 'info')
    getattr(logger, method_name)(text)
def debug(*args):
    """Log *args* at DEBUG level by delegating to ``trans_print``."""
    return trans_print(*args, level="debug")
def info(*args):
    """Log *args* at INFO level by delegating to ``trans_print``."""
    return trans_print(*args, level="info")
def warning(*args):
    """Log *args* at WARNING level by delegating to ``trans_print``."""
    return trans_print(*args, level="warning")
def error(*args):
    """Log *args* at ERROR level by delegating to ``trans_print``."""
    return trans_print(*args, level="error")
|