菜鸟AI - 让提示词生成更简单! 全站导航 全站导航
AI工具安装 新手教程 进阶教程 辅助资源 AI提示词 热点资讯 技术资讯 产业资讯 内容生成 模型技术 AI信息库

已有账号?

首页 > AI资讯新闻 > RAG检索策略流程与模块化实战指南
技术资讯

RAG检索策略流程与模块化实战指南

2026-05-30
阅读 0
热度 0
作者 菜鸟AI编辑部
摘要

摘要

在前几篇内容中,我们逐一探讨了多种高级 RAG 检索策略——每一种都像精密机械中的独立

在前几篇内容中,我们逐一探讨了多种高级 RAG 检索策略——每一种都像精密机械中的独立零件。真正的挑战在于如何灵活地将这些零件组装成一套能够精准解决实际业务问题的 RAG 系统。本文将从系统层面讲解高级 RAG 的模块化设计方法论,并演示如何通过流程化编排,把这些模块串联成一条可落地的检索增强生成管线。

RAG 模块化架构

模块化 RAG 本质上是一种高度可扩展的设计范式,它将整个 RAG 系统抽象为三层结构:模块类型、模块与操作符。模块类型对应 RAG 的核心环节,例如检索、生成、预处理等;每个模块类型下包含若干功能模块,例如检索模块可以是向量检索、关键词检索或混合检索;而每个功能模块内部又可选用不同的操作符,比如向量检索模块中可切换不同的相似度算法。通过这种分层抽象,整个 RAG 系统变成了一组模块和操作符的灵活组合。在构建工作流时,每一步都能选择最适配的模块,每个模块内部再决定采用哪个操作符,从而实现极高的灵活性与可复用性。

RAG 工作流

所谓 RAG 工作流,是指从用户输入查询开始到系统输出最终生成文本的完整链路。这条链路通常涉及多个模块的协同:除了核心的检索器和生成器,还可能包含查询预处理、检索后处理、结果融合等一系列环节。设计 RAG 工作流的核心目标,是让大语言模型在生成答案时充分利用外部知识库中的信息,从而显著提升结果的准确性与相关性。在推理阶段,RAG 工作流大致可归纳为以下几种模式:

  • Sequential(顺序模式):最基础的线性流程,涵盖简单 RAG 以及经过增强的高级 RAG 实现。

  • Conditional(条件模式):根据查询的关键词特征或语义信息,动态选择不同的 RAG 处理分支。

  • Branching(分支模式):包含多个并行处理分支,可细分为预检索和后检索两个阶段的分支结构。

  • Loop(循环模式):包含迭代检索、递归检索和自适应检索等循环结构,适用于需要多轮信息补全的场景。

下图展示的是 Loop 模式下的 RAG 工作流:

下面我们主要以 Sequential 模式为例,逐步演示如何通过模块化流水线构建出可实际运行的高级 RAG 检索功能。

代码实操

LlamaIndex 的查询流水线功能提供了一套与模块化思路高度契合的解决方案。你可以将不同的检索策略定义为独立模块,再按预定顺序组合成一条完整的查询流水线。下面我们从简单到复杂,逐步展开演示。

基础 RAG 流水线

我们先搭建一个最朴素的基础 RAG 流水线,包含三个核心模块:输入模块接收用户查询,检索模块从知识库中查找相关文档,输出模块根据检索结果生成最终回答。

在定义流水线之前,先将测试文档索引入库。本次使用的测试文档依然是维基百科上关于《复仇者联盟》的电影剧情,示例代码如下:

