菜鸟AI - 让提示词生成更简单! 全站导航 全站导航
AI工具安装 新手教程 进阶教程 辅助资源 AI提示词 热点资讯 技术资讯 产业资讯 内容生成 模型技术 AI信息库

已有账号?

首页 > AI创作与模型 > ReAct循环Go实现逐行拆解:50行代码实战教程
模型技术 50行代码实战

ReAct循环Go实现逐行拆解:50行代码实战教程

2026-06-09
阅读 0
热度 0
作者 菜鸟AI编辑部
摘要

摘要

通过加载会话与状态校验、动态注入多租户context、区分新turn和HITL恢复路径,将状态管理、

用 50 行 Go 构建 ReAct 循环:逐段剖析


先说明一个架构上的决策:ReAct 循环本身——LLM 推理、调用工具、观察结果、继续推理的完整闭环——我们并未手动实现,而是交由 Eino 框架的 react.NewAgent() 执行。那么 Handle() 方法只负责处理框架无法覆盖、必须由业务层主导的职责:Session 状态迁移、多租户 context 注入、长期记忆召回、事务持久化,以及异步记忆提炼。

ReAct 循环的 50 行 Go 实现,逐行拆解

下面的代码去掉了冗长的错误处理样板,只保留主干逻辑,约 50 行:

func (h *RunTurnHandler) Handle(ctx context.Context, in RunTurnInput) error {
    // ① 加载会话,校验状态
    sess, _ := h.repo.Load(ctx, in.SessionID)
    if sess.IsTerminal() { return errors.New("session already terminal") }    // ② context 注入
    ctx = context.WithValue(ctx, port.ContextKeyAgentTenantID{}, sess.TenantID())
    ctx = context.WithValue(ctx, port.ContextKeyAgentUserID{}, sess.UserID())
    ctx = context.WithValue(ctx, port.ContextKeyAgentSessionID{}, string(sess.ID()))    // ③ 前置 hook + 加载配置
    h.invokeHook(func(hr port.HookRunner) { hr.BeforeSession(ctx, sess.ID()) })
    cfg, _ := h.configs.Load(ctx, sess.TenantID(), sess.AgentConfig())    // ④ 判断路径:新 turn 还是 HITL 恢复
    isResume := in.UserText == ""
    runInput := port.RunStreamInput{IsResume: isResume}
    var capturedHistory []model.Message    if isResume {
        // ⑤a HITL 恢复:状态迁移 WAITING→RUNNING,先落库再跑
        sess.Resume(*in.Decision)
        h.persist(ctx, sess)
    } else {
        // ⑤b 新 turn:拉历史、写 user 消息、召回记忆、拼 system prompt
        history, _ := h.repo.ListMessages(ctx, sess.ID())
        sess.IncTurn()
        userMsg := model.NewUserMessage(sess.ID(), sess.TurnCount(), in.UserText)
        h.repo.AppendMessage(ctx, userMsg)
        history = append(history, userMsg)
        capturedHistory = history        memCtx := h.recallMemory(ctx, sess.TenantID(), sess.UserID(), in.UserText)
        runInput.SystemContent = expandSystemPrompt(cfg.SystemPrompt, sess) + memCtx
        runInput.History = history
    }    // ⑥ 交给 Eino:ReAct 循环、工具调用、Hook callback 全在里面
    tr, interrupt, err := h.runnableFac.StreamTurn(
        ctx, cfg, sess.TenantID(), runInput, h.hooks, h.stream, sess.ID(),
    )
    if err != nil { return h.failSession(ctx, sess, ...) }    // ⑦ 处理 HITL 中断
    if interrupt != nil { return h.handleEinoInterrupt(ctx, sess, interrupt) }
    defer tr.Close()    // ⑧ 消费流:token → SSE turn.delta + 收集 finalContent
    finalContent, _ := h.consumeEinoStream(ctx, sess.ID(), tr)    // ⑨ 落库 + 触发终态事件 + 异步记忆提炼
    asstMsg := model.NewAssistantMessage(sess.ID(), sess.TurnCount(), finalContent, nil)
    sess.Complete()
    h.commitFinalAnswer(ctx, sess, asstMsg)
    if h.memory != nil && !isResume && len(capturedHistory) > 0 {
        h.scheduleMemoryExtraction(sess, capturedHistory, finalContent)
    }
    return nil
}

接下来,咱们逐段拆解,说明每步的逻辑和背后的设计考量。


① 加载会话,校验状态

sess, err := h.repo.Load(ctx, in.SessionID)
if err != nil {
    return h.onError(ctx, in.SessionID, err)
}
if sess.IsTerminal() {
    return errors.New("session already terminal")
}

一旦 Session 处于终态(COMPLETED / FAILED / CANCELLED),禁止启动新 Turn。这是状态机的一条硬约束。在入口处提前拦截,比深入循环内再做判断要简洁高效得多。


