diff --git a/src/main/java/com/claudecode/config/AppConfig.java b/src/main/java/com/claudecode/config/AppConfig.java index ef57851..60013d8 100644 --- a/src/main/java/com/claudecode/config/AppConfig.java +++ b/src/main/java/com/claudecode/config/AppConfig.java @@ -117,7 +117,9 @@ public class AppConfig { new EnterPlanModeTool(), new ExitPlanModeTool(), new SkillTool(), - new SendMessageTool() + new SendMessageTool(), + new ListMcpResourcesTool(), + new ReadMcpResourceTool() ); // P2: 注册 MCP 工具桥接(将远程 MCP 工具映射为本地工具) diff --git a/src/main/java/com/claudecode/mcp/HttpSseTransport.java b/src/main/java/com/claudecode/mcp/HttpSseTransport.java new file mode 100644 index 0000000..9250101 --- /dev/null +++ b/src/main/java/com/claudecode/mcp/HttpSseTransport.java @@ -0,0 +1,337 @@ +package com.claudecode.mcp; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * HTTP + SSE 传输层 —— 对应 claude-code 中的 HTTP 传输实现。 + *

+ * 用于连接基于 HTTP 的 MCP 服务器,使用 SSE (Server-Sent Events) 接收通知, + * 使用 HTTP POST 发送请求。 + *

+ * MCP HTTP 传输协议流程: + *

    + *
  1. 建立 SSE 连接获取 endpoint URL
  2. + *
  3. 通过 POST 请求发送 JSON-RPC 消息到 endpoint
  4. + *
  5. 通过 SSE 流接收响应和通知
  6. + *
