From 758d0d29804653daa4f5e329464075fe887ba038 Mon Sep 17 00:00:00 2001 From: abel533 Date: Sun, 5 Apr 2026 09:50:09 +0800 Subject: [PATCH] feat: Coordinator Mode with SendMessageTool (Phase 2D) - CoordinatorMode: env-var activation, system prompt, tool filtering - SendMessageTool: direct message, broadcast, message queuing - Coordinator allowed tools: Agent, SendMessage, TaskStop, TaskGet, TaskList, TaskOutput - Worker allowed tools: Read, Write, Edit, Bash, Grep, Glob, etc. - AppConfig: coordinator mode detection, specialized system prompt - Registered SendMessageTool in ToolRegistry Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../java/com/claudecode/config/AppConfig.java | 13 +- .../com/claudecode/core/CoordinatorMode.java | 161 +++++++++++++++ .../claudecode/tool/impl/SendMessageTool.java | 195 ++++++++++++++++++ 3 files changed, 368 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/claudecode/core/CoordinatorMode.java create mode 100644 src/main/java/com/claudecode/tool/impl/SendMessageTool.java diff --git a/src/main/java/com/claudecode/config/AppConfig.java b/src/main/java/com/claudecode/config/AppConfig.java index eb7d9e5..ef57851 100644 --- a/src/main/java/com/claudecode/config/AppConfig.java +++ b/src/main/java/com/claudecode/config/AppConfig.java @@ -7,6 +7,7 @@ import com.claudecode.context.GitContext; import com.claudecode.context.SkillLoader; import com.claudecode.context.SystemPromptBuilder; import com.claudecode.core.AgentLoop; +import com.claudecode.core.CoordinatorMode; import com.claudecode.core.SessionMemoryService; import com.claudecode.core.TaskManager; import com.claudecode.core.TokenTracker; @@ -115,7 +116,8 @@ public class AppConfig { new ToolSearchTool(), new EnterPlanModeTool(), new ExitPlanModeTool(), - new SkillTool() + new SkillTool(), + new SendMessageTool() ); // P2: 注册 MCP 工具桥接(将远程 MCP 工具映射为本地工具) @@ -268,6 +270,15 @@ public class AppConfig { // Load existing session memory String sessionMemory = sessionMemoryService.getMemoryContent(); + // Check if coordinator mode is enabled + if (CoordinatorMode.isCoordinatorMode()) { + log.info("Coordinator mode enabled via CLAUDE_CODE_COORDINATOR_MODE env var"); + // Coordinator uses a specialized system prompt + String coordinatorPrompt = CoordinatorMode.getCoordinatorSystemPrompt(); + String userContext = CoordinatorMode.getCoordinatorUserContext(); + return coordinatorPrompt + "\n\n" + userContext; + } + return new SystemPromptBuilder() .claudeMd(claudeMd) .skills(skillsSummary) diff --git a/src/main/java/com/claudecode/core/CoordinatorMode.java b/src/main/java/com/claudecode/core/CoordinatorMode.java new file mode 100644 index 0000000..dbd9c81 --- /dev/null +++ b/src/main/java/com/claudecode/core/CoordinatorMode.java @@ -0,0 +1,161 @@ +package com.claudecode.core; + +import com.claudecode.tool.ToolRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +/** + * Coordinator Mode —— 对应 claude-code/src/coordinator/coordinatorMode.ts。 + *

+ * 协调模式允许 Agent 作为"协调者"运行,仅使用 Agent、SendMessage、TaskStop 工具 + * 来派发和管理 worker agent。Worker agent 使用标准工具集执行实际任务。 + *

