You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
eh-fastapi/logger.py

88 lines
2.8 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
日志管理模块
"""
import logging
import sys
from pathlib import Path
from typing import Optional
from config import config
class LoggerManager:
    """Create, configure and cache ``logging.Logger`` instances.

    Level, format and log directory are read from the project ``config``
    object (``config.log_level``, ``config.log_format``, ``config.data_dir``).
    Loggers configured by :meth:`get_logger` are remembered by name in
    ``_loggers`` so each name is only configured once.
    """

    # Loggers already configured by get_logger, keyed by logger name.
    _loggers: dict = {}

    @staticmethod
    def _level() -> int:
        """Return the ``logging`` level constant named by ``config.log_level``.

        Raises:
            AttributeError: if the upper-cased name is not an attribute of
                the ``logging`` module.
        """
        return getattr(logging, config.log_level.upper())

    @staticmethod
    def _log_path(file_name: str) -> Path:
        """Return ``config.data_dir / file_name``, creating missing parent dirs.

        ``logging.FileHandler`` opens its file on construction and fails when
        the directory does not exist, so the directory is created first.
        """
        path = Path(config.data_dir) / file_name
        path.parent.mkdir(parents=True, exist_ok=True)
        return path

    @classmethod
    def get_logger(cls, name: str, log_file: Optional[str] = None) -> logging.Logger:
        """Return the logger called *name*, configuring it on first use.

        On first use the logger gets a stdout handler, an optional UTF-8 file
        handler and a ``WebSocketLogHandler``.

        Args:
            name: Logger name passed to ``logging.getLogger``.
            log_file: Optional file name, resolved relative to
                ``config.data_dir``. It is ignored when the logger is already
                cached or already has handlers attached.

        Returns:
            The configured ``logging.Logger``.

        NOTE(review): the returned loggers keep the default ``propagate``
        setting, so if ``setup_root_logger`` has also been called each record
        is presumably handled both here and by the root handlers. Confirm
        this duplication is intended.
        """
        cached = cls._loggers.get(name)
        if cached is not None:
            return cached

        logger = logging.getLogger(name)
        level = cls._level()
        logger.setLevel(level)

        # Someone else already attached handlers; do not add duplicates.
        if logger.handlers:
            return logger

        formatter = logging.Formatter(config.log_format)

        # Console handler.
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(level)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        # File handler (only when a file name was requested).
        if log_file:
            file_handler = logging.FileHandler(
                cls._log_path(log_file), encoding='utf-8'
            )
            file_handler.setLevel(level)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

        # Real-time WebSocket log handler.
        logger.addHandler(WebSocketLogHandler())

        cls._loggers[name] = logger
        return logger

    @classmethod
    def setup_root_logger(cls) -> None:
        """Configure the root logger with stdout, ``app.log`` and WebSocket handlers.

        ``logging.basicConfig`` does nothing when the root logger already has
        handlers, so in that case this returns early instead of building
        handlers (and opening ``app.log``) that would never be attached or
        closed.
        """
        if logging.getLogger().handlers:
            return

        logging.basicConfig(
            level=cls._level(),
            format=config.log_format,
            handlers=[
                logging.StreamHandler(sys.stdout),
                logging.FileHandler(cls._log_path("app.log"), encoding='utf-8'),
                WebSocketLogHandler(),
            ],
        )
# Convenience function
def get_logger(name: str, log_file: Optional[str] = None) -> logging.Logger:
    """Return the logger for *name* by delegating to ``LoggerManager.get_logger``.

    Args:
        name: Logger name.
        log_file: Optional log file name, forwarded unchanged.

    Returns:
        Whatever ``LoggerManager.get_logger(name, log_file)`` returns.
    """
    fetch = LoggerManager.get_logger
    return fetch(name, log_file)
class WebSocketLogHandler(logging.Handler):
    """Logging handler that hands each record to the real-time logger for
    broadcast to WebSocket clients."""

    def emit(self, record: logging.LogRecord) -> None:
        """Format *record* and forward it to ``realtime_logger.broadcast_log_sync``.

        The formatted message, the record's level name and the record's logger
        name are passed positionally. Any exception raised while formatting,
        importing or broadcasting is swallowed.
        """
        try:
            payload = (self.format(record), record.levelname, record.name)
            # Imported lazily to avoid a circular dependency.
            from realtime_logger import realtime_logger

            # Synchronous entry point; per the original author's note it tries
            # to schedule the send onto the event loop internally.
            realtime_logger.broadcast_log_sync(*payload)
        except Exception:
            # Logging must never be interrupted by a failed WebSocket send.
            return