import os
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core import (
    Settings,
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.core.node_parser import SentenceSplitter

documents = SimpleDirectoryReader("./data").load_data()
node_parser = SentenceSplitter()
llm = OpenAI(model="gpt-3.5-turbo")
embed_model = OpenAIEmbedding(model="text-embedding-3-small")
Settings.llm = llm
Settings.embed_model = embed_model
Settings.node_parser = node_parser

if not os.path.exists("storage"):
    index = VectorStoreIndex.from_documents(documents)
    index.set_index_id("a vengers")
    index.storage_context.persist("./storage")
else:
    store_context = StorageContext.from_defaults(persist_dir="./storage")
    index = load_index_from_storage(
        storage_context=store_context, index_id="a vengers"
    )
  • 先用 SimpleDirectoryReader 读取 ./data 目录下的文档。

  • 接着定义 SentenceSplitter 作为文档分割策略。

  • 然后配置 OpenAI 的 LLM 与 Embedding 模型,并将它们写入全局 Settings

  • 最后将文档索引持久化到本地,便于后续直接加载复用。

接下来是搭建基础 RAG 流水线的核心代码:

from llama_index.core.query_pipeline import QueryPipeline, InputComponent
from llama_index.core.response_synthesizers.simple_summarize import SimpleSummarize

retriever = index.as_retriever()
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "input": InputComponent(),
        "retriever": retriever,
        "output": SimpleSummarize(),
    }
)

p.add_link("input", "retriever")
p.add_link("input", "output", dest_key="query_str")
p.add_link("retriever", "output", dest_key="nodes")
  • 先创建一个普通的检索器 retriever

  • 然后实例化 QueryPipeline 对象,将 verbose 设为 True 以在运行时输出详细调试信息。

  • 通过 add_modules 方法添加三个模块:input 是 InputComponent(查询流水线中最常用的输入组件);retriever 是刚定义的检索器;output 选用 SimpleSummarize,它会将问题与检索结果做一次简单总结后输出。

  • 接着通过 add_link 方法定义模块间的连接关系。第一个参数是源模块,第二个是目标模块。dest_key 参数用于指定目标模块的输入参数名称——因为 output 模块有两个参数(问题和检索结果),所以需要明确告知哪个参数对应何种来源。反之,若源模块有多个输出参数,则需用 src_key 来指定。

除了 add_modules 配合 add_link 的方式,add_chain 方法也能一次性完成模块添加和连接:

p = QueryPipeline(verbose=True)
p.add_chain([InputComponent(), retriever])

这种方式适合单参数模块的快速串联,但遇到多参数模块时,仍需使用 add_modulesadd_link

现在运行这个查询流水线:

question = "Which two members of the A vengers created Ultron?"
output = p.run(input=question)
print(str(output))

# 输出结果如下:
> Running module input with input:
input: Which two members of the A vengers created Ultron?

> Running module retriever with input:
input: Which two members of the A vengers created Ultron?

