#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 实时日志输出模块 """ import asyncio import json import time import threading from typing import List, Dict, Any, Optional from pathlib import Path import logging logger = logging.getLogger("realtime_logger") class RealtimeLogger: """实时日志记录器""" def __init__(self): self.connections: List[Any] = [] self.log_buffer: List[Dict[str, Any]] = [] self.max_buffer_size = 1000 self._lock = threading.Lock() self._loop: Optional[asyncio.AbstractEventLoop] = None def set_loop(self, loop: asyncio.AbstractEventLoop) -> None: """注册主事件循环,便于跨线程安全调度发送任务""" self._loop = loop def add_connection(self, websocket): """添加WebSocket连接""" with self._lock: self.connections.append(websocket) logger.info(f"新增WebSocket连接,当前连接数: {len(self.connections)}") def remove_connection(self, websocket): """移除WebSocket连接""" with self._lock: if websocket in self.connections: self.connections.remove(websocket) logger.info(f"移除WebSocket连接,当前连接数: {len(self.connections)}") async def broadcast_log(self, message: str, level: str = "INFO", source: str = "system"): """广播日志消息到所有连接的客户端""" log_entry = { "timestamp": time.time(), "time": time.strftime("%H:%M:%S"), "level": level, "source": source, "message": message } # 添加到缓冲区 with self._lock: self.log_buffer.append(log_entry) if len(self.log_buffer) > self.max_buffer_size: self.log_buffer = self.log_buffer[-self.max_buffer_size:] # 广播到所有连接 if self.connections: message_data = json.dumps(log_entry, ensure_ascii=False) disconnected = [] for websocket in self.connections.copy(): # 使用副本避免并发修改 try: await websocket.send_text(message_data) except Exception as e: logger.warning(f"发送消息失败: {e}") disconnected.append(websocket) # 清理断开的连接 for ws in disconnected: self.remove_connection(ws) def broadcast_log_sync(self, message: str, level: str = "INFO", source: str = "system"): """同步广播日志消息(用于非异步环境)""" log_entry = { "timestamp": time.time(), "time": time.strftime("%H:%M:%S"), "level": level, "source": source, "message": message } # 添加到缓冲区 with self._lock: self.log_buffer.append(log_entry) if len(self.log_buffer) > self.max_buffer_size: self.log_buffer = self.log_buffer[-self.max_buffer_size:] # 若已注册事件循环,尝试在线程安全地调度异步广播 if self._loop is not None: try: asyncio.run_coroutine_threadsafe( self.broadcast_log(message=message, level=level, source=source), self._loop, ) except Exception: # 忽略发送失败,缓冲区仍可用于新连接回放 pass async def get_recent_logs(self, count: int = 50) -> List[Dict[str, Any]]: """获取最近的日志""" with self._lock: return self.log_buffer[-count:] if self.log_buffer else [] def clear_buffer(self): """清空日志缓冲区""" with self._lock: self.log_buffer.clear() logger.info("日志缓冲区已清空") # 全局实时日志记录器 realtime_logger = RealtimeLogger()