feat: MCP enhancements - HTTP+SSE transport, resource tools, env vars (Phase 2E)

- HttpSseTransport: HTTP POST + SSE streaming for modern MCP servers
- ListMcpResourcesTool: browse MCP server resources by server/URI
- ReadMcpResourceTool: read MCP resource content with auto-routing
- McpManager: environment variable expansion (\) in config
- McpManager: HTTP server support via 'url' field in mcp.json
- Registered both resource tools in AppConfig (total: 26 tools)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
pull/1/head
abel533 1 month ago
parent 758d0d2980
commit 5a6798540a
  1. 4
      src/main/java/com/claudecode/config/AppConfig.java
  2. 337
      src/main/java/com/claudecode/mcp/HttpSseTransport.java
  3. 114
      src/main/java/com/claudecode/mcp/McpManager.java
  4. 109
      src/main/java/com/claudecode/tool/impl/ListMcpResourcesTool.java
  5. 130
      src/main/java/com/claudecode/tool/impl/ReadMcpResourceTool.java

@ -117,7 +117,9 @@ public class AppConfig {
new EnterPlanModeTool(), new EnterPlanModeTool(),
new ExitPlanModeTool(), new ExitPlanModeTool(),
new SkillTool(), new SkillTool(),
new SendMessageTool() new SendMessageTool(),
new ListMcpResourcesTool(),
new ReadMcpResourceTool()
); );
// P2: 注册 MCP 工具桥接(将远程 MCP 工具映射为本地工具) // P2: 注册 MCP 工具桥接(将远程 MCP 工具映射为本地工具)