> Running module output with input:
query_str: Which two members of the A vengers created Ultron?
nodes: [NodeWithScore(node=TextNode(id_='53d32f3a-a2d5-47b1-aa8f-a9679e83e0b0', ...

Bruce Banner and Tony Stark.
  • 通过流水线的 run 方法执行查询,传入问题。

  • 从调试信息可见,流水线依次执行了 input → retriever → output 三个模块,每个模块的输入参数均已打印,最终输出了正确答案。

加入 Reranker 模块

基础 RAG 的效果虽然不错,但检索结果的排序仍有优化空间。接下来我们加入一个 reranker 模块,让它在检索之后对结果进行重新排序。

+from llama_index.postprocessor.cohere_rerank import CohereRerank

+reranker = CohereRerank()
p = QueryPipeline(verbose=True)
p.add_modules(
  {
  "input": InputComponent(),
  "retriever": retriever,
+"reranker": reranker,
  "output": SimpleSummarize(),
  }
)

p.add_link("input", "retriever")
+p.add_link("input", "reranker", dest_key="query_str")
+p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("input", "output", dest_key="query_str")
-p.add_link("retriever", "output", dest_key="nodes")
+p.add_link("reranker", "output", dest_key="nodes")
  • 这里使用的是 Cohere 的 reranker 服务。LlamaIndex 封装了 CohereRerank 类,可直接调用。

  • 使用前需在 Cohere 官网注册并获取 API KEY,然后在环境变量中设置 COHERE_API_KEY

  • 在流水线中新增 reranker 模块,将其插在 retriever 与 output 之间。同时调整连接关系:去掉 retriever 到 output 的直接链路,改为 retriever → reranker → output。

  • reranker 模块同样需要两个输入参数:查询问题与待排序的检索结果,因此 dest_key 也需明确指定。

除了 run 方法,run_with_intermediates 可以获取流水线中各模块的中间结果,便于对比检索前后的变化:

output, intermediates = p.run_with_intermediates(input=question)
retriever_output = intermediates["retriever"].outputs["output"]
print(f"retriever output:")
for node in retriever_output:
    print(f"node id: {node.node_id}, node score: {node.score}")
reranker_output = intermediates["reranker"].outputs["nodes"]
print(f"nreranker output:")
for node in reranker_output:
    print(f"node id: {node.node_id}, node score: {node.score}")

# 输出结果:
retriever output:
node id: 53d32f3a-a2d5-47b1-aa8f-a9679e83e0b0, node score: 0.6608391314791646
node id: dea3844b-789f-46de-a415-df1ef14dda18, node score: 0.5313643379538727

reranker output:
node id: 53d32f3a-a2d5-47b1-aa8f-a9679e83e0b0, node score: 0.9588471
node id: dea3844b-789f-46de-a415-df1ef14dda18, node score: 0.5837967
  • run_with_intermediates 返回一个元组,包含最终输出和所有中间结果。

  • 通过 intermediates 加模块 key 即可获取对应模块的中间输出,例如 intermediates["retriever"] 拿到的是检索器的结果。

  • 每个中间结果都包含 inputsoutputs 两个字典,分别对应模块的输入和输出参数。

从对比数据可明显看出,reranker 模块大幅提升了检索结果的置信度分数,排序也更为合理。

加入 Query Rewrite 模块

刚才的 reranker 属于检索后处理,那能否在检索之前就进行优化?当然可以。接下来我们加入一个 query rewrite 模块,在查询进入检索器之前先做一轮预处理。

+query_rewriter = HydeComponent()
p = QueryPipeline(verbose=True)
p.add_modules(
  {
  "input": InputComponent(),
+"query_rewriter": query_rewriter,
  "retriever": retriever,
  "reranker": reranker,
  "output": SimpleSummarize(),
  }
)

-p.add_link("input", "retriever")
+p.add_link("input", "query_rewriter")
+p.add_link("query_rewriter", "retriever")
p.add_link("input", "reranker", dest_key="query_str")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("input", "output", dest_key="query_str")
p.add_link("reranker", "output", dest_key="nodes")
  • 这里定义了一个 HydeComponent 类,实现了 HyDE 查询重写策略。其核心思路是:先根据用户原始查询,让 LLM 生成一个假设性回答(HyDE 中的“假设性文档”),再用这个假设性回答去检索文档。由于假设性回答在语义上更接近目标文档,检索准确率往往显著提升。

  • HydeComponent 是自定义组件,具体实现后面会展开。

  • 在流水线中新增 query_rewriter 模块,放在 input 与 retriever 之间。同时调整连接关系:input → query_rewriter → retriever。因为 query_rewriter 模块只有一个输入参数,所以无需指定 dest_key

LlamaIndex 的查询流水线支持自定义组件,只需继承 CustomQueryComponent 类即可。下面是 HydeComponent 的具体实现:

from llama_index.core.query_pipeline import CustomQueryComponent
from typing import Dict, Any
from llama_index.core.indices.query.query_transform import HyDEQueryTransform

class HydeComponent(CustomQueryComponent):
    """HyDE query rewrite component."""

    def _validate_component_inputs(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Validate component inputs during run_component."""
        assert "input" in input, "input is required"
        return input

    @property
    def _input_keys(self) -> set:
        """Input keys dict."""
        return {"input"}

    @property
    def _output_keys(self) -> set:
        return {"output"}

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        """Run the component."""
        hyde = HyDEQueryTransform(include_original=True)
        query_bundle = hyde(kwargs["input"])
        return {"output": query_bundle.embedding_strs[0]}
  • _validate_component_inputs 是强制需要实现的方法,用于验证输入参数合法性。

  • _input_keys_output_keys 分别定义组件的输入与输出键名。

  • _run_component 是核心逻辑所在。这里借助 HyDEQueryTransform 完成查询重写,并返回生成的假设性回答文本。

关于查询重写的更多策略,可参考之前的文章。

替换 Output 模块

到目前为止,output 模块一直使用 SimpleSummarize,仅做简单总结。现在我们将其替换为树形总结组件,期望获得更精细的生成质量。

树形总结采用自底向上的递归方式合并文本块并进行总结,类似于从叶子节点向根节点构建一棵树。每一轮递归的具体步骤是:

  1. 重新打包文本块,使每个块都能填满 LLM 的上下文窗口。

  2. 如果只剩下一个块,则直接给出最终响应。

  3. 否则,先分别总结每个块,然后递归地对这些总结再次进行总结。

+from llama_index.core.response_synthesizers.tree_summarize import TreeSummarize

p = QueryPipeline(verbose=True)
p.add_modules(
  {
  "input": InputComponent(),
  "query_rewriter": query_rewriter,
  "retriever": retriever,
  "reranker": reranker,
-"output": SimpleSummarize(),
+"output": TreeSummarize(),
  }
)
  • 替换操作非常直接:只需将 SimpleSummarize 换成 TreeSummarize 即可。

  • 由于 TreeSummarize 的输入输出接口与 SimpleSummarize 保持一致,流水线中其他连接关系无需调整。

查询流水线本质上是一个有向无环图(DAG),每个模块是图中的一个节点,模块间的连接是图中的有向边。我们可以用代码将图结构可视化:

from pyvis.network import Network

net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(p.clean_dag)
net.write_html("output/pipeline_dag.html")
  • 这里使用 pyvis 库绘制 DAG 图。

  • from_nx 方法将流水线的 DAG 结构转换为网络对象,write_html 则保存为 HTML 文件,方便在浏览器中查看。

生成的流水线图形结构如下:

使用句子窗口检索

还有一个可优化的环节是检索器本身。此前一直使用普通向量检索,现在换用句子窗口检索策略,期望进一步提升检索准确率。

句子窗口检索的核心原理是:在文档切分阶段,以句子为单位切分并生成向量存入数据库。检索时,不仅返回命中句本身,还会将该句前后若干个句子一起打包作为检索结果。包含的句子数量可通过窗口大小参数控制。最终,这组上下文更丰富的文本块会被一并提交给 LLM 来生成答案。

+from llama_index.core.node_parser import SentenceWindowNodeParser

-node_parser = SentenceSplitter()
+node_parser = SentenceWindowNodeParser.from_defaults(
+    window_size=3,
+    window_metadata_key="window",
+    original_text_metadata_key="original_text",
+)

+meta_replacer = MetadataReplacementPostProcessor(target_metadata_key="window")
p = QueryPipeline(verbose=True)
p.add_modules(
  {
  "input": InputComponent(),
  "query_rewriter": query_rewriter,
  "retriever": retriever,
+"meta_replacer": meta_replacer,
  "reranker": reranker,
  "output": TreeSummarize(),
  }
)
p.add_link("input", "query_rewriter")
p.add_link("query_rewriter", "retriever")
+p.add_link("retriever", "meta_replacer")
p.add_link("input", "reranker", dest_key="query_str")
-p.add_link("retriever", "reranker", dest_key="nodes")
+p.add_link("meta_replacer", "reranker", dest_key="nodes")
p.add_link("input", "output", dest_key="query_str")
p.add_link("reranker", "output", dest_key="nodes")
  • 句子窗口检索需要先调整文档的入库方式。此前用的是 SentenceSplitter,现在换为 SentenceWindowNodeParser,窗口大小设为 3,并指定了窗口文本与原始文本对应的元数据键。

  • 句子窗口检索拿到结果后,需要将节点文本替换成窗口文本,因此引入 meta_replacer 模块,其实现类是 MetadataReplacementPostProcessor

  • meta_replacer 放在 retriever 与 reranker 之间。先替换窗口文本,再交给 reranker 重新排序,整体效果会更佳。

我们来对比一下 retrievermeta_replacer 的输出差异:

output, intermediates = p.run_with_intermediates(input=question)
retriever_output = intermediates["retriever"].outputs["output"]
print(f"retriever output:")
for node in retriever_output:
    print(f"node: {node.text}n")
meta_replacer_output = intermediates["meta_replacer"].outputs["nodes"]
print(f"meta_replacer output:")
for node in meta_replacer_output:
    print(f"node: {node.text}n")

# 输出结果:
retriever output:
node: In the Eastern European country of Sokovia, the A vengers—Tony Stark, Thor, Bruce Banner, Steve Rogers, Natasha Romanoff, and Clint Barton—raid a Hydra facility commanded by Baron Wolfgang von Strucker, who has experimented on humans using the scepter previously wielded by Loki.

node: They meet two of Strucker's test subjects—twins Pietro (who has superhuman speed) and Wanda Maximoff (who has telepathic and telekinetic abilities)—and apprehend Strucker, while Stark retrieves Loki's scepter.

meta_replacer output:
node: and attacks the A vengers at their headquarters. Escaping with the scepter, Ultron uses the resources in Strucker's Sokovia base to upgrade his rudimentary body and build an army of robot drones. Ha ving killed Strucker, he recruits the Maximoffs, who hold Stark responsible for their parents' deaths by his company's weapons, and goes to the base of arms dealer Ulysses Klaue in Johannesburg to get vibranium. The A vengers attack Ultron and the Maximoffs, but Wanda subdues them with haunting visions, causing Banner to turn into the Hulk and rampage until Stark stops him with his anti-Hulk armor. [a]
A worldwide backlash over the resulting destruction, and the fears Wanda's hallucinations incited, send the team into hiding at Barton's farmhouse. Thor departs to consult with Dr. Erik Selvig on the apocalyptic future he saw in his hallucination, while Nick Fury arrives and encourages the team to form a plan to stop Ultron.

node: In the Eastern European country of Sokovia, the A vengers—Tony Stark, Thor, Bruce Banner, Steve Rogers, Natasha Romanoff, and Clint Barton—raid a Hydra facility commanded by Baron Wolfgang von Strucker, who has experimented on humans using the scepter previously wielded by Loki. They meet two of Strucker's test subjects—twins Pietro (who has superhuman speed) and Wanda Maximoff (who has telepathic and telekinetic abilities)—and apprehend Strucker, while Stark retrieves Loki's scepter.
Stark and Banner discover an artificial intelligence within the scepter's gem, and secretly decide to use it to complete Stark's "Ultron" global defense program. The unexpectedly sentient Ultron, believing he must eradicate humanity to sa ve Earth, eliminates Stark's A.I.

对比非常明显:retriever 输出的只是孤立的单句,而 meta_replacer 替换后输出的是一段完整的上下文。这种上下文丰富的文本块,能让 LLM 在生成答案时获得更充分的背景信息,回答的准确性自然更上一层楼。

关于句子窗口检索的更多细节,可参考之前的文章。

增加评估模块

最后,我们为流水线添加一个评估模块,用于量化衡量整体效果。这里选用 Ragas 框架实现。

Ragas 是专门用于评估 RAG 应用的框架,提供了丰富且细致的评估指标。

+evaluator = RagasComponent()
p = QueryPipeline(verbose=True)
p.add_modules(
  {
  "input": InputComponent(),
  "query_rewriter": query_rewriter,
  "retriever": retriever,
  "meta_replacer": meta_replacer,
  "reranker": reranker,
  "output": TreeSummarize(),
+"evaluator": evaluator,
  }
)
-p.add_link("input", "query_rewriter")
+p.add_link("input", "query_rewriter", src_key="input")
p.add_link("query_rewriter", "retriever")
p.add_link("retriever", "meta_replacer")
-p.add_link("input", "reranker", dest_key="query_str")
+p.add_link("input", "reranker", src_key="input", dest_key="query_str")
p.add_link("meta_replacer", "reranker", dest_key="nodes")
-p.add_link("input", "output", dest_key="query_str")
+p.add_link("input", "output", src_key="input", dest_key="query_str")
p.add_link("reranker", "output", dest_key="nodes")
+p.add_link("input", "evaluator", src_key="input", dest_key="question")
+p.add_link("input", "evaluator", src_key="ground_truth", dest_key="ground_truth")
+p.add_link("reranker", "evaluator", dest_key="nodes")
+p.add_link("output", "evaluator", dest_key="answer")
  • RagasComponent 同样是一个自定义组件,具体实现稍后展开。

  • 将 evaluator 模块放在 output 之后,用于评估最终生成的结果。

  • evaluator 有四个输入参数:问题、真实答案、检索结果和生成的答案。由于 input 模块现在同时提供两个参数(问题 input 和真实答案 ground_truth),因此在添加连接关系时,需要用 src_key 明确指定每个参数的数据来源。

下面是 RagasComponent 的实现:

from ragas.metrics import faithfulness, answer_relevancy, context_precision, context_recall
from ragas import evaluate
from datasets import Dataset
from llama_index.core.query_pipeline import CustomQueryComponent
from typing import Dict, Any

metrics = [faithfulness, answer_relevancy, context_precision, context_recall]

class RagasComponent(CustomQueryComponent):
    """Ragas evalution component."""

    def _validate_component_inputs(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Validate component inputs during run_component."""
        return input

    @property
    def _input_keys(self) -> set:
        """Input keys dict."""
        return {"question", "nodes", "answer", "ground_truth"}

    @property
    def _output_keys(self) -> set:
        return {"answer", "source_nodes", "evaluation"}

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        """Run the component."""
        question, ground_truth, nodes, answer = kwargs.values()
        data = {
            "question": [question],
            "contexts": [[n.get_content() for n in nodes]],
            "answer": [str(answer)],
            "ground_truth": [ground_truth],
        }
        dataset = Dataset.from_dict(data)
        evalution = evaluate(dataset, metrics)
        return {"answer": str(answer), "source_nodes": nodes, "evaluation": evalution}
  • 和之前的自定义组件一样,实现了 _validate_component_inputs_input_keys_output_keys_run_component 四个方法。

  • 输入参数包括查询问题、真实答案、检索节点和生成答案;输出参数是生成答案、检索节点和评估结果。

  • _run_component 中,将输入数据封装成 Ragas 要求的 Dataset 格式。

  • 评估指标选择了四个:faithfulness(衡量答案与检索上下文的一致性)、answer_relevancy(衡量答案与问题的相关性)、context_precision(衡量真实答案在检索结果中的排名靠前程度)、context_recall(衡量真实答案在检索结果中的覆盖面)。

最后运行包含评估模块的完整流水线:

question = "Which two members of the A vengers created Ultron?"
ground_truth = "Tony Stark (Iron Man) and Bruce Banner (The Hulk)."
output = p.run(input=question, ground_truth=ground_truth)
print(f"answer: {output['answer']}")
print(f"evaluation: {output['evaluation']}")

# 输出结果:
answer: Tony Stark and Bruce Banner
evaluation: {'faithfulness': 1.0000, 'answer_relevancy': 0.8793, 'context_precision': 1.0000, 'context_recall': 1.0000}
  • 运行流水线时同时传入问题和真实答案。

  • 从输出可见,生成的答案准确无误,评估指标也给出了四个维度的量化分数。其中三个指标达到满分,答案相关性接近 0.88,整体效果相当理想。

总结

通过以上由浅入深的示例,可以清晰看到模块化与流程化在高级 RAG 构建中的实际价值。你可以根据具体需求自定义不同的功能模块,再按业务逻辑组合成完整的查询流水线。更重要的是,在同一个 RAG 应用中,完全可以定义多条不同的流水线来应对不同场景——比如问答用一条链路、对话用另一条链路、推荐再用第三条。这种灵活组合的能力,正是模块化设计最核心的魅力所在。

来源:互联网

免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

同类文章推荐

相关文章推荐

更多