ReAct循环Go实现逐行拆解:50行代码实战教程
摘要
通过加载会话与状态校验、动态注入多租户context、区分新turn和HITL恢复路径,将状态管理、
用 50 行 Go 构建 ReAct 循环:逐段剖析
先说明一个架构上的决策:ReAct 循环本身——LLM 推理、调用工具、观察结果、继续推理的完整闭环——我们并未手动实现,而是交由 Eino 框架的 react.NewAgent() 执行。那么 Handle() 方法只负责处理框架无法覆盖、必须由业务层主导的职责:Session 状态迁移、多租户 context 注入、长期记忆召回、事务持久化,以及异步记忆提炼。

下面的代码去掉了冗长的错误处理样板,只保留主干逻辑,约 50 行:
func (h *RunTurnHandler) Handle(ctx context.Context, in RunTurnInput) error {
// ① 加载会话,校验状态
sess, _ := h.repo.Load(ctx, in.SessionID)
if sess.IsTerminal() { return errors.New("session already terminal") } // ② context 注入
ctx = context.WithValue(ctx, port.ContextKeyAgentTenantID{}, sess.TenantID())
ctx = context.WithValue(ctx, port.ContextKeyAgentUserID{}, sess.UserID())
ctx = context.WithValue(ctx, port.ContextKeyAgentSessionID{}, string(sess.ID())) // ③ 前置 hook + 加载配置
h.invokeHook(func(hr port.HookRunner) { hr.BeforeSession(ctx, sess.ID()) })
cfg, _ := h.configs.Load(ctx, sess.TenantID(), sess.AgentConfig()) // ④ 判断路径:新 turn 还是 HITL 恢复
isResume := in.UserText == ""
runInput := port.RunStreamInput{IsResume: isResume}
var capturedHistory []model.Message if isResume {
// ⑤a HITL 恢复:状态迁移 WAITING→RUNNING,先落库再跑
sess.Resume(*in.Decision)
h.persist(ctx, sess)
} else {
// ⑤b 新 turn:拉历史、写 user 消息、召回记忆、拼 system prompt
history, _ := h.repo.ListMessages(ctx, sess.ID())
sess.IncTurn()
userMsg := model.NewUserMessage(sess.ID(), sess.TurnCount(), in.UserText)
h.repo.AppendMessage(ctx, userMsg)
history = append(history, userMsg)
capturedHistory = history memCtx := h.recallMemory(ctx, sess.TenantID(), sess.UserID(), in.UserText)
runInput.SystemContent = expandSystemPrompt(cfg.SystemPrompt, sess) + memCtx
runInput.History = history
} // ⑥ 交给 Eino:ReAct 循环、工具调用、Hook callback 全在里面
tr, interrupt, err := h.runnableFac.StreamTurn(
ctx, cfg, sess.TenantID(), runInput, h.hooks, h.stream, sess.ID(),
)
if err != nil { return h.failSession(ctx, sess, ...) } // ⑦ 处理 HITL 中断
if interrupt != nil { return h.handleEinoInterrupt(ctx, sess, interrupt) }
defer tr.Close() // ⑧ 消费流:token → SSE turn.delta + 收集 finalContent
finalContent, _ := h.consumeEinoStream(ctx, sess.ID(), tr) // ⑨ 落库 + 触发终态事件 + 异步记忆提炼
asstMsg := model.NewAssistantMessage(sess.ID(), sess.TurnCount(), finalContent, nil)
sess.Complete()
h.commitFinalAnswer(ctx, sess, asstMsg)
if h.memory != nil && !isResume && len(capturedHistory) > 0 {
h.scheduleMemoryExtraction(sess, capturedHistory, finalContent)
}
return nil
}
接下来,咱们逐段拆解,说明每步的逻辑和背后的设计考量。
① 加载会话,校验状态
sess, err := h.repo.Load(ctx, in.SessionID)
if err != nil {
return h.onError(ctx, in.SessionID, err)
}
if sess.IsTerminal() {
return errors.New("session already terminal")
}
一旦 Session 处于终态(COMPLETED / FAILED / CANCELLED),禁止启动新 Turn。这是状态机的一条硬约束。在入口处提前拦截,比深入循环内再做判断要简洁高效得多。
② context 注入
为什么选择 context.WithValue,而不是赋值到 struct 字段?因为 Runnable 是按 AgentConfig 维度缓存的,跨 session 复用。若将 sessionID 绑定到 struct 字段,缓存第一个 session 后后续全部错乱。从 context 动态读取,缓存才能安全共享。
ctx = context.WithValue(ctx, port.ContextKeyAgentTenantID{}, sess.TenantID())
ctx = context.WithValue(ctx, port.ContextKeyAgentUserID{}, sess.UserID())
ctx = context.WithValue(ctx, port.ContextKeyAgentSessionID{}, string(sess.ID()))
这三个值随 context 一路传递到 Eino 的 chatModelAdapter 和 toolBrokerAdapter,从中取出信息做租户隔离和工具路由。
③ 前置 hook + 加载配置
h.invokeHook(func(hr port.HookRunner) { _ = hr.BeforeSession(ctx, sess.ID()) })cfg, err := h.configs.Load(ctx, sess.TenantID(), sess.AgentConfig())
BeforeSession 是 Hook 链的首环节,典型用途包括认证验证、额度预检、tracing 初始化。invokeHook 内部做了 nil 检查,若无注入 HookRunner 则静默跳过。
AgentConfig 包含系统提示词模板、LLM profile、工具白名单、最大轮次、HITL 开关等信息。每次 Turn 重新加载,确保配置变更即时生效。
④ 判断路径
isResume := in.UserText == ""
一个布尔值区分两条执行分支:若 UserText == "",则属于 HITL 恢复(从 Eino 的 checkpoint 继续);否则是新 Turn。两条路径的准备工作不同,但最终调用同一个 StreamTurn。
⑤a HITL 恢复路径
if sess.State() != model.StateWaiting {
return fmt.Errorf("resume called but session not waiting, state=%s", sess.State())
}
dec := model.InterruptDecision{Action: model.DecisionApprove}
if in.Decision != nil {
dec = *in.Decision
}
if err := sess.Resume(dec); err != nil {
_ = h.onError(ctx, sess.ID(), err)
return err
}
if err := h.persist(ctx, sess); err != nil {
return err
}
核心原则:“先落库,再运行”。sess.Resume(dec) 将状态从 WAITING 切换到 RUNNING,立即写入数据库。这样即使进程崩溃重启,session 状态已是 RUNNING,重试逻辑能正确处理。若先运行再落库,崩溃时 session 仍为 WAITING,重试会再次走恢复路径,导致重复执行。
当 isResume=true 时,runInput.History 和 runInput.SystemContent 保持零值——Eino 遇到 nil inputMsgs 自动从 CheckpointStore 加载上次暂停时的图状态。
⑤b 新 turn 路径
history, _ := h.repo.ListMessages(ctx, sess.ID())
sess.IncTurn()
userMsg := model.NewUserMessage(sess.ID(), sess.TurnCount(), in.UserText)
h.repo.AppendMessage(ctx, userMsg)
history = append(history, userMsg)
capturedHistory = historymemCtx := h.recallMemory(ctx, sess.TenantID(), sess.UserID(), in.UserText)
runInput.SystemContent = expandSystemPrompt(cfg.SystemPrompt, sess) + memCtx
runInput.History = history
依次完成五步:拉取历史消息 → 递增轮次计数 → 写入用户消息 → 召回长期记忆 → 组装系统提示词。
recallMemory 根据本轮用户文本检索相关记忆片段,追加到 system prompt 末尾,检索失败静默返回空字符串。expandSystemPrompt 展开 {{user_id}}、{{tenant_id}}、{{time}} 占位符。capturedHistory 为会话结束后进行记忆提炼做准备。
⑥ 交给 Eino
tr, interrupt, err := h.runnableFac.StreamTurn(
ctx, cfg, sess.TenantID(), runInput, h.hooks, h.stream, sess.ID(),
)
这一行是 Handle 方法的分界线——左侧是 DDD 领域逻辑,右侧是 Eino 框架的内部逻辑。为什么切在这里而非更早(让 Eino 接管整个 Handle)或更晚(手写 ReAct 循环)?状态加载、context 注入、记忆召回与 LLM 编排无关,框架不应介入;LLM 调用、工具分发、流式输出已有成熟实现,无需重写。这条边界让两边独立演进——更换 Eino 版本不影响状态机,修改记忆策略不影响 Runnable 缓存。
StreamTurn 内部检查 Runnable 缓存(TTL 35 分钟)→ 未命中时用 react.NewAgent() 构建编译 → 设置 compose 选项(checkpoint ID、callbacks、force-new-run)→ 调用 runnable.Stream() → 检测 interrupt 信号。应用层无需感知这些细节。
它返回的三种情况非常清晰:
| 返回值 | 含义 |
|---|---|
(TokenReader, nil, nil) | 正常流,消费到 Final Answer |
(nil, *AgentInterruptInfo, nil) | Eino 图级 HITL 中断 |
(nil, nil, err) | 执行错误 |
⑦ 处理 HITL 中断
if interrupt != nil {
it := model.NewPreToolInterrupt("hitl: tool call requires approval", model.ToolCall{}, h.interruptTTL)
sess.Pause(it)
h.stream.Emit(ctx, sess.ID(), port.StreamEvent{
Type: "interrupt", Payload: map[string]any{"before_nodes": info.BeforeNodes},
})
return h.persist(ctx, sess)
}
该中断信号来自 Eino 图级别 WithInterruptBeforeNodes(["tools"]),而非 Hook 返回值。Hook 的 BeforeToolUse 在 Eino 的 Callback 层执行,应用层在这里看到的,是 Eino 已经决定“中断”,因此只需做一件事:将 Session 状态迁移到 WAITING,并推送 SSE 消息通知前端。
⑧ 消费流
var sb strings.Builder
for {
content, err := tr.Recv()
if errors.Is(err, io.EOF) { break }
if err != nil { return sb.String(), err }
if content != "" {
sb.WriteString(content)
h.stream.Emit(ctx, sid, port.StreamEvent{Type: "turn.delta", Payload: content})
}
}
port.TokenReader 是框架无关的接口,仅定义 Recv() (string, error)。Eino 的 schema.StreamReader 在 einoadapter 包中被包装成此接口,因此代码中看不到任何 Eino 类型。LLM 每吐出一个 token,SSE 推一帧,无额外 goroutine,也无缓冲区。
⑨ 落库、触发终态、异步记忆提炼
asstMsg := model.NewAssistantMessage(sess.ID(), sess.TurnCount(), finalContent, nil)
sess.Complete()
h.commitFinalAnswer(ctx, sess, asstMsg)if h.memory != nil && !isResume && len(capturedHistory) > 0 {
h.scheduleMemoryExtraction(sess, capturedHistory, finalContent)
}
这里完成三件事:
commitFinalAnswer:优先走事务路径,消息写入与 session 状态更新在同一数据库事务中完成。写入后,emitTerminalEvents 触发 AfterSession hook 和 SSE 的 done 帧。注意 persist() 和 emitTerminalEvents() 是两个独立函数——persist 只负责写库和发布事件,SSE 的 done 帧由 emitTerminalEvents 单独控制,避免事务路径与非事务路径重复触发。
scheduleMemoryExtraction:启动一个 goroutine,设置 30 秒超时,将本轮 user 和 assistant 消息异步发送给 memory BC 进行记忆提炼。若失败,仅触发 OnError hook,不阻塞响应返回。HITL 恢复路径(isResume=true)跳过此步,避免同一会话重复提炼。
流程图
Handle(in)
│
├─ Load + IsTerminal 检查
├─ context 注入(tenantID/userID/sessionID)
├─ BeforeSession hook + Load AgentConfig
│
├─── isResume=true ──────────────────────────────────────┐
│ StateWaiting 校验 │
│ sess.Resume → persist(先落库) │
│ runInput: IsResume=true, History=nil │
│ │
└─── isResume=false ─────────────────────────────────────┤
ListMessages + IncTurn + AppendMessage │
recallMemory + expandSystemPrompt │
runInput: SystemContent + History │
↓
runnableFac.StreamTurn(...)
┌── Eino 内部 ──────────────┐
│ react.NewAgent ReAct 循环│
│ 工具调用 + Hook callback │
└──────────────────────────┘
│
┌─────────────┼──────────────┐
interrupt TokenReader error
│ │ │
sess.Pause() consumeEinoStream failSession
SSE interrupt SSE turn.delta
persist 收集 finalContent
│
sess.Complete()
commitFinalAnswer(事务)
emitTerminalEvents → SSE done
scheduleMemoryExtraction(goroutine)
小结
再回头看这 50 行主干,你会发现 Handle() 的核心职责并非执行一个 ReAct 循环——那已经交给 Eino 处理。它真正做的是框架无法覆盖的事情:Session 状态迁移的原子性、多租户 context 透传、记忆召回与异步提炼、事务落库、终态事件解耦。这些东西没有任何框架能替你完成,因为它们与具体业务域强绑定。
因此,更换 Eino 版本不影响你的状态机,修改记忆策略不影响 Runnable 缓存,改动 Hook 逻辑不影响事务落库——这条边界线的价值就在于此。
下一篇,我们将深入 HITL 完整路径:从 Eino 图级中断、人工审批,到 checkpoint 恢复执行,整个过程如何串联。
来源:互联网
本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。