+ * + * @see McpTransport + */ +public class HttpSseTransport implements McpTransport { + + private static final Logger log = LoggerFactory.getLogger(HttpSseTransport.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30); + + private final String baseUrl; + private final HttpClient httpClient; + private final Map headers; + private final Duration timeout; + + /** Endpoint URL received from SSE connection */ + private volatile String messageEndpoint; + + /** Pending response futures: request id -> CompletableFuture */ + private final ConcurrentHashMap> pendingRequests = + new ConcurrentHashMap<>(); + + /** SSE connection state */ + private final AtomicBoolean connected = new AtomicBoolean(false); + private volatile Future sseListenerFuture; + private final ExecutorService sseExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "mcp-sse-listener"); + t.setDaemon(true); + return t; + }); + + /** + * 创建 HTTP+SSE 传输层。 + * + * @param baseUrl MCP 服务器的基础 URL (e.g., "http://localhost:3000") + */ + public HttpSseTransport(String baseUrl) { + this(baseUrl, Map.of(), DEFAULT_TIMEOUT); + } + + /** + * 创建 HTTP+SSE 传输层(自定义头和超时)。 + * + * @param baseUrl MCP 服务器的基础 URL + * @param headers 自定义 HTTP 头(如认证 token) + * @param timeout 请求超时时间 + */ + public HttpSseTransport(String baseUrl, Map headers, Duration timeout) { + this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl; + this.headers = headers != null ? headers : Map.of(); + this.timeout = timeout != null ? timeout : DEFAULT_TIMEOUT; + this.httpClient = HttpClient.newBuilder() + .connectTimeout(this.timeout) + .build(); + } + + /** + * 连接到 SSE 端点并开始监听。 + * 必须在发送请求前调用。 + */ + public void connect() throws McpException { + if (connected.get()) return; + + log.info("Connecting to MCP HTTP server at {}", baseUrl); + + // Start SSE listener + sseListenerFuture = sseExecutor.submit(() -> { + try { + listenSse(); + } catch (Exception e) { + if (connected.get()) { + log.warn("SSE listener error: {}", e.getMessage()); + } + } + }); + + // Wait for endpoint URL + int waitMs = 0; + while (messageEndpoint == null && waitMs < timeout.toMillis()) { + try { + Thread.sleep(100); + waitMs += 100; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new McpException("Interrupted while waiting for SSE endpoint"); + } + } + + if (messageEndpoint == null) { + throw new McpException("Timeout waiting for SSE endpoint from " + baseUrl); + } + + connected.set(true); + log.info("Connected to MCP HTTP server, endpoint: {}", messageEndpoint); + } + + /** + * SSE 监听循环 —— 连接到 /sse 端点并解析事件流。 + */ + private void listenSse() throws Exception { + String sseUrl = baseUrl + "/sse"; + log.debug("Starting SSE listener at {}", sseUrl); + + var requestBuilder = HttpRequest.newBuilder() + .uri(URI.create(sseUrl)) + .timeout(Duration.ofMinutes(30)) // Long timeout for SSE + .GET(); + + // Add custom headers + for (var entry : headers.entrySet()) { + requestBuilder.header(entry.getKey(), entry.getValue()); + } + + HttpRequest request = requestBuilder.build(); + HttpResponse response = httpClient.send( + request, HttpResponse.BodyHandlers.ofInputStream()); + + if (response.statusCode() != 200) { + throw new McpException("SSE connection failed with status " + response.statusCode()); + } + + try (var reader = new BufferedReader( + new InputStreamReader(response.body(), StandardCharsets.UTF_8))) { + + String eventType = null; + StringBuilder dataBuffer = new StringBuilder(); + + String line; + while ((line = reader.readLine()) != null && connected.get()) { + if (line.startsWith("event:")) { + eventType = line.substring(6).strip(); + } else if (line.startsWith("data:")) { + dataBuffer.append(line.substring(5).strip()); + } else if (line.isEmpty() && dataBuffer.length() > 0) { + // End of event + handleSseEvent(eventType, dataBuffer.toString()); + eventType = null; + dataBuffer.setLength(0); + } + } + } + } + + /** + * 处理 SSE 事件。 + */ + private void handleSseEvent(String eventType, String data) { + if ("endpoint".equals(eventType)) { + // Server sends the POST endpoint URL + if (data.startsWith("http://") || data.startsWith("https://")) { + messageEndpoint = data; + } else { + messageEndpoint = baseUrl + (data.startsWith("/") ? data : "/" + data); + } + log.debug("Received SSE endpoint: {}", messageEndpoint); + } else if ("message".equals(eventType) || eventType == null) { + // JSON-RPC response or notification + try { + JsonNode json = MAPPER.readTree(data); + if (json.has("id")) { + // It's a response to a pending request + String id = json.get("id").asText(); + CompletableFuture future = pendingRequests.remove(id); + if (future != null) { + future.complete(json); + } else { + log.debug("Received response for unknown request id: {}", id); + } + } else { + // It's a notification — log it + String method = json.has("method") ? json.get("method").asText() : "unknown"; + log.debug("Received SSE notification: {}", method); + } + } catch (Exception e) { + log.debug("Failed to parse SSE message data: {}", data, e); + } + } + } + + @Override + public JsonNode sendRequest(String jsonRpcRequest) throws McpException { + if (!connected.get() || messageEndpoint == null) { + connect(); + } + + try { + // Extract request ID for response matching + JsonNode requestNode = MAPPER.readTree(jsonRpcRequest); + String requestId = requestNode.has("id") ? requestNode.get("id").asText() : null; + + // Register pending response + CompletableFuture responseFuture = new CompletableFuture<>(); + if (requestId != null) { + pendingRequests.put(requestId, responseFuture); + } + + // Send HTTP POST + var httpRequest = HttpRequest.newBuilder() + .uri(URI.create(messageEndpoint)) + .timeout(timeout) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(jsonRpcRequest)) + .build(); + + // Add custom headers + // Note: HttpRequest is immutable, headers must be set at build time + // For simplicity, rebuild if we have custom headers + if (!headers.isEmpty()) { + var builder = HttpRequest.newBuilder() + .uri(URI.create(messageEndpoint)) + .timeout(timeout) + .header("Content-Type", "application/json"); + for (var entry : headers.entrySet()) { + builder.header(entry.getKey(), entry.getValue()); + } + httpRequest = builder.POST(HttpRequest.BodyPublishers.ofString(jsonRpcRequest)).build(); + } + + HttpResponse httpResponse = httpClient.send( + httpRequest, HttpResponse.BodyHandlers.ofString()); + + if (httpResponse.statusCode() >= 400) { + pendingRequests.remove(requestId); + throw new McpException("HTTP error " + httpResponse.statusCode() + + ": " + httpResponse.body()); + } + + // If the HTTP response body contains JSON-RPC response, use it directly + String body = httpResponse.body(); + if (body != null && !body.isBlank()) { + try { + JsonNode directResponse = MAPPER.readTree(body); + if (directResponse.has("result") || directResponse.has("error")) { + pendingRequests.remove(requestId); + return directResponse; + } + } catch (Exception ignored) { + // Not a JSON response, wait for SSE + } + } + + // Wait for response via SSE + if (requestId != null) { + try { + return responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + pendingRequests.remove(requestId); + throw new McpException("Timeout waiting for response to request " + requestId); + } + } + + // No ID means notification — return empty + return MAPPER.createObjectNode(); + + } catch (McpException e) { + throw e; + } catch (Exception e) { + throw new McpException("HTTP request failed: " + e.getMessage(), e); + } + } + + @Override + public void sendNotification(String jsonRpcNotification) throws McpException { + if (!connected.get() || messageEndpoint == null) { + connect(); + } + + try { + var httpRequest = HttpRequest.newBuilder() + .uri(URI.create(messageEndpoint)) + .timeout(timeout) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(jsonRpcNotification)) + .build(); + + HttpResponse response = httpClient.send( + httpRequest, HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() >= 400) { + throw new McpException("HTTP notification failed with status " + response.statusCode()); + } + } catch (McpException e) { + throw e; + } catch (Exception e) { + throw new McpException("Failed to send notification: " + e.getMessage(), e); + } + } + + @Override + public boolean isConnected() { + return connected.get() && messageEndpoint != null; + } + + @Override + public void close() throws Exception { + connected.set(false); + pendingRequests.values().forEach(f -> + f.completeExceptionally(new McpException("Transport closed"))); + pendingRequests.clear(); + + if (sseListenerFuture != null) { + sseListenerFuture.cancel(true); + } + sseExecutor.shutdownNow(); + log.info("HttpSseTransport closed"); + } +} diff --git a/src/main/java/com/claudecode/mcp/McpManager.java b/src/main/java/com/claudecode/mcp/McpManager.java index f327d92..b39f818 100644 --- a/src/main/java/com/claudecode/mcp/McpManager.java +++ b/src/main/java/com/claudecode/mcp/McpManager.java @@ -126,11 +126,24 @@ public class McpManager implements AutoCloseable { Iterator> envFields = serverDef.get("env").fields(); while (envFields.hasNext()) { Map.Entry envEntry = envFields.next(); - env.put(envEntry.getKey(), envEntry.getValue().asText()); + env.put(envEntry.getKey(), expandEnvVars(envEntry.getValue().asText())); } } - connect(name, command, args, env); + // Expand env vars in command and args + command = expandEnvVars(command); + List expandedArgs = new ArrayList<>(); + for (String arg : args) { + expandedArgs.add(expandEnvVars(arg)); + } + + // Check if this is an HTTP/SSE server (url field present) + if (serverDef.has("url")) { + String url = expandEnvVars(serverDef.get("url").asText()); + connectHttp(name, url, env); + } else { + connect(name, command, expandedArgs, env); + } } catch (Exception e) { log.error("Failed to connect MCP server '{}' from config: {}", name, e.getMessage()); } @@ -415,4 +428,101 @@ public class McpManager implements AutoCloseable { log.info("All MCP connections closed"); } + + /** + * 连接 HTTP+SSE MCP 服务器。 + * + * @param name 服务器名称 + * @param url 服务器 URL + * @param env 环境变量(用于请求头等) + * @return 已初始化的 MCP 客户端 + * @throws McpException 连接或初始化失败 + */ + public McpClient connectHttp(String name, String url, Map env) throws McpException { + if (clients.containsKey(name)) { + log.info("MCP server '{}' already exists, disconnecting old connection", name); + try { + disconnect(name); + } catch (Exception e) { + log.warn("Exception disconnecting old MCP connection '{}': {}", name, e.getMessage()); + } + } + + log.info("Connecting MCP HTTP server '{}': {}", name, url); + + // Extract auth headers from env + Map headers = new HashMap<>(); + if (env != null) { + String authToken = env.get("AUTHORIZATION"); + if (authToken != null) { + headers.put("Authorization", authToken); + } + String apiKey = env.get("API_KEY"); + if (apiKey != null) { + headers.put("X-API-Key", apiKey); + } + } + + HttpSseTransport transport = new HttpSseTransport(url, headers, null); + McpClient client; + try { + transport.connect(); + client = new McpClient(name, transport); + client.initialize(); + } catch (Exception e) { + try { + transport.close(); + } catch (Exception suppressed) { + e.addSuppressed(suppressed); + } + throw (e instanceof McpException mcp) ? mcp + : new McpException("Failed to connect MCP HTTP server '" + name + "': " + e.getMessage(), e); + } + + clients.put(name, client); + for (McpClient.McpTool tool : client.getTools()) { + toolToServer.put(tool.name(), name); + } + + log.info("MCP HTTP server '{}' connected successfully", name); + return client; + } + + /** + * 展开字符串中的环境变量引用。 + * 支持 ${VAR_NAME} 语法,未定义的变量保留原样。 + * + * @param value 包含可能的环境变量引用的字符串 + * @return 展开后的字符串 + */ + static String expandEnvVars(String value) { + if (value == null || !value.contains("${")) { + return value; + } + + StringBuilder result = new StringBuilder(); + int i = 0; + while (i < value.length()) { + if (i < value.length() - 2 && value.charAt(i) == '$' && value.charAt(i + 1) == '{') { + int end = value.indexOf('}', i + 2); + if (end != -1) { + String varName = value.substring(i + 2, end); + String envVal = System.getenv(varName); + if (envVal != null) { + result.append(envVal); + } else { + result.append("${").append(varName).append("}"); + } + i = end + 1; + } else { + result.append(value.charAt(i)); + i++; + } + } else { + result.append(value.charAt(i)); + i++; + } + } + return result.toString(); + } } diff --git a/src/main/java/com/claudecode/tool/impl/ListMcpResourcesTool.java b/src/main/java/com/claudecode/tool/impl/ListMcpResourcesTool.java new file mode 100644 index 0000000..6368284 --- /dev/null +++ b/src/main/java/com/claudecode/tool/impl/ListMcpResourcesTool.java @@ -0,0 +1,109 @@ +package com.claudecode.tool.impl; + +import com.claudecode.mcp.McpClient; +import com.claudecode.mcp.McpManager; +import com.claudecode.tool.Tool; +import com.claudecode.tool.ToolContext; + +import java.util.Map; + +/** + * ListMcpResources 工具 —— 列出 MCP 服务器提供的资源。 + *

+ * 对应 claude-code 中浏览 MCP 资源的功能。 + * 显示所有已连接 MCP 服务器的资源列表,包括 URI、名称、描述和 MIME 类型。 + */ +public class ListMcpResourcesTool implements Tool { + + @Override + public String name() { + return "ListMcpResources"; + } + + @Override + public String description() { + return """ + List resources available from connected MCP (Model Context Protocol) servers. + Shows all resources with their URIs, names, descriptions, and MIME types. + Use this to discover what data sources are available before reading them. + Optionally filter by server name."""; + } + + @Override + public String inputSchema() { + return """ + { + "type": "object", + "properties": { + "server": { + "type": "string", + "description": "Optional: filter resources by MCP server name" + } + } + }"""; + } + + @Override + public String execute(Map input, ToolContext context) { + McpManager mcpManager = context.getOrDefault("MCP_MANAGER", null); + if (mcpManager == null) { + return "No MCP servers configured."; + } + + String serverFilter = (String) input.getOrDefault("server", null); + var clients = mcpManager.getClients(); + + if (clients.isEmpty()) { + return "No MCP servers connected."; + } + + StringBuilder sb = new StringBuilder(); + int totalResources = 0; + + for (var entry : clients.entrySet()) { + String serverName = entry.getKey(); + McpClient client = entry.getValue(); + + if (serverFilter != null && !serverFilter.isBlank() + && !serverName.equalsIgnoreCase(serverFilter)) { + continue; + } + + if (!client.isInitialized() || !client.isConnected()) { + sb.append("⚠ Server '").append(serverName).append("': not connected\n"); + continue; + } + + var resources = client.getResources(); + if (resources.isEmpty()) { + sb.append("Server '").append(serverName).append("': no resources\n"); + continue; + } + + sb.append("## ").append(serverName).append(" (").append(resources.size()).append(" resources)\n\n"); + + for (var resource : resources) { + sb.append("- **").append(resource.name()).append("**\n"); + sb.append(" URI: `").append(resource.uri()).append("`\n"); + if (!resource.description().isBlank()) { + sb.append(" ").append(resource.description()).append("\n"); + } + sb.append(" Type: ").append(resource.mimeType()).append("\n\n"); + totalResources++; + } + } + + if (totalResources == 0) { + return serverFilter != null + ? "No resources found for server '" + serverFilter + "'." + : "No MCP resources available from any connected server."; + } + + return sb.toString().stripTrailing(); + } + + @Override + public String activityDescription(Map input) { + return "📋 Listing MCP resources"; + } +} diff --git a/src/main/java/com/claudecode/tool/impl/ReadMcpResourceTool.java b/src/main/java/com/claudecode/tool/impl/ReadMcpResourceTool.java new file mode 100644 index 0000000..377e531 --- /dev/null +++ b/src/main/java/com/claudecode/tool/impl/ReadMcpResourceTool.java @@ -0,0 +1,130 @@ +package com.claudecode.tool.impl; + +import com.claudecode.mcp.McpClient; +import com.claudecode.mcp.McpManager; +import com.claudecode.tool.Tool; +import com.claudecode.tool.ToolContext; + +import java.util.Map; + +/** + * ReadMcpResource 工具 —— 读取 MCP 服务器的指定资源。 + *

+ * 对应 claude-code 中读取 MCP 资源的功能。 + * 通过 URI 从 MCP 服务器读取资源内容。 + */ +public class ReadMcpResourceTool implements Tool { + + @Override + public String name() { + return "ReadMcpResource"; + } + + @Override + public String description() { + return """ + Read a specific resource from a connected MCP (Model Context Protocol) server. + Provide the resource URI (obtained from ListMcpResources) to fetch its content. + The server name is optional — if omitted, all servers are searched for the URI."""; + } + + @Override + public String inputSchema() { + return """ + { + "type": "object", + "properties": { + "uri": { + "type": "string", + "description": "The resource URI to read (e.g., 'file:///path' or 'custom://resource')" + }, + "server": { + "type": "string", + "description": "Optional: the MCP server name that provides this resource" + } + }, + "required": ["uri"] + }"""; + } + + @Override + public String execute(Map input, ToolContext context) { + String uri = (String) input.get("uri"); + String serverFilter = (String) input.getOrDefault("server", null); + + if (uri == null || uri.isBlank()) { + return "Error: 'uri' is required. Use ListMcpResources to discover available resources."; + } + + McpManager mcpManager = context.getOrDefault("MCP_MANAGER", null); + if (mcpManager == null) { + return "Error: No MCP servers configured."; + } + + var clients = mcpManager.getClients(); + if (clients.isEmpty()) { + return "Error: No MCP servers connected."; + } + + // If server specified, try only that server + if (serverFilter != null && !serverFilter.isBlank()) { + McpClient client = clients.get(serverFilter); + if (client == null) { + return "Error: MCP server '" + serverFilter + "' not found. " + + "Available servers: " + String.join(", ", clients.keySet()); + } + return readFromClient(client, serverFilter, uri); + } + + // Try all connected servers + for (var entry : clients.entrySet()) { + McpClient client = entry.getValue(); + if (!client.isInitialized() || !client.isConnected()) continue; + + // Check if this server has the resource + boolean hasResource = client.getResources().stream() + .anyMatch(r -> r.uri().equals(uri)); + if (hasResource) { + return readFromClient(client, entry.getKey(), uri); + } + } + + // No server has this resource — try reading anyway (some servers allow arbitrary URIs) + for (var entry : clients.entrySet()) { + McpClient client = entry.getValue(); + if (!client.isInitialized() || !client.isConnected()) continue; + try { + String result = client.readResource(uri); + if (result != null && !result.isBlank()) { + return result; + } + } catch (Exception ignored) { + // Try next server + } + } + + return "Error: Resource '" + uri + "' not found on any connected MCP server. " + + "Use ListMcpResources to see available resources."; + } + + private String readFromClient(McpClient client, String serverName, String uri) { + if (!client.isInitialized() || !client.isConnected()) { + return "Error: MCP server '" + serverName + "' is not connected."; + } + try { + String content = client.readResource(uri); + if (content == null || content.isBlank()) { + return "(Resource returned empty content)"; + } + return content; + } catch (Exception e) { + return "Error reading resource '" + uri + "' from server '" + serverName + "': " + e.getMessage(); + } + } + + @Override + public String activityDescription(Map input) { + String uri = (String) input.getOrDefault("uri", "?"); + return "📖 Reading MCP resource: " + uri; + } +}