+ * 通过环境变量 CLAUDE_CODE_COORDINATOR_MODE=1 启用。 + */ +public class CoordinatorMode { + + private static final Logger log = LoggerFactory.getLogger(CoordinatorMode.class); + + /** Coordinator 可用的工具集 */ + public static final Set COORDINATOR_ALLOWED_TOOLS = Set.of( + "Agent", // 派发 worker + "SendMessage", // 向 worker 发送消息 + "TaskStop", // 停止 worker + "TaskGet", // 查看 worker 状态 + "TaskList", // 列出所有 worker + "TaskOutput" // 获取 worker 输出 + ); + + /** Worker(异步 agent)可用的工具集 */ + public static final Set WORKER_ALLOWED_TOOLS = Set.of( + "Read", // 读取文件 + "Write", // 写入文件 + "Edit", // 编辑文件 + "Bash", // 执行命令 + "Grep", // 搜索文件内容 + "Glob", // 文件模式匹配 + "ListFiles", // 列出目录 + "WebFetch", // 获取网页 + "WebSearch", // 搜索网页 + "TodoRead", // 读取待办 + "TodoWrite", // 写待办 + "ToolSearch", // 搜索工具 + "Skill" // 执行 skill + ); + + /** 检查 coordinator 模式是否通过环境变量启用 */ + public static boolean isCoordinatorMode() { + String envVal = System.getenv("CLAUDE_CODE_COORDINATOR_MODE"); + return envVal != null && !envVal.isBlank() + && !envVal.equalsIgnoreCase("false") + && !envVal.equals("0"); + } + + /** + * 获取 Coordinator 系统提示词。 + * 对应 TS 版 getCoordinatorSystemPrompt()。 + */ + public static String getCoordinatorSystemPrompt() { + return """ + You are Claude Code, an AI assistant that orchestrates software engineering tasks \ + across multiple workers. Your role is to: + 1. Understand user requests and decompose them into parallel tasks + 2. Spawn worker agents for each task using the Agent tool + 3. Monitor worker progress and synthesize results + 4. Communicate clear, actionable results to the user + + ## Your Tools + + - **Agent** — Spawn a worker to execute a specific task. Workers have access to \ + file operations (Read, Write, Edit), shell commands (Bash), search (Grep, Glob), \ + web access, and project skills. + - **SendMessage** — Send follow-up instructions to a running or completed worker. \ + Use this to continue multi-step workflows or provide corrections. + - **TaskStop** — Forcefully terminate a worker that is stuck or no longer needed. + - **TaskGet** — Check the current status and output of a specific worker. + - **TaskList** — List all active and completed workers. + - **TaskOutput** — Get the full output of a completed worker. + + ## Worker Results + + When a worker completes, you'll receive a task-notification with: + - task-id: The worker's unique identifier + - status: completed, failed, or cancelled + - summary: Brief description of what was accomplished + - result: Full output from the worker + + ## Workflow Guidance + + ### Task Decomposition + 1. **Research Phase**: Spawn workers to investigate the codebase, understand the problem + 2. **Synthesis**: Analyze worker results, identify patterns, form a plan + 3. **Implementation Phase**: Spawn workers for code changes, each with specific scope + 4. **Verification Phase**: Spawn workers to test, lint, and validate changes + + ### Writing Worker Prompts + - Be **self-contained**: Workers cannot see your conversation history + - Include **file paths** (absolute), **line numbers**, and **exact context** + - Specify **expected output format** (what you need from the result) + - Add a **purpose statement** so the worker understands the bigger picture + - If building on previous findings, **summarize those findings** in the prompt + + ### Concurrency Management + - Spawn independent tasks **in parallel** for maximum throughput + - Workers that depend on others' results should be spawned **sequentially** + - Don't over-decompose — if a task is simple, one worker is enough + - Group related small changes into a single worker's scope + + ### Verification Best Practices + - Always verify implementation changes with a dedicated verification worker + - The verification worker should run existing tests and any new tests + - Ask the verification worker to check for common issues (imports, types, edge cases) + + ## Communication + - Every message you send is directed to the **user** (not workers) + - Provide concise status updates as workers complete + - Synthesize worker results into a clear, coherent summary + - If something goes wrong, explain what happened and propose next steps + + ## Important Rules + - You do NOT have direct access to files, shell, or search — delegate those to workers + - DO NOT attempt to edit files yourself; spawn a worker for any file operations + - Keep your conversation focused on orchestration and synthesis + - If a worker fails, analyze the error and spawn a corrective worker + """; + } + + /** + * 获取 coordinator 的用户上下文消息。 + * 告知 coordinator worker 可用的工具集。 + */ + public static String getCoordinatorUserContext() { + StringBuilder sb = new StringBuilder(); + sb.append("## Worker Capabilities\n\n"); + sb.append("Workers have access to the following tools:\n"); + for (String tool : WORKER_ALLOWED_TOOLS.stream().sorted().toList()) { + sb.append("- ").append(tool).append("\n"); + } + sb.append("\nPlus any MCP tools from connected servers.\n"); + return sb.toString(); + } + + /** + * 过滤 ToolRegistry,仅保留 coordinator 可用的工具。 + */ + public static java.util.List filterForCoordinator(ToolRegistry registry) { + return registry.getToolNames().stream() + .filter(COORDINATOR_ALLOWED_TOOLS::contains) + .toList(); + } + + /** + * 过滤 ToolRegistry,仅保留 worker 可用的工具。 + */ + public static java.util.List filterForWorker(ToolRegistry registry) { + return registry.getToolNames().stream() + .filter(WORKER_ALLOWED_TOOLS::contains) + .toList(); + } +} diff --git a/src/main/java/com/claudecode/tool/impl/SendMessageTool.java b/src/main/java/com/claudecode/tool/impl/SendMessageTool.java new file mode 100644 index 0000000..fcae6bc --- /dev/null +++ b/src/main/java/com/claudecode/tool/impl/SendMessageTool.java @@ -0,0 +1,195 @@ +package com.claudecode.tool.impl; + +import com.claudecode.core.TaskManager; +import com.claudecode.tool.Tool; +import com.claudecode.tool.ToolContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * SendMessage 工具 —— 对应 claude-code/src/tools/SendMessageTool/。 + *

+ * 在 Coordinator 模式下用于向正在运行的 worker agent 发送消息, + * 支持继续执行、提供反馈或请求停止。 + *

+ * 消息类型: + *

+ */ +public class SendMessageTool implements Tool { + + private static final Logger log = LoggerFactory.getLogger(SendMessageTool.class); + + public static final String TOOL_NAME = "SendMessage"; + + /** ToolContext key for pending messages map: Map> */ + public static final String PENDING_MESSAGES_KEY = "__pending_messages__"; + + @Override + public String name() { + return TOOL_NAME; + } + + @Override + public String description() { + return """ + Send a message to a running worker agent (teammate). Use this to: + - Continue a worker with additional instructions after it completes a task + - Provide follow-up context or corrections to a running worker + - Request a worker to stop (shutdown_request) + - Broadcast a message to all workers (to="*") + + The message will be queued and delivered to the worker on its next tool round. + If the worker has already completed, it will be resumed with the new message. + + IMPORTANT: + - Workers cannot see the coordinator's conversation history. + - Include all necessary context in the message. + - Use TaskStop to forcefully terminate a worker; SendMessage for graceful communication."""; + } + + @Override + public String inputSchema() { + return """ + { + "type": "object", + "properties": { + "to": { + "type": "string", + "description": "Recipient: task ID, agent name, or '*' for broadcast" + }, + "message": { + "type": "string", + "description": "The message content to send" + }, + "summary": { + "type": "string", + "description": "Brief 5-10 word summary of the message" + } + }, + "required": ["to", "message"] + }"""; + } + + @Override + @SuppressWarnings("unchecked") + public String execute(Map input, ToolContext context) { + String to = (String) input.get("to"); + String message = (String) input.get("message"); + String summary = (String) input.getOrDefault("summary", ""); + + if (to == null || to.isBlank()) { + return "Error: 'to' is required — specify a task ID, agent name, or '*' for broadcast"; + } + if (message == null || message.isBlank()) { + return "Error: 'message' is required"; + } + + TaskManager taskManager = context.getOrDefault("TASK_MANAGER", null); + if (taskManager == null) { + return "Error: TaskManager not available"; + } + + // Broadcast to all running workers + if ("*".equals(to)) { + return handleBroadcast(message, summary, taskManager, context); + } + + // Send to specific worker + return handleDirectMessage(to, message, summary, taskManager, context); + } + + private String handleDirectMessage(String to, String message, String summary, + TaskManager taskManager, ToolContext context) { + var taskOpt = taskManager.getTask(to); + if (taskOpt.isEmpty()) { + // Try to find by description/name match + var allTasks = taskManager.listTasks(); + var matched = allTasks.stream() + .filter(t -> t.description().toLowerCase().contains(to.toLowerCase())) + .findFirst(); + if (matched.isEmpty()) { + return "Error: No task found with ID or name matching '" + to + "'"; + } + taskOpt = matched; + } + + var task = taskOpt.get(); + + // Queue the message for the worker + queueMessage(task.id(), message, context); + + String statusInfo = switch (task.status()) { + case RUNNING -> "Message queued for running worker '" + task.description() + "'"; + case COMPLETED -> "Worker '" + task.description() + "' has completed. " + + "Message stored but worker will need to be re-spawned to receive it."; + case PENDING -> "Message queued for pending worker '" + task.description() + "'"; + case FAILED -> "Warning: Worker '" + task.description() + "' has failed. " + + "Message stored but worker may need to be re-spawned."; + case CANCELLED -> "Warning: Worker '" + task.description() + "' was cancelled. " + + "Message stored but worker will need to be re-spawned."; + }; + + log.info("SendMessage to {}: {}", task.id(), + summary.isBlank() ? truncate(message, 50) : summary); + + return statusInfo; + } + + private String handleBroadcast(String message, String summary, + TaskManager taskManager, ToolContext context) { + var runningTasks = taskManager.listTasks(TaskManager.TaskStatus.RUNNING); + if (runningTasks.isEmpty()) { + return "No running workers to broadcast to."; + } + + int count = 0; + StringBuilder sb = new StringBuilder(); + sb.append("Broadcast sent to ").append(runningTasks.size()).append(" worker(s):\n"); + + for (var task : runningTasks) { + queueMessage(task.id(), message, context); + sb.append(" • ").append(task.id()).append(" (").append(task.description()).append(")\n"); + count++; + } + + log.info("Broadcast to {} workers: {}", + count, summary.isBlank() ? truncate(message, 50) : summary); + + return sb.toString().stripTrailing(); + } + + @SuppressWarnings("unchecked") + private void queueMessage(String taskId, String message, ToolContext context) { + Map> pendingMessages = + context.getOrDefault(PENDING_MESSAGES_KEY, null); + + if (pendingMessages == null) { + pendingMessages = new java.util.concurrent.ConcurrentHashMap<>(); + context.set(PENDING_MESSAGES_KEY, pendingMessages); + } + + pendingMessages.computeIfAbsent(taskId, k -> new java.util.concurrent.CopyOnWriteArrayList<>()) + .add(message); + } + + private String truncate(String text, int maxLen) { + if (text == null || text.length() <= maxLen) return text; + return text.substring(0, maxLen - 3) + "..."; + } + + @Override + public String activityDescription(Map input) { + String to = (String) input.getOrDefault("to", "?"); + String summary = (String) input.getOrDefault("summary", ""); + if (!summary.isBlank()) { + return "📨 SendMessage to " + to + ": " + summary; + } + return "📨 SendMessage to " + to; + } +}