@ -0,0 +1,337 @@
package com.claudecode.mcp;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* HTTP + SSE 传输层 对应 claude-code 中的 HTTP 传输实现
* <p>
* 用于连接基于 HTTP MCP 服务器使用 SSE (Server-Sent Events) 接收通知
* 使用 HTTP POST 发送请求
* <p>
* MCP HTTP 传输协议流程
* <ol>
* <li>建立 SSE 连接获取 endpoint URL</li>
* <li>通过 POST 请求发送 JSON-RPC 消息到 endpoint</li>
* <li>通过 SSE 流接收响应和通知</li>
* </ol>
*
* @see McpTransport
*/
public class HttpSseTransport implements McpTransport {
private static final Logger log = LoggerFactory.getLogger(HttpSseTransport.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
private final String baseUrl;
private final HttpClient httpClient;
private final Map<String, String> headers;
private final Duration timeout;
/** Endpoint URL received from SSE connection */
private volatile String messageEndpoint;
/** Pending response futures: request id -> CompletableFuture */
private final ConcurrentHashMap<String, CompletableFuture<JsonNode>> pendingRequests =
new ConcurrentHashMap<>();
/** SSE connection state */
private final AtomicBoolean connected = new AtomicBoolean(false);
private volatile Future<?> sseListenerFuture;
private final ExecutorService sseExecutor = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "mcp-sse-listener");
t.setDaemon(true);
return t;
});
/**
* 创建 HTTP+SSE 传输层
*
* @param baseUrl MCP 服务器的基础 URL (e.g., "http://localhost:3000")
*/
public HttpSseTransport(String baseUrl) {
this(baseUrl, Map.of(), DEFAULT_TIMEOUT);
}
/**
* 创建 HTTP+SSE 传输层自定义头和超时
*
* @param baseUrl MCP 服务器的基础 URL
* @param headers 自定义 HTTP 如认证 token
* @param timeout 请求超时时间
*/
public HttpSseTransport(String baseUrl, Map<String, String> headers, Duration timeout) {
this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
this.headers = headers != null ? headers : Map.of();
this.timeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
this.httpClient = HttpClient.newBuilder()
.connectTimeout(this.timeout)
.build();
}
/**
* 连接到 SSE 端点并开始监听
* 必须在发送请求前调用
*/
public void connect() throws McpException {
if (connected.get()) return;
log.info("Connecting to MCP HTTP server at {}", baseUrl);
// Start SSE listener
sseListenerFuture = sseExecutor.submit(() -> {
try {
listenSse();
} catch (Exception e) {
if (connected.get()) {
log.warn("SSE listener error: {}", e.getMessage());
}
}
});
// Wait for endpoint URL
int waitMs = 0;
while (messageEndpoint == null && waitMs < timeout.toMillis()) {
try {
Thread.sleep(100);
waitMs += 100;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new McpException("Interrupted while waiting for SSE endpoint");
}
}
if (messageEndpoint == null) {
throw new McpException("Timeout waiting for SSE endpoint from " + baseUrl);
}
connected.set(true);
log.info("Connected to MCP HTTP server, endpoint: {}", messageEndpoint);
}
/**
* SSE 监听循环 连接到 /sse 端点并解析事件流
*/
private void listenSse() throws Exception {
String sseUrl = baseUrl + "/sse";
log.debug("Starting SSE listener at {}", sseUrl);
var requestBuilder = HttpRequest.newBuilder()
.uri(URI.create(sseUrl))
.timeout(Duration.ofMinutes(30)) // Long timeout for SSE
.GET();
// Add custom headers
for (var entry : headers.entrySet()) {
requestBuilder.header(entry.getKey(), entry.getValue());
}
HttpRequest request = requestBuilder.build();
HttpResponse<java.io.InputStream> response = httpClient.send(
request, HttpResponse.BodyHandlers.ofInputStream());
if (response.statusCode() != 200) {
throw new McpException("SSE connection failed with status " + response.statusCode());
}
try (var reader = new BufferedReader(
new InputStreamReader(response.body(), StandardCharsets.UTF_8))) {
String eventType = null;
StringBuilder dataBuffer = new StringBuilder();
String line;
while ((line = reader.readLine()) != null && connected.get()) {
if (line.startsWith("event:")) {
eventType = line.substring(6).strip();
} else if (line.startsWith("data:")) {
dataBuffer.append(line.substring(5).strip());
} else if (line.isEmpty() && dataBuffer.length() > 0) {
// End of event
handleSseEvent(eventType, dataBuffer.toString());
eventType = null;
dataBuffer.setLength(0);
}
}
}
}
/**
* 处理 SSE 事件
*/
private void handleSseEvent(String eventType, String data) {
if ("endpoint".equals(eventType)) {
// Server sends the POST endpoint URL
if (data.startsWith("http://") || data.startsWith("https://")) {
messageEndpoint = data;
} else {
messageEndpoint = baseUrl + (data.startsWith("/") ? data : "/" + data);
}
log.debug("Received SSE endpoint: {}", messageEndpoint);
} else if ("message".equals(eventType) || eventType == null) {
// JSON-RPC response or notification
try {
JsonNode json = MAPPER.readTree(data);
if (json.has("id")) {
// It's a response to a pending request
String id = json.get("id").asText();
CompletableFuture<JsonNode> future = pendingRequests.remove(id);
if (future != null) {
future.complete(json);
} else {
log.debug("Received response for unknown request id: {}", id);
}
} else {
// It's a notification — log it
String method = json.has("method") ? json.get("method").asText() : "unknown";
log.debug("Received SSE notification: {}", method);
}
} catch (Exception e) {
log.debug("Failed to parse SSE message data: {}", data, e);
}
}
}
@Override
public JsonNode sendRequest(String jsonRpcRequest) throws McpException {
if (!connected.get() || messageEndpoint == null) {
connect();
}
try {
// Extract request ID for response matching
JsonNode requestNode = MAPPER.readTree(jsonRpcRequest);
String requestId = requestNode.has("id") ? requestNode.get("id").asText() : null;
// Register pending response
CompletableFuture<JsonNode> responseFuture = new CompletableFuture<>();
if (requestId != null) {
pendingRequests.put(requestId, responseFuture);
}
// Send HTTP POST
var httpRequest = HttpRequest.newBuilder()
.uri(URI.create(messageEndpoint))
.timeout(timeout)
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(jsonRpcRequest))
.build();
// Add custom headers
// Note: HttpRequest is immutable, headers must be set at build time
// For simplicity, rebuild if we have custom headers
if (!headers.isEmpty()) {
var builder = HttpRequest.newBuilder()
.uri(URI.create(messageEndpoint))
.timeout(timeout)
.header("Content-Type", "application/json");
for (var entry : headers.entrySet()) {
builder.header(entry.getKey(), entry.getValue());
}
httpRequest = builder.POST(HttpRequest.BodyPublishers.ofString(jsonRpcRequest)).build();
}
HttpResponse<String> httpResponse = httpClient.send(
httpRequest, HttpResponse.BodyHandlers.ofString());
if (httpResponse.statusCode() >= 400) {
pendingRequests.remove(requestId);
throw new McpException("HTTP error " + httpResponse.statusCode()
+ ": " + httpResponse.body());
}
// If the HTTP response body contains JSON-RPC response, use it directly
String body = httpResponse.body();
if (body != null && !body.isBlank()) {
try {
JsonNode directResponse = MAPPER.readTree(body);
if (directResponse.has("result") || directResponse.has("error")) {
pendingRequests.remove(requestId);
return directResponse;
}
} catch (Exception ignored) {
// Not a JSON response, wait for SSE
}
}
// Wait for response via SSE
if (requestId != null) {
try {
return responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
pendingRequests.remove(requestId);
throw new McpException("Timeout waiting for response to request " + requestId);
}
}
// No ID means notification — return empty
return MAPPER.createObjectNode();
} catch (McpException e) {
throw e;
} catch (Exception e) {
throw new McpException("HTTP request failed: " + e.getMessage(), e);
}
}
@Override
public void sendNotification(String jsonRpcNotification) throws McpException {
if (!connected.get() || messageEndpoint == null) {
connect();
}
try {
var httpRequest = HttpRequest.newBuilder()
.uri(URI.create(messageEndpoint))
.timeout(timeout)
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(jsonRpcNotification))
.build();
HttpResponse<String> response = httpClient.send(
httpRequest, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() >= 400) {
throw new McpException("HTTP notification failed with status " + response.statusCode());
}
} catch (McpException e) {
throw e;
} catch (Exception e) {
throw new McpException("Failed to send notification: " + e.getMessage(), e);
}
}
@Override
public boolean isConnected() {
return connected.get() && messageEndpoint != null;
}
@Override
public void close() throws Exception {
connected.set(false);
pendingRequests.values().forEach(f ->
f.completeExceptionally(new McpException("Transport closed")));
pendingRequests.clear();
if (sseListenerFuture != null) {
sseListenerFuture.cancel(true);
}
sseExecutor.shutdownNow();
log.info("HttpSseTransport closed");
}
}