② context 注入

为什么选择 context.WithValue,而不是赋值到 struct 字段?因为 Runnable 是按 AgentConfig 维度缓存的,跨 session 复用。若将 sessionID 绑定到 struct 字段,缓存第一个 session 后后续全部错乱。从 context 动态读取,缓存才能安全共享。

ctx = context.WithValue(ctx, port.ContextKeyAgentTenantID{}, sess.TenantID())
ctx = context.WithValue(ctx, port.ContextKeyAgentUserID{}, sess.UserID())
ctx = context.WithValue(ctx, port.ContextKeyAgentSessionID{}, string(sess.ID()))

这三个值随 context 一路传递到 Eino 的 chatModelAdaptertoolBrokerAdapter,从中取出信息做租户隔离和工具路由。


③ 前置 hook + 加载配置

h.invokeHook(func(hr port.HookRunner) { _ = hr.BeforeSession(ctx, sess.ID()) })cfg, err := h.configs.Load(ctx, sess.TenantID(), sess.AgentConfig())

BeforeSession 是 Hook 链的首环节,典型用途包括认证验证、额度预检、tracing 初始化。invokeHook 内部做了 nil 检查,若无注入 HookRunner 则静默跳过。

AgentConfig 包含系统提示词模板、LLM profile、工具白名单、最大轮次、HITL 开关等信息。每次 Turn 重新加载,确保配置变更即时生效。


④ 判断路径

isResume := in.UserText == ""

一个布尔值区分两条执行分支:若 UserText == "",则属于 HITL 恢复(从 Eino 的 checkpoint 继续);否则是新 Turn。两条路径的准备工作不同,但最终调用同一个 StreamTurn


⑤a HITL 恢复路径

if sess.State() != model.StateWaiting {
    return fmt.Errorf("resume called but session not waiting, state=%s", sess.State())
}
dec := model.InterruptDecision{Action: model.DecisionApprove}
if in.Decision != nil {
    dec = *in.Decision
}
if err := sess.Resume(dec); err != nil {
    _ = h.onError(ctx, sess.ID(), err)
    return err
}
if err := h.persist(ctx, sess); err != nil {
    return err
}

核心原则:“先落库,再运行”。sess.Resume(dec) 将状态从 WAITING 切换到 RUNNING,立即写入数据库。这样即使进程崩溃重启,session 状态已是 RUNNING,重试逻辑能正确处理。若先运行再落库,崩溃时 session 仍为 WAITING,重试会再次走恢复路径,导致重复执行。

isResume=true 时,runInput.HistoryrunInput.SystemContent 保持零值——Eino 遇到 nil inputMsgs 自动从 CheckpointStore 加载上次暂停时的图状态。


⑤b 新 turn 路径

history, _ := h.repo.ListMessages(ctx, sess.ID())
sess.IncTurn()
userMsg := model.NewUserMessage(sess.ID(), sess.TurnCount(), in.UserText)
h.repo.AppendMessage(ctx, userMsg)
history = append(history, userMsg)
capturedHistory = historymemCtx := h.recallMemory(ctx, sess.TenantID(), sess.UserID(), in.UserText)
runInput.SystemContent = expandSystemPrompt(cfg.SystemPrompt, sess) + memCtx
runInput.History = history

依次完成五步:拉取历史消息 → 递增轮次计数 → 写入用户消息 → 召回长期记忆 → 组装系统提示词。

recallMemory 根据本轮用户文本检索相关记忆片段,追加到 system prompt 末尾,检索失败静默返回空字符串。expandSystemPrompt 展开 {{user_id}}{{tenant_id}}{{time}} 占位符。capturedHistory 为会话结束后进行记忆提炼做准备。


⑥ 交给 Eino

tr, interrupt, err := h.runnableFac.StreamTurn(
    ctx, cfg, sess.TenantID(), runInput, h.hooks, h.stream, sess.ID(),
)

这一行是 Handle 方法的分界线——左侧是 DDD 领域逻辑,右侧是 Eino 框架的内部逻辑。为什么切在这里而非更早(让 Eino 接管整个 Handle)或更晚(手写 ReAct 循环)?状态加载、context 注入、记忆召回与 LLM 编排无关,框架不应介入;LLM 调用、工具分发、流式输出已有成熟实现,无需重写。这条边界让两边独立演进——更换 Eino 版本不影响状态机,修改记忆策略不影响 Runnable 缓存。

StreamTurn 内部检查 Runnable 缓存(TTL 35 分钟)→ 未命中时用 react.NewAgent() 构建编译 → 设置 compose 选项(checkpoint ID、callbacks、force-new-run)→ 调用 runnable.Stream() → 检测 interrupt 信号。应用层无需感知这些细节。

它返回的三种情况非常清晰:

