BoxAgnts工具系统多Provider适配与Agent查询循环指南
摘要
BoxAgnts通过三层抽象适配不同AI厂商API差异:统一数据模型、LlmProvidertrait和Transformer。流式
BoxAgnts 的工具系统解决了一个核心问题:让工具在 WASM 沙箱中安全运行,并由上层的 Tool trait 统一管控调用。不过,工具最终是给 AI 模型调用的,这又引出了两个工程难题:不同 AI 厂商的 API 格式互不兼容,以及对话流与工具执行的交替编排。前者依靠 Provider 抽象层来解决,后者则全靠 Agent 查询循环来兜底。

Provider 抽象:让系统对 LLM 厂商保持中立
不同 AI 厂商的 API,在请求格式、响应格式乃至错误处理方式上差异悬殊。以请求侧为例:Anthropic 将角色划分为 user 和 assistant,系统提示词作为独立的顶层字段 system;OpenAI 则把系统提示词当作一条 role: "system" 的消息;Google Gemini 将 system_instruction 放在请求体顶层,但格式又与 Anthropic 有别。如果让上层的 Agent 循环直接面对这些差异,代码会迅速膨胀成一个巨大的 match provider_id { ... } 分支——这种做法显然不可持续。
BoxAgnts 的解决方案是引入三层抽象。
第一层:ProviderRequest / ProviderResponse 统一数据模型
// provider_types.rs
pub struct ProviderRequest {
pub messages: Vec,
pub system: Option<String>,
pub tools: Vec,
pub max_tokens: u32,
pub temperature: Option<f32>,
}pub struct ProviderResponse {
pub content: Vec,
pub usage: UsageInfo,
pub stop_reason: String,
}
Agent 循环只与这两个结构交互,无论背后是 Anthropic 还是 OpenAI,无需关心。
第二层:LlmProvider trait
pub trait LlmProvider: Send + Sync {
fn id(&self) -> &ProviderId;
async fn create_message_stream(
&self, request: ProviderRequest
) -> ResultBox <dyn Stream- Result
> + Send>>>;
async fn list_models(&self) -> Result<Vec>;
}
注意 create_message_stream 返回 Pin——这是 Rust 异步生态中统一多种流类型的标准做法,类似于 Java 的 Stream 或 Python 的 AsyncIterator。每个 Provider 实现各自处理 HTTP 请求构建、认证和 SSE 解析,对外只暴露统一的 StreamEvent。
第三层:Transformer(消息格式转换)
Transformer 负责消除厂商格式差异的最后一公里:
// transformers/anthropic.rs
pub fn to_anthropic_request(req: &ProviderRequest) -> AnthropicMessagesRequest { ... }// transformers/openai_chat.rs
pub fn to_openai_request(req: &ProviderRequest) -> OpenAIChatRequest { ... }
Transformer 是纯函数——输入统一格式,输出厂商格式。要增加新 Provider,只需实现一个新的 Transformer 和对应的 LlmProvider 实现。共享的 ProviderRegistry 通过 Provider ID 查找对应实现:
pub struct ProviderRegistry {
providers: HashMapdyn LlmProvider>>,
default_provider_id: ProviderId,
}
流式协议与 SSE 解析
所有 Provider 的流式交互都基于 SSE(Server-Sent Events)。但各家厂商的 SSE 事件粒度和语义各不相同:
- Anthropic 提供
content_block_start/content_block_delta/content_block_stop三级事件,一个 ContentBlock 从 start 到 stop 跨越多条 SSE 消息 - OpenAI 使用
choices[0].delta作为扁平增量,没有显式的 block start/stop - Google Gemini 采用 gRPC-web 协议,拥有自己独特的流式格式
BoxAgnts 的 stream_parser 模块消化了所有这些差异,向上暴露统一的 StreamEvent 枚举:
pub enum StreamEvent {
TextDelta { text: String },
ToolUseStart { id: String, name: String },
ToolUseDelta { id: String, json: String },
ToolUseEnd { id: String },
ThinkingDelta { text: String },
UsageUpdate { input_tokens: u32, output_tokens: u32 },
MessageStop,
}
每个 Provider 的 stream parser 内部是一个有限状态机。以 Anthropic 为例:
等待 message_start
│
├── message_start ──► 提取 model, usage 初始值
│
├── content_block_start
│ │ type = "text" → 创建 TextBlock 状态
│ │ type = "tool_use" → 创建 ToolUseBlock 状态,emit ToolUseStart
│ │ type = "thinking" → 创建 ThinkingBlock 状态
│
├── content_block_delta
│ │ text_delta → 追加到当前 TextBlock,emit TextDelta
│ │ input_json_delta → 拼接 JSON 片段到 ToolUseBlock,emit ToolUseDelta
│ │ thinking_delta → 追加到 ThinkingBlock,emit ThinkingDelta
│
├── content_block_stop
│ │ 对应 tool_use block → emit ToolUseEnd
│
└── message_stop ──► emit MessageStop,累加最终 usage
StreamAccumulator 维护当前消息的所有 ContentBlock 状态:
pub struct StreamAccumulator {
text_blocks: Vec,
tool_use_blocks: HashMap<String, ToolUseBlock>,
thinking_block: Option<String>,
usage: UsageInfo,
}
当 MessageStop 到达时,finish() 将累积的各块组装成完整的 Message,返回 stop_reason 和最终 UsageInfo。
Agent 查询循环
流解析器把 SSE 事件转成了结构化的 Message。接下来,query::run_query_loop() 把这个 Message 交给工具系统。
核心流程:
loop {
// 1. 将消息历史、系统 Prompt 和工具列表发送给 AI 模型
let request = CreateMessageRequest::builder(model, max_tokens)
.messages(messages)
.tools(all_tools_as_definitions(tools))
.build(); // 2. 发起流式请求并解析 SSE 事件
let mut rx = client.create_message_stream(request).await?;
let mut acc = StreamAccumulator::new(); while let Some(evt) = rx.recv().await {
acc.on_event(&evt);
match evt {
StreamEvent::ToolUseStart { .. } | StreamEvent::ToolUseDelta { .. } => {
// 实时发送给前端(通过 WebSocket),让用户看到模型正在使用什么工具
}
StreamEvent::MessageStop => break,
_ => {}
}
} // 3. 组装完成的 Message 并检查 stop_reason
let (msg, usage, stop_reason) = acc.finish(); match stop_reason {
"end_turn" => return QueryOutcome::EndTurn { message: msg, usage },
"tool_use" => {
// 4. 对每个 tool_use ContentBlock 调用对应工具
for block in msg.content.iter() {
if let ContentBlock::ToolUse { name, input, .. } = block {
let tool = find_tool(tools, name);
let result = tool.execute(input, &ctx).await;
messages.push(result_to_message(result));
}
}
// 回到 loop 顶部,继续下一轮
}
"max_tokens" => {
// 5. MaxTokens 恢复:注入提示消息,让模型继续
messages.push(UserMessage("Output token limit hit. Resume directly."));
max_tokens_count += 1;
if max_tokens_count > 3 { return MaxTokens { ... }; }
}
_ => return Error(...),
} turn += 1;
if turn >= config.max_turns { break; }
}
值得关注的几个细节:
工具列表注入策略。每轮 API 调用都会将完整的工具列表(所有工具的 name、description 和 input_schema)作为 tools 字段发送给 AI 模型。这会产生固定的 token 开销——工具数量越多,每轮的"工具描述 tokens"越高。当工具超过 20 个时,这个开销开始显著(可能达到几千 tokens/轮)。BoxAgnts 目前的策略是全量注入,未来计划引入工具选择和分组机制(类似 Anthropic 的 tool_choice)。
MaxTokens 恢复。如果模型在回答中途耗尽了输出 token 限制,它并未真正"失败"——只是没说完。BoxAgnts 会自动注入一条恢复消息("Output token limit hit. Resume directly..."),让模型继续。这个循环最多执行 3 次——如果 3 次后仍然达到 max_tokens,说明任务确实太长,放弃并返回部分结果。
取消机制。CancellationToken 来自 tokio 生态。当用户在前端点击"停止"按钮时,WebSocket 处理器会取消对应的 token,run_query_loop 在下一次检查时返回 QueryOutcome::Cancelled。
费用追踪。每轮 API 调用后,CostTracker 累加当前模型的定价(按 input/output token 分别计价,不同模型价格不同)。如果累计费用超过 budget_limit_usd,返回 QueryOutcome::BudgetExceeded。费用信息通过 WebSocket 实时推送到前端 Dashboard。
错误处理与重试策略
AI API 调用存在几种典型的失败模式:
| 错误类型 | 典型 HTTP 状态码 | 策略 |
|---|---|---|
| 限流 (Rate Limit) | 429 | 指数退避重试,尊重 Retry-After 头 |
| 过载 (Overloaded) | 529 | 指数退避重试,可选 fallback 模型 |
| 认证失败 | 401/403 | 不重试,直接返回错误 |
| 请求错误 | 400 | 不重试(参数错误重试无意义) |
| 服务端错误 | 500+ | 有限重试(最多 3 次) |
| 网络超时 | — | 有限重试 |
指数退避采用 1s → 2s → 4s → 8s 的间隔,基于 Duration 做乘法。对于 529(Overloaded),额外支持模型切换——如果用户配置了 fallback 模型(例如 claude-sonnet-4-5 过载时切换到 claude-haiku-4-5),后续调用自动使用 fallback。
Provider 扩展性
加入新 Provider 的步骤非常清晰:
- 在
providers/下新增一个模块,实现LlmProvidertrait - 实现对应的 Transformer(如果需要格式转换)
- 在
registry.rs的provider_from_key()中注册 - 在
model_registry.rs中添加该 Provider 支持的模型列表
openai_compat_providers 模块提供了一条捷径:对于使用 OpenAI API 格式的服务(DeepSeek、OpenCode、部分国产模型),只需配置 API base URL 和 API key,无需编写任何 Provider 代码。这些服务共享同一个 OpenAI 兼容的 SSE 解析器和 Request 构建器,仅配置不同。
// 配置示例
"deepseek": {
"provider_id": "deepseek",
"api_base": "https://api.deepseek.com/v1",
"api_key": "sk-...",
"provider_type": "openai_compat"
}
总结
Provider 抽象和 Agent 查询循环构成了 BoxAgnts 工具系统的"动力核心":
- Provider 抽象通过三层解耦(ProviderRequest/Response 统一数据模型 → LlmProvider trait → Transformer 格式转换)解决了 12 种 AI API 的接入问题。新增 Provider 只需实现 trait 并注册,共享的 SSE 解析器和 Request 构建器通过
openai_compat模块进一步降低了接入成本。 - Agent 查询循环通过 SSE 状态机解析、ToolUse 检测、工具派发、结果反馈的闭环,实现了对话与工具执行的交替编排。MaxTokens 自动恢复(最多 3 次)和指数退避重试策略保障了长任务的可靠性。
- 这两层模块的共同特征是依赖倒置——Agent 循环不依赖具体的 AI 厂商,Provider 实现不依赖具体的对话编排逻辑。所有耦合通过 trait 接口解耦。
费用追踪(CostTracker + AtomicF64)和取消机制(CancellationToken)为生产环境提供了必要的运维可观测性和用户控制能力。
参考资源
- BoxAgnts 源代码:github.com/guyoung/box…
- Anthropic Messages API 文档:docs.anthropic.com/en/api/mess…
- OpenAI Chat Completions API:platform.openai.com/docs/api-re…
- Server-Sent Events 规范:html.spec.whatwg.org/multipage/s…
- Codex CLI Agent Loop 设计:openai.com/index/unrol…
- Claude Code 架构分析:blog.promptlayer.com/claude-code…
- tokio-cron-scheduler:docs.rs/tokio-cron-…
来源:互联网
本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。