@ -126,11 +126,24 @@ public class McpManager implements AutoCloseable {
Iterator<Map.Entry<String, JsonNode>> envFields = serverDef.get("env").fields(); Iterator<Map.Entry<String, JsonNode>> envFields = serverDef.get("env").fields();
while (envFields.hasNext()) { while (envFields.hasNext()) {
Map.Entry<String, JsonNode> envEntry = envFields.next(); Map.Entry<String, JsonNode> envEntry = envFields.next();
env.put(envEntry.getKey(), envEntry.getValue().asText()); env.put(envEntry.getKey(), expandEnvVars(envEntry.getValue().asText()));
} }
} }
connect(name, command, args, env); // Expand env vars in command and args
command = expandEnvVars(command);
List<String> expandedArgs = new ArrayList<>();
for (String arg : args) {
expandedArgs.add(expandEnvVars(arg));
}
// Check if this is an HTTP/SSE server (url field present)
if (serverDef.has("url")) {
String url = expandEnvVars(serverDef.get("url").asText());
connectHttp(name, url, env);
} else {
connect(name, command, expandedArgs, env);
}
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to connect MCP server '{}' from config: {}", name, e.getMessage()); log.error("Failed to connect MCP server '{}' from config: {}", name, e.getMessage());
} }
@ -415,4 +428,101 @@ public class McpManager implements AutoCloseable {
log.info("All MCP connections closed"); log.info("All MCP connections closed");
} }
/**
* 连接 HTTP+SSE MCP 服务器
*
* @param name 服务器名称
* @param url 服务器 URL
* @param env 环境变量用于请求头等
* @return 已初始化的 MCP 客户端
* @throws McpException 连接或初始化失败
*/
public McpClient connectHttp(String name, String url, Map<String, String> env) throws McpException {
if (clients.containsKey(name)) {
log.info("MCP server '{}' already exists, disconnecting old connection", name);
try {
disconnect(name);
} catch (Exception e) {
log.warn("Exception disconnecting old MCP connection '{}': {}", name, e.getMessage());
}
}
log.info("Connecting MCP HTTP server '{}': {}", name, url);
// Extract auth headers from env
Map<String, String> headers = new HashMap<>();
if (env != null) {
String authToken = env.get("AUTHORIZATION");
if (authToken != null) {
headers.put("Authorization", authToken);
}
String apiKey = env.get("API_KEY");
if (apiKey != null) {
headers.put("X-API-Key", apiKey);
}
}
HttpSseTransport transport = new HttpSseTransport(url, headers, null);
McpClient client;
try {
transport.connect();
client = new McpClient(name, transport);
client.initialize();
} catch (Exception e) {
try {
transport.close();
} catch (Exception suppressed) {
e.addSuppressed(suppressed);
}
throw (e instanceof McpException mcp) ? mcp
: new McpException("Failed to connect MCP HTTP server '" + name + "': " + e.getMessage(), e);
}
clients.put(name, client);
for (McpClient.McpTool tool : client.getTools()) {
toolToServer.put(tool.name(), name);
}
log.info("MCP HTTP server '{}' connected successfully", name);
return client;
}
/**
* 展开字符串中的环境变量引用
* 支持 ${VAR_NAME} 语法未定义的变量保留原样
*
* @param value 包含可能的环境变量引用的字符串
* @return 展开后的字符串
*/
static String expandEnvVars(String value) {
if (value == null || !value.contains("${")) {
return value;
}
StringBuilder result = new StringBuilder();
int i = 0;
while (i < value.length()) {
if (i < value.length() - 2 && value.charAt(i) == '$' && value.charAt(i + 1) == '{') {
int end = value.indexOf('}', i + 2);
if (end != -1) {
String varName = value.substring(i + 2, end);
String envVal = System.getenv(varName);
if (envVal != null) {
result.append(envVal);
} else {
result.append("${").append(varName).append("}");
}
i = end + 1;
} else {
result.append(value.charAt(i));
i++;
}
} else {
result.append(value.charAt(i));
i++;
}
}
return result.toString();
}
} }