返回值含义
(TokenReader, nil, nil)正常流,消费到 Final Answer
(nil, *AgentInterruptInfo, nil)Eino 图级 HITL 中断
(nil, nil, err)执行错误

⑦ 处理 HITL 中断

if interrupt != nil {
    it := model.NewPreToolInterrupt("hitl: tool call requires approval", model.ToolCall{}, h.interruptTTL)
    sess.Pause(it)
    h.stream.Emit(ctx, sess.ID(), port.StreamEvent{
        Type: "interrupt", Payload: map[string]any{"before_nodes": info.BeforeNodes},
    })
    return h.persist(ctx, sess)
}

该中断信号来自 Eino 图级别 WithInterruptBeforeNodes(["tools"]),而非 Hook 返回值。Hook 的 BeforeToolUse 在 Eino 的 Callback 层执行,应用层在这里看到的,是 Eino 已经决定“中断”,因此只需做一件事:将 Session 状态迁移到 WAITING,并推送 SSE 消息通知前端。


⑧ 消费流

var sb strings.Builder
for {
    content, err := tr.Recv()
    if errors.Is(err, io.EOF) { break }
    if err != nil { return sb.String(), err }
    if content != "" {
        sb.WriteString(content)
        h.stream.Emit(ctx, sid, port.StreamEvent{Type: "turn.delta", Payload: content})
    }
}

port.TokenReader 是框架无关的接口,仅定义 Recv() (string, error)。Eino 的 schema.StreamReadereinoadapter 包中被包装成此接口,因此代码中看不到任何 Eino 类型。LLM 每吐出一个 token,SSE 推一帧,无额外 goroutine,也无缓冲区。


⑨ 落库、触发终态、异步记忆提炼

asstMsg := model.NewAssistantMessage(sess.ID(), sess.TurnCount(), finalContent, nil)
sess.Complete()
h.commitFinalAnswer(ctx, sess, asstMsg)if h.memory != nil && !isResume && len(capturedHistory) > 0 {
    h.scheduleMemoryExtraction(sess, capturedHistory, finalContent)
}

这里完成三件事:

commitFinalAnswer:优先走事务路径,消息写入与 session 状态更新在同一数据库事务中完成。写入后,emitTerminalEvents 触发 AfterSession hook 和 SSE 的 done 帧。注意 persist()emitTerminalEvents() 是两个独立函数——persist 只负责写库和发布事件,SSE 的 done 帧由 emitTerminalEvents 单独控制,避免事务路径与非事务路径重复触发。

scheduleMemoryExtraction:启动一个 goroutine,设置 30 秒超时,将本轮 user 和 assistant 消息异步发送给 memory BC 进行记忆提炼。若失败,仅触发 OnError hook,不阻塞响应返回。HITL 恢复路径(isResume=true)跳过此步,避免同一会话重复提炼。


流程图

Handle(in)
    │
    ├─ Load + IsTerminal 检查
    ├─ context 注入(tenantID/userID/sessionID)
    ├─ BeforeSession hook + Load AgentConfig
    │
    ├─── isResume=true ──────────────────────────────────────┐
    │    StateWaiting 校验                                   │
    │    sess.Resume → persist(先落库)                      │
    │    runInput: IsResume=true, History=nil                │
    │                                                        │
    └─── isResume=false ─────────────────────────────────────┤
         ListMessages + IncTurn + AppendMessage              │
         recallMemory + expandSystemPrompt                   │
         runInput: SystemContent + History                   │
                                                            ↓
                                    runnableFac.StreamTurn(...)
                                    ┌── Eino 内部 ──────────────┐
                                    │  react.NewAgent ReAct 循环│
                                    │  工具调用 + Hook callback  │
                                    └──────────────────────────┘
                                            │
                              ┌─────────────┼──────────────┐
                        interrupt        TokenReader      error
                              │              │               │
                        sess.Pause()   consumeEinoStream  failSession
                        SSE interrupt  SSE turn.delta
                        persist        收集 finalContent
                                            │
                                    sess.Complete()
                                    commitFinalAnswer(事务)
                                    emitTerminalEvents → SSE done
                                    scheduleMemoryExtraction(goroutine)

小结

再回头看这 50 行主干,你会发现 Handle() 的核心职责并非执行一个 ReAct 循环——那已经交给 Eino 处理。它真正做的是框架无法覆盖的事情:Session 状态迁移的原子性、多租户 context 透传、记忆召回与异步提炼、事务落库、终态事件解耦。这些东西没有任何框架能替你完成,因为它们与具体业务域强绑定。

因此,更换 Eino 版本不影响你的状态机,修改记忆策略不影响 Runnable 缓存,改动 Hook 逻辑不影响事务落库——这条边界线的价值就在于此。

下一篇,我们将深入 HITL 完整路径:从 Eino 图级中断、人工审批,到 checkpoint 恢复执行,整个过程如何串联。


来源:互联网

免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

同类文章推荐

相关文章推荐

更多