# trans_log.py (3.6 KB)
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/16
  3. # @Author : 魏志亮
  4. import datetime
  5. import logging
  6. import sys
  7. from os import *
  8. from conf.constants import Log
  9. from utils.conf.read_conf import read_conf, yaml_conf
  10. def set_trance_id(trace_id):
  11. """设置当前线程的链路ID"""
  12. environ['trace_id'] = trace_id
  13. class ContextFilter(logging.Filter):
  14. """一个自定义的日志过滤器,用于在日志记录中添加链路ID"""
  15. def filter(self, record):
  16. record.trace_id = ''
  17. if 'trace_id' in environ.keys():
  18. record.trace_id = environ['trace_id']
  19. return True
  20. # 初始化日志配置
  21. def init_logger():
  22. """初始化日志配置"""
  23. logger = logging.getLogger("etl_tools")
  24. logger.setLevel(logging.DEBUG) # 设置为DEBUG以捕获所有级别的日志
  25. # 清除已有的处理器
  26. if logger.handlers:
  27. logger.handlers.clear()
  28. formatter = logging.Formatter("%(asctime)s-%(levelname)s-%(trace_id)s: %(message)s")
  29. # 控制台处理器
  30. stout_handle = logging.StreamHandler(sys.stdout)
  31. stout_handle.setFormatter(formatter)
  32. # 根据环境设置日志级别
  33. env = environ.get('env', 'dev')
  34. stout_handle.setLevel(logging.INFO)
  35. stout_handle.addFilter(ContextFilter())
  36. logger.addHandler(stout_handle)
  37. # 文件处理器
  38. try:
  39. config_path = environ.get('ETL_CONF')
  40. if config_path:
  41. config = yaml_conf(config_path)
  42. log_path_dir = read_conf(config, 'log_path_dir', Log.DEFAULT_LOG_PATH)
  43. else:
  44. log_path_dir = Log.DEFAULT_LOG_PATH
  45. log_path = log_path_dir + sep + Log.LOG_FILE_PREFIX + (environ['env'] if 'env' in environ else 'dev')
  46. file_path = path.join(log_path)
  47. if not path.exists(file_path):
  48. makedirs(file_path, exist_ok=True)
  49. # 普通日志文件(INFO及以上)
  50. file_name = file_path + sep + str(datetime.date.today()) + '.log'
  51. file_handler = logging.FileHandler(file_name, encoding='utf-8')
  52. file_handler.setFormatter(formatter)
  53. file_handler.setLevel(logging.INFO)
  54. file_handler.addFilter(ContextFilter())
  55. logger.addHandler(file_handler)
  56. # 错误日志文件(ERROR及以上)
  57. error_file_name = file_path + sep + str(datetime.date.today()) + '.error.log'
  58. error_file_handler = logging.FileHandler(error_file_name, encoding='utf-8')
  59. error_file_handler.setFormatter(formatter)
  60. error_file_handler.setLevel(logging.ERROR)
  61. error_file_handler.addFilter(ContextFilter())
  62. logger.addHandler(error_file_handler)
  63. except Exception as e:
  64. # 如果日志文件创建失败,只使用控制台日志
  65. pass
  66. return logger
  67. # 初始化日志记录器
  68. logger = init_logger()
  69. def trans_print(*args, level: str = 'info'):
  70. """
  71. 打印日志
  72. Args:
  73. *args: 日志内容
  74. level: 日志级别,可选值: 'debug', 'info', 'warning', 'error'
  75. """
  76. message = " ".join([str(a) for a in args])
  77. if level == 'debug':
  78. logger.debug(message)
  79. elif level == 'info':
  80. logger.info(message)
  81. elif level == 'warning':
  82. logger.warning(message)
  83. elif level == 'error':
  84. logger.error(message)
  85. else:
  86. logger.info(message)
  87. def debug(*args):
  88. """打印调试日志"""
  89. trans_print(*args, level='debug')
  90. def info(*args):
  91. """打印信息日志"""
  92. trans_print(*args, level='info')
  93. def warning(*args):
  94. """打印警告日志"""
  95. trans_print(*args, level='warning')
  96. def error(*args):
  97. """打印错误日志"""
  98. trans_print(*args, level='error')