feat: Coordinator Mode with SendMessageTool (Phase 2D)

- CoordinatorMode: env-var activation, system prompt, tool filtering
- SendMessageTool: direct message, broadcast, message queuing
- Coordinator allowed tools: Agent, SendMessage, TaskStop, TaskGet, TaskList, TaskOutput
- Worker allowed tools: Read, Write, Edit, Bash, Grep, Glob, etc.
- AppConfig: coordinator mode detection, specialized system prompt
- Registered SendMessageTool in ToolRegistry

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
pull/1/head
abel533 1 month ago
parent 6e49c4fdc7
commit 758d0d2980
  1. 13
      src/main/java/com/claudecode/config/AppConfig.java
  2. 161
      src/main/java/com/claudecode/core/CoordinatorMode.java
  3. 195
      src/main/java/com/claudecode/tool/impl/SendMessageTool.java

@ -7,6 +7,7 @@ import com.claudecode.context.GitContext;
import com.claudecode.context.SkillLoader;
import com.claudecode.context.SystemPromptBuilder;
import com.claudecode.core.AgentLoop;
import com.claudecode.core.CoordinatorMode;
import com.claudecode.core.SessionMemoryService;
import com.claudecode.core.TaskManager;
import com.claudecode.core.TokenTracker;
@ -115,7 +116,8 @@ public class AppConfig {
new ToolSearchTool(),
new EnterPlanModeTool(),
new ExitPlanModeTool(),
new SkillTool()
new SkillTool(),
new SendMessageTool()
);
// P2: 注册 MCP 工具桥接(将远程 MCP 工具映射为本地工具)
@ -268,6 +270,15 @@ public class AppConfig {
// Load existing session memory
String sessionMemory = sessionMemoryService.getMemoryContent();
// Check if coordinator mode is enabled
if (CoordinatorMode.isCoordinatorMode()) {
log.info("Coordinator mode enabled via CLAUDE_CODE_COORDINATOR_MODE env var");
// Coordinator uses a specialized system prompt
String coordinatorPrompt = CoordinatorMode.getCoordinatorSystemPrompt();
String userContext = CoordinatorMode.getCoordinatorUserContext();
return coordinatorPrompt + "\n\n" + userContext;
}
return new SystemPromptBuilder()
.claudeMd(claudeMd)
.skills(skillsSummary)

@ -0,0 +1,161 @@
package com.claudecode.core;
import com.claudecode.tool.ToolRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
/**
* Coordinator Mode 对应 claude-code/src/coordinator/coordinatorMode.ts
* <p>
* 协调模式允许 Agent 作为"协调者"运行仅使用 AgentSendMessageTaskStop 工具
* 来派发和管理 worker agentWorker agent 使用标准工具集执行实际任务
* <p>
* 通过环境变量 CLAUDE_CODE_COORDINATOR_MODE=1 启用
*/
public class CoordinatorMode {
private static final Logger log = LoggerFactory.getLogger(CoordinatorMode.class);
/** Coordinator 可用的工具集 */
public static final Set<String> COORDINATOR_ALLOWED_TOOLS = Set.of(
"Agent", // 派发 worker
"SendMessage", // 向 worker 发送消息
"TaskStop", // 停止 worker
"TaskGet", // 查看 worker 状态
"TaskList", // 列出所有 worker
"TaskOutput" // 获取 worker 输出
);
/** Worker(异步 agent)可用的工具集 */
public static final Set<String> WORKER_ALLOWED_TOOLS = Set.of(
"Read", // 读取文件
"Write", // 写入文件
"Edit", // 编辑文件
"Bash", // 执行命令
"Grep", // 搜索文件内容
"Glob", // 文件模式匹配
"ListFiles", // 列出目录
"WebFetch", // 获取网页
"WebSearch", // 搜索网页
"TodoRead", // 读取待办
"TodoWrite", // 写待办
"ToolSearch", // 搜索工具
"Skill" // 执行 skill
);
/** 检查 coordinator 模式是否通过环境变量启用 */
public static boolean isCoordinatorMode() {
String envVal = System.getenv("CLAUDE_CODE_COORDINATOR_MODE");
return envVal != null && !envVal.isBlank()
&& !envVal.equalsIgnoreCase("false")
&& !envVal.equals("0");
}
/**
* 获取 Coordinator 系统提示词
* 对应 TS getCoordinatorSystemPrompt()
*/
public static String getCoordinatorSystemPrompt() {
return """
You are Claude Code, an AI assistant that orchestrates software engineering tasks \
across multiple workers. Your role is to:
1. Understand user requests and decompose them into parallel tasks
2. Spawn worker agents for each task using the Agent tool
3. Monitor worker progress and synthesize results
4. Communicate clear, actionable results to the user
## Your Tools
- **Agent** Spawn a worker to execute a specific task. Workers have access to \
file operations (Read, Write, Edit), shell commands (Bash), search (Grep, Glob), \
web access, and project skills.
- **SendMessage** Send follow-up instructions to a running or completed worker. \
Use this to continue multi-step workflows or provide corrections.
- **TaskStop** Forcefully terminate a worker that is stuck or no longer needed.
- **TaskGet** Check the current status and output of a specific worker.
- **TaskList** List all active and completed workers.
- **TaskOutput** Get the full output of a completed worker.
## Worker Results
When a worker completes, you'll receive a task-notification with:
- task-id: The worker's unique identifier
- status: completed, failed, or cancelled
- summary: Brief description of what was accomplished
- result: Full output from the worker
## Workflow Guidance
### Task Decomposition
1. **Research Phase**: Spawn workers to investigate the codebase, understand the problem
2. **Synthesis**: Analyze worker results, identify patterns, form a plan
3. **Implementation Phase**: Spawn workers for code changes, each with specific scope
4. **Verification Phase**: Spawn workers to test, lint, and validate changes
### Writing Worker Prompts
- Be **self-contained**: Workers cannot see your conversation history
- Include **file paths** (absolute), **line numbers**, and **exact context**
- Specify **expected output format** (what you need from the result)
- Add a **purpose statement** so the worker understands the bigger picture
- If building on previous findings, **summarize those findings** in the prompt
### Concurrency Management
- Spawn independent tasks **in parallel** for maximum throughput
- Workers that depend on others' results should be spawned **sequentially**
- Don't over-decompose if a task is simple, one worker is enough
- Group related small changes into a single worker's scope
### Verification Best Practices
- Always verify implementation changes with a dedicated verification worker
- The verification worker should run existing tests and any new tests
- Ask the verification worker to check for common issues (imports, types, edge cases)
## Communication
- Every message you send is directed to the **user** (not workers)
- Provide concise status updates as workers complete
- Synthesize worker results into a clear, coherent summary
- If something goes wrong, explain what happened and propose next steps
## Important Rules
- You do NOT have direct access to files, shell, or search delegate those to workers
- DO NOT attempt to edit files yourself; spawn a worker for any file operations
- Keep your conversation focused on orchestration and synthesis
- If a worker fails, analyze the error and spawn a corrective worker
""";
}
/**
* 获取 coordinator 的用户上下文消息
* 告知 coordinator worker 可用的工具集
*/
public static String getCoordinatorUserContext() {
StringBuilder sb = new StringBuilder();
sb.append("## Worker Capabilities\n\n");
sb.append("Workers have access to the following tools:\n");
for (String tool : WORKER_ALLOWED_TOOLS.stream().sorted().toList()) {
sb.append("- ").append(tool).append("\n");
}
sb.append("\nPlus any MCP tools from connected servers.\n");
return sb.toString();
}
/**
* 过滤 ToolRegistry仅保留 coordinator 可用的工具
*/
public static java.util.List<String> filterForCoordinator(ToolRegistry registry) {
return registry.getToolNames().stream()
.filter(COORDINATOR_ALLOWED_TOOLS::contains)
.toList();
}
/**
* 过滤 ToolRegistry仅保留 worker 可用的工具
*/
public static java.util.List<String> filterForWorker(ToolRegistry registry) {
return registry.getToolNames().stream()
.filter(WORKER_ALLOWED_TOOLS::contains)
.toList();
}
}

@ -0,0 +1,195 @@
package com.claudecode.tool.impl;
import com.claudecode.core.TaskManager;
import com.claudecode.tool.Tool;
import com.claudecode.tool.ToolContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* SendMessage 工具 对应 claude-code/src/tools/SendMessageTool/
* <p>
* Coordinator 模式下用于向正在运行的 worker agent 发送消息
* 支持继续执行提供反馈或请求停止
* <p>
* 消息类型
* <ul>
* <li>普通文本 继续指示或额外上下文</li>
* <li>shutdown_request 请求 worker 优雅退出</li>
* <li>broadcast 向所有 worker 广播to="*"</li>
* </ul>
*/
public class SendMessageTool implements Tool {
private static final Logger log = LoggerFactory.getLogger(SendMessageTool.class);
public static final String TOOL_NAME = "SendMessage";
/** ToolContext key for pending messages map: Map<String, List<String>> */
public static final String PENDING_MESSAGES_KEY = "__pending_messages__";
@Override
public String name() {
return TOOL_NAME;
}
@Override
public String description() {
return """
Send a message to a running worker agent (teammate). Use this to:
- Continue a worker with additional instructions after it completes a task
- Provide follow-up context or corrections to a running worker
- Request a worker to stop (shutdown_request)
- Broadcast a message to all workers (to="*")
The message will be queued and delivered to the worker on its next tool round.
If the worker has already completed, it will be resumed with the new message.
IMPORTANT:
- Workers cannot see the coordinator's conversation history.
- Include all necessary context in the message.
- Use TaskStop to forcefully terminate a worker; SendMessage for graceful communication.""";
}
@Override
public String inputSchema() {
return """
{
"type": "object",
"properties": {
"to": {
"type": "string",
"description": "Recipient: task ID, agent name, or '*' for broadcast"
},
"message": {
"type": "string",
"description": "The message content to send"
},
"summary": {
"type": "string",
"description": "Brief 5-10 word summary of the message"
}
},
"required": ["to", "message"]
}""";
}
@Override
@SuppressWarnings("unchecked")
public String execute(Map<String, Object> input, ToolContext context) {
String to = (String) input.get("to");
String message = (String) input.get("message");
String summary = (String) input.getOrDefault("summary", "");
if (to == null || to.isBlank()) {
return "Error: 'to' is required — specify a task ID, agent name, or '*' for broadcast";
}
if (message == null || message.isBlank()) {
return "Error: 'message' is required";
}
TaskManager taskManager = context.getOrDefault("TASK_MANAGER", null);
if (taskManager == null) {
return "Error: TaskManager not available";
}
// Broadcast to all running workers
if ("*".equals(to)) {
return handleBroadcast(message, summary, taskManager, context);
}
// Send to specific worker
return handleDirectMessage(to, message, summary, taskManager, context);
}
private String handleDirectMessage(String to, String message, String summary,
TaskManager taskManager, ToolContext context) {
var taskOpt = taskManager.getTask(to);
if (taskOpt.isEmpty()) {
// Try to find by description/name match
var allTasks = taskManager.listTasks();
var matched = allTasks.stream()
.filter(t -> t.description().toLowerCase().contains(to.toLowerCase()))
.findFirst();
if (matched.isEmpty()) {
return "Error: No task found with ID or name matching '" + to + "'";
}
taskOpt = matched;
}
var task = taskOpt.get();
// Queue the message for the worker
queueMessage(task.id(), message, context);
String statusInfo = switch (task.status()) {
case RUNNING -> "Message queued for running worker '" + task.description() + "'";
case COMPLETED -> "Worker '" + task.description() + "' has completed. "
+ "Message stored but worker will need to be re-spawned to receive it.";
case PENDING -> "Message queued for pending worker '" + task.description() + "'";
case FAILED -> "Warning: Worker '" + task.description() + "' has failed. "
+ "Message stored but worker may need to be re-spawned.";
case CANCELLED -> "Warning: Worker '" + task.description() + "' was cancelled. "
+ "Message stored but worker will need to be re-spawned.";
};
log.info("SendMessage to {}: {}", task.id(),
summary.isBlank() ? truncate(message, 50) : summary);
return statusInfo;
}
private String handleBroadcast(String message, String summary,
TaskManager taskManager, ToolContext context) {
var runningTasks = taskManager.listTasks(TaskManager.TaskStatus.RUNNING);
if (runningTasks.isEmpty()) {
return "No running workers to broadcast to.";
}
int count = 0;
StringBuilder sb = new StringBuilder();
sb.append("Broadcast sent to ").append(runningTasks.size()).append(" worker(s):\n");
for (var task : runningTasks) {
queueMessage(task.id(), message, context);
sb.append(" • ").append(task.id()).append(" (").append(task.description()).append(")\n");
count++;
}
log.info("Broadcast to {} workers: {}",
count, summary.isBlank() ? truncate(message, 50) : summary);
return sb.toString().stripTrailing();
}
@SuppressWarnings("unchecked")
private void queueMessage(String taskId, String message, ToolContext context) {
Map<String, java.util.List<String>> pendingMessages =
context.getOrDefault(PENDING_MESSAGES_KEY, null);
if (pendingMessages == null) {
pendingMessages = new java.util.concurrent.ConcurrentHashMap<>();
context.set(PENDING_MESSAGES_KEY, pendingMessages);
}
pendingMessages.computeIfAbsent(taskId, k -> new java.util.concurrent.CopyOnWriteArrayList<>())
.add(message);
}
private String truncate(String text, int maxLen) {
if (text == null || text.length() <= maxLen) return text;
return text.substring(0, maxLen - 3) + "...";
}
@Override
public String activityDescription(Map<String, Object> input) {
String to = (String) input.getOrDefault("to", "?");
String summary = (String) input.getOrDefault("summary", "");
if (!summary.isBlank()) {
return "📨 SendMessage to " + to + ": " + summary;
}
return "📨 SendMessage to " + to;
}
}
Loading…
Cancel
Save