You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
eh-fastapi/realtime_logger.py

117 lines
4.0 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Real-time log output module: an in-memory log buffer plus WebSocket broadcasting.
"""
import asyncio
import json
import time
import threading
from typing import List, Dict, Any, Optional
from pathlib import Path
import logging
# Logger for this module's own diagnostics (connection changes, send failures).
logger = logging.getLogger("realtime_logger")
class RealtimeLogger:
    """Keep a bounded in-memory log buffer and push entries to WebSocket clients.

    Every log entry is a dict with the keys ``timestamp`` (epoch seconds),
    ``time`` (local ``HH:MM:SS``), ``level``, ``source`` and ``message``.
    Entries are appended to ``log_buffer`` (trimmed to ``max_buffer_size``)
    and sent as JSON text to every object in ``connections`` through its
    ``send_text`` coroutine method.
    """

    def __init__(self):
        self.connections: List[Any] = []
        self.log_buffer: List[Dict[str, Any]] = []
        self.max_buffer_size = 1000
        # Guards ``connections`` and ``log_buffer``; never held across an await.
        self._lock = threading.Lock()
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def set_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        """Register the event loop that ``broadcast_log_sync`` schedules sends on."""
        self._loop = loop

    def add_connection(self, websocket):
        """Register a WebSocket connection as a broadcast target."""
        with self._lock:
            self.connections.append(websocket)
            total = len(self.connections)
        logger.info("新增WebSocket连接,当前连接数: %s", total)

    def remove_connection(self, websocket):
        """Unregister a WebSocket connection; unknown connections are ignored."""
        with self._lock:
            try:
                self.connections.remove(websocket)
            except ValueError:
                # Already removed (e.g. by the broadcast cleanup) - nothing to do.
                return
            total = len(self.connections)
        logger.info("移除WebSocket连接,当前连接数: %s", total)

    @staticmethod
    def _make_entry(message: str, level: str, source: str) -> Dict[str, Any]:
        """Build one log entry dict stamped with the current time."""
        now = time.time()
        return {
            "timestamp": now,
            # Derive the display time from the same instant as ``timestamp``.
            "time": time.strftime("%H:%M:%S", time.localtime(now)),
            "level": level,
            "source": source,
            "message": message,
        }

    def _record(self, message: str, level: str, source: str) -> Dict[str, Any]:
        """Create an entry, append it to the buffer, trim the buffer, return the entry."""
        entry = self._make_entry(message, level, source)
        with self._lock:
            self.log_buffer.append(entry)
            overflow = len(self.log_buffer) - self.max_buffer_size
            if overflow > 0:
                # Drop the oldest entries in place.
                del self.log_buffer[:overflow]
        return entry

    async def _send_entry(self, entry: Dict[str, Any]) -> None:
        """Send an already-buffered entry to every connection; drop failing ones."""
        # Snapshot under the lock so concurrent add/remove cannot disturb the loop.
        with self._lock:
            targets = list(self.connections)
        if not targets:
            return

        payload = json.dumps(entry, ensure_ascii=False)
        failed = []
        for websocket in targets:
            try:
                await websocket.send_text(payload)
            except Exception as exc:
                # A failed send is treated as a dead connection.
                logger.warning("发送消息失败: %s", exc)
                failed.append(websocket)

        for websocket in failed:
            self.remove_connection(websocket)

    async def broadcast_log(self, message: str, level: str = "INFO", source: str = "system"):
        """Buffer a log message and send it to all connected clients.

        Args:
            message: Log text.
            level: Level label stored in the entry (default ``"INFO"``).
            source: Origin label stored in the entry (default ``"system"``).
        """
        entry = self._record(message, level, source)
        await self._send_entry(entry)

    def broadcast_log_sync(self, message: str, level: str = "INFO", source: str = "system"):
        """Buffer a log message from synchronous code and schedule its broadcast.

        The entry is always added to the buffer exactly once. If a loop was
        registered with ``set_loop``, the send is scheduled on it with
        ``asyncio.run_coroutine_threadsafe``; scheduling failures are ignored
        because the buffered entry remains available via ``get_recent_logs``.

        Args:
            message: Log text.
            level: Level label stored in the entry (default ``"INFO"``).
            source: Origin label stored in the entry (default ``"system"``).
        """
        entry = self._record(message, level, source)

        loop = self._loop
        if loop is None:
            return

        # Schedule only the send step: the entry is already buffered, so
        # scheduling ``broadcast_log`` here would buffer it a second time.
        send_coro = self._send_entry(entry)
        try:
            asyncio.run_coroutine_threadsafe(send_coro, loop)
        except Exception:
            # Best effort (e.g. the loop is closed). Close the coroutine so it
            # does not trigger a "never awaited" warning when collected.
            send_coro.close()

    async def get_recent_logs(self, count: int = 50) -> List[Dict[str, Any]]:
        """Return a new list with at most ``count`` of the newest buffered entries.

        A ``count`` of zero or less yields an empty list.
        """
        if count <= 0:
            # ``[-0:]`` would otherwise slice the whole buffer.
            return []
        with self._lock:
            return self.log_buffer[-count:]

    def clear_buffer(self):
        """Remove every entry from the log buffer."""
        with self._lock:
            self.log_buffer.clear()
        logger.info("日志缓冲区已清空")
# Global real-time logger: a single module-level RealtimeLogger instance
# created when this module is imported.
realtime_logger = RealtimeLogger()