@ -0,0 +1,109 @@
package com.claudecode.tool.impl;
import com.claudecode.mcp.McpClient;
import com.claudecode.mcp.McpManager;
import com.claudecode.tool.Tool;
import com.claudecode.tool.ToolContext;
import java.util.Map;
/**
* ListMcpResources 工具 列出 MCP 服务器提供的资源
* <p>
* 对应 claude-code 中浏览 MCP 资源的功能
* 显示所有已连接 MCP 服务器的资源列表包括 URI名称描述和 MIME 类型
*/
public class ListMcpResourcesTool implements Tool {
@Override
public String name() {
return "ListMcpResources";
}
@Override
public String description() {
return """
List resources available from connected MCP (Model Context Protocol) servers.
Shows all resources with their URIs, names, descriptions, and MIME types.
Use this to discover what data sources are available before reading them.
Optionally filter by server name.""";
}
@Override
public String inputSchema() {
return """
{
"type": "object",
"properties": {
"server": {
"type": "string",
"description": "Optional: filter resources by MCP server name"
}
}
}""";
}
@Override
public String execute(Map<String, Object> input, ToolContext context) {
McpManager mcpManager = context.getOrDefault("MCP_MANAGER", null);
if (mcpManager == null) {
return "No MCP servers configured.";
}
String serverFilter = (String) input.getOrDefault("server", null);
var clients = mcpManager.getClients();
if (clients.isEmpty()) {
return "No MCP servers connected.";
}
StringBuilder sb = new StringBuilder();
int totalResources = 0;
for (var entry : clients.entrySet()) {
String serverName = entry.getKey();
McpClient client = entry.getValue();
if (serverFilter != null && !serverFilter.isBlank()
&& !serverName.equalsIgnoreCase(serverFilter)) {
continue;
}
if (!client.isInitialized() || !client.isConnected()) {
sb.append("⚠ Server '").append(serverName).append("': not connected\n");
continue;
}
var resources = client.getResources();
if (resources.isEmpty()) {
sb.append("Server '").append(serverName).append("': no resources\n");
continue;
}
sb.append("## ").append(serverName).append(" (").append(resources.size()).append(" resources)\n\n");
for (var resource : resources) {
sb.append("- **").append(resource.name()).append("**\n");
sb.append(" URI: `").append(resource.uri()).append("`\n");
if (!resource.description().isBlank()) {
sb.append(" ").append(resource.description()).append("\n");
}
sb.append(" Type: ").append(resource.mimeType()).append("\n\n");
totalResources++;
}
}
if (totalResources == 0) {
return serverFilter != null
? "No resources found for server '" + serverFilter + "'."
: "No MCP resources available from any connected server.";
}
return sb.toString().stripTrailing();
}
@Override
public String activityDescription(Map<String, Object> input) {
return "📋 Listing MCP resources";
}
}

