You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
117 lines
4.0 KiB
117 lines
4.0 KiB
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
实时日志输出模块
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import time
|
|
import threading
|
|
from typing import List, Dict, Any, Optional
|
|
from pathlib import Path
|
|
|
|
import logging
|
|
|
|
logger = logging.getLogger("realtime_logger")


class RealtimeLogger:
    """Buffer log entries in memory and push them to WebSocket clients.

    Each entry is a dict with the keys ``timestamp`` (epoch seconds),
    ``time`` (``HH:MM:SS`` local time), ``level``, ``source`` and
    ``message``. Entries are kept in ``log_buffer`` (bounded by
    ``max_buffer_size``) and sent as JSON text to every object in
    ``connections`` via its ``send_text`` coroutine.

    ``connections`` and ``log_buffer`` are guarded by an internal
    ``threading.Lock`` so the synchronous entry points can be called from
    threads other than the one running the event loop.
    """

    def __init__(self):
        self.connections: List[Any] = []
        self.log_buffer: List[Dict[str, Any]] = []
        self.max_buffer_size = 1000
        self._lock = threading.Lock()
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def set_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        """Register the event loop used by ``broadcast_log_sync``.

        Args:
            loop: The loop on which sends scheduled from other threads
                should run.
        """
        self._loop = loop

    def add_connection(self, websocket):
        """Register a client connection to receive future broadcasts.

        Args:
            websocket: Any object exposing an awaitable ``send_text(str)``.
        """
        with self._lock:
            self.connections.append(websocket)
            total = len(self.connections)
        logger.info(f"新增WebSocket连接,当前连接数: {total}")

    def remove_connection(self, websocket):
        """Unregister a client connection; unknown connections are ignored.

        Args:
            websocket: The connection object previously passed to
                ``add_connection``.
        """
        with self._lock:
            if websocket not in self.connections:
                return
            self.connections.remove(websocket)
            total = len(self.connections)
        logger.info(f"移除WebSocket连接,当前连接数: {total}")

    @staticmethod
    def _make_entry(message: str, level: str, source: str) -> Dict[str, Any]:
        """Build the dict representation of one log record."""
        return {
            "timestamp": time.time(),
            "time": time.strftime("%H:%M:%S"),
            "level": level,
            "source": source,
            "message": message,
        }

    def _buffer_entry(self, entry: Dict[str, Any]) -> None:
        """Append *entry* to the buffer and drop the oldest overflow."""
        with self._lock:
            self.log_buffer.append(entry)
            overflow = len(self.log_buffer) - self.max_buffer_size
            if overflow > 0:
                # Trim in place; unlike ``buf[-max:]`` this also behaves
                # correctly when ``max_buffer_size`` is 0.
                del self.log_buffer[:overflow]

    async def _send_entry(self, entry: Dict[str, Any]) -> None:
        """Send an already-buffered *entry* to every registered connection.

        Connections whose ``send_text`` raises are removed afterwards.
        """
        # Snapshot under the lock: other threads may add/remove connections
        # while we are awaiting sends.
        with self._lock:
            targets = list(self.connections)
        if not targets:
            return

        payload = json.dumps(entry, ensure_ascii=False)
        failed = []
        for ws in targets:
            try:
                await ws.send_text(payload)
            except Exception as exc:
                # Any send failure is treated as a dead client.
                logger.warning(f"发送消息失败: {exc}")
                failed.append(ws)

        for ws in failed:
            self.remove_connection(ws)

    async def broadcast_log(self, message: str, level: str = "INFO", source: str = "system"):
        """Buffer a log message and broadcast it to all connected clients.

        Args:
            message: The log text.
            level: Severity label stored with the entry.
            source: Label identifying the component that produced the log.
        """
        entry = self._make_entry(message, level, source)
        self._buffer_entry(entry)
        await self._send_entry(entry)

    def broadcast_log_sync(self, message: str, level: str = "INFO", source: str = "system"):
        """Buffer a log message from synchronous code and schedule its broadcast.

        The entry is always buffered. If a loop was registered with
        ``set_loop``, the send is scheduled on it; scheduling failures are
        ignored so that logging never raises into the caller.

        Args:
            message: The log text.
            level: Severity label stored with the entry.
            source: Label identifying the component that produced the log.
        """
        entry = self._make_entry(message, level, source)
        self._buffer_entry(entry)

        loop = self._loop
        if loop is None:
            return

        # Schedule only the send step: the entry is already buffered, and
        # going through ``broadcast_log`` would buffer it a second time.
        coro = self._send_entry(entry)
        try:
            asyncio.run_coroutine_threadsafe(coro, loop)
        except Exception:
            # Best effort (e.g. the loop is closed). The entry stays in the
            # buffer for later replay. Close the coroutine so it does not
            # trigger a "never awaited" warning. Deliberately not logged
            # here, to keep this path free of further side effects.
            coro.close()

    async def get_recent_logs(self, count: int = 50) -> List[Dict[str, Any]]:
        """Return up to *count* of the most recent buffered entries.

        Args:
            count: Maximum number of entries to return.

        Returns:
            A new list, oldest first. Empty when *count* is not positive or
            the buffer is empty.
        """
        if count <= 0:
            # ``buf[-0:]`` would return the whole buffer, so guard explicitly.
            return []
        with self._lock:
            return self.log_buffer[-count:]

    def clear_buffer(self):
        """Discard all buffered log entries."""
        with self._lock:
            self.log_buffer.clear()
        logger.info("日志缓冲区已清空")
|
|
|
|
|
|
# Global real-time logger instance
|
|
# Module-level instance, constructed at import time with an empty buffer,
# no connections, and no event loop registered.
realtime_logger = RealtimeLogger()
|
|
|