@ -0,0 +1,130 @@
package com.claudecode.tool.impl;
import com.claudecode.mcp.McpClient;
import com.claudecode.mcp.McpManager;
import com.claudecode.tool.Tool;
import com.claudecode.tool.ToolContext;
import java.util.Map;
/**
* ReadMcpResource 工具 读取 MCP 服务器的指定资源
* <p>
* 对应 claude-code 中读取 MCP 资源的功能
* 通过 URI MCP 服务器读取资源内容
*/
public class ReadMcpResourceTool implements Tool {
@Override
public String name() {
return "ReadMcpResource";
}
@Override
public String description() {
return """
Read a specific resource from a connected MCP (Model Context Protocol) server.
Provide the resource URI (obtained from ListMcpResources) to fetch its content.
The server name is optional if omitted, all servers are searched for the URI.""";
}
@Override
public String inputSchema() {
return """
{
"type": "object",
"properties": {
"uri": {
"type": "string",
"description": "The resource URI to read (e.g., 'file:///path' or 'custom://resource')"
},
"server": {
"type": "string",
"description": "Optional: the MCP server name that provides this resource"
}
},
"required": ["uri"]
}""";
}
@Override
public String execute(Map<String, Object> input, ToolContext context) {
String uri = (String) input.get("uri");
String serverFilter = (String) input.getOrDefault("server", null);
if (uri == null || uri.isBlank()) {
return "Error: 'uri' is required. Use ListMcpResources to discover available resources.";
}
McpManager mcpManager = context.getOrDefault("MCP_MANAGER", null);
if (mcpManager == null) {
return "Error: No MCP servers configured.";
}
var clients = mcpManager.getClients();
if (clients.isEmpty()) {
return "Error: No MCP servers connected.";
}
// If server specified, try only that server
if (serverFilter != null && !serverFilter.isBlank()) {
McpClient client = clients.get(serverFilter);
if (client == null) {
return "Error: MCP server '" + serverFilter + "' not found. "
+ "Available servers: " + String.join(", ", clients.keySet());
}
return readFromClient(client, serverFilter, uri);
}
// Try all connected servers
for (var entry : clients.entrySet()) {
McpClient client = entry.getValue();
if (!client.isInitialized() || !client.isConnected()) continue;
// Check if this server has the resource
boolean hasResource = client.getResources().stream()
.anyMatch(r -> r.uri().equals(uri));
if (hasResource) {
return readFromClient(client, entry.getKey(), uri);
}
}
// No server has this resource — try reading anyway (some servers allow arbitrary URIs)
for (var entry : clients.entrySet()) {
McpClient client = entry.getValue();
if (!client.isInitialized() || !client.isConnected()) continue;
try {
String result = client.readResource(uri);
if (result != null && !result.isBlank()) {
return result;
}
} catch (Exception ignored) {
// Try next server
}
}
return "Error: Resource '" + uri + "' not found on any connected MCP server. "
+ "Use ListMcpResources to see available resources.";
}
private String readFromClient(McpClient client, String serverName, String uri) {
if (!client.isInitialized() || !client.isConnected()) {
return "Error: MCP server '" + serverName + "' is not connected.";
}
try {
String content = client.readResource(uri);
if (content == null || content.isBlank()) {
return "(Resource returned empty content)";
}
return content;
} catch (Exception e) {
return "Error reading resource '" + uri + "' from server '" + serverName + "': " + e.getMessage();
}
}
@Override
public String activityDescription(Map<String, Object> input) {
String uri = (String) input.getOrDefault("uri", "?");
return "📖 Reading MCP resource: " + uri;
}
}
Loading…
Cancel
Save