菜鸟AI - 让提示词生成更简单! 全站导航 全站导航
AI工具安装 新手教程 进阶教程 辅助资源 AI提示词 热点资讯 技术资讯 产业资讯 内容生成 模型技术 AI信息库

已有账号?

首页 > AI教程 > LangGraph状态图与状态转换完全指南:从基础到高阶技巧
进阶教程 从基础到高阶

LangGraph状态图与状态转换完全指南:从基础到高阶技巧

2026-06-01
阅读 0
热度 0
作者 菜鸟AI编辑部
摘要

摘要

LangGraph采用BSP模型组织节点执行,状态为节点间传递数据的共享背包。图由StateGraph构造,

State Graph Overview

Defining State

In LangGraph, state functions as a shared container for data exchanged between nodes. Each node receives the current state, returns a partial update (state update), and at the super-step boundary these updates are merged back into the state for downstream nodes to consume.

LangGraph: 状态图与状态转换

The state schema must be defined before constructing the graph. The most fundamental and commonly used approach is TypedDict:

from typing_extensions import TypedDict

class State(TypedDict):
    state_value1: int
    state_value2: str
    # additional fields

LangGraph also supports defining state with Pydantic's BaseModel, but TypedDict is more prevalent in real projects because it natively supports Annotated[type, reducer] field binding (details covered later).

In practice, you rarely define state from scratch; instead, you inherit the built-in MessagesState:

from langgraph.graph import MessagesState

class CleanerState(MessagesState):
    # define additional fields here
    column_decisions: dict[str, Decision]
    schema_info: dict

MessagesState is essentially a TypedDict, but it includes a built-in messages field with its corresponding reducer (add_messages). This reducer automatically accumulates Message objects from humans, agents, and tool calls. Inheriting it gives you message history persistence out of the box.

Graph Construction and Invocation

LangGraph models agent workflows as a state graph, with StateGraph as the starting point in the SDK:

from langgraph.graph import StateGraph, START, END

# 1. Instantiate builder with state schema
builder = StateGraph(State)

# 2. Add nodes (each node is a state -> partial state function)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)

# 3. Add edges (determine execution flow between nodes)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)

# 4. Compile
graph = builder.compile()

# Use IPython to visualize topology
# display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

compile() returns a callable object. To execute the graph, pass in the initial state:

messages = [HumanMessage(content="Compute: add 3 and 24, multiply the result by 6, then divide by 3")]
graph.invoke({"messages": messages})

Note that graph.invoke() receives the entire initial state, not just a single question. It triggers the full graph execution, not a single LLM call—this is a fundamental difference between LangGraph and a raw LLM API in terms of invocation paradigm. Even if the graph contains only one LLM node, you must follow this state-driven approach. The more complex the graph, the more control information the initial state carries (e.g., configuration flags, contextual data, external inputs).

BSP Execution Model and Super‑Step

LangGraph uses the BSP (Bulk Synchronous Parallel) model to organize node execution. This model underpins all subsequent discussions on reducers, commutativity, and concurrent safety.

Key points:

1. Nodes Return State Updates

Each node function receives the current state and returns a partial update:

def node_1(state):
    # state["state_value"] is the current value
    return {"state_value_1": 2}  # partial update; only changed keys

LangGraph does not immediately apply the update. Instead, it caches all updates until the super‑step boundary, where it merges them via reducers. This is why reducers exist—state updates always go through reducers, never via direct assignment.

2. Execution Is Divided into Discrete Super‑Steps

Graph execution is not a sequential node‑by‑node progression. It is partitioned into discrete synchronous phases called super‑steps. Within each super‑step:

  • All nodes triggered in that step run logically in parallel.
  • The updates they return are temporarily stored, not merged immediately.
  • After the super‑step ends, all updates are merged into the state via reducers, creating the next checkpoint.
  • Then the next super‑step begins.
3. Execution Order Within a Super‑Step Is Undefined

Nodes triggered concurrently in the same super‑step can finish in any order—this is the starting point for all reducer design discussions.

Consider this structure:

flowchart LR
    A["node 1"]
    B["node 2"]
    C["node 3"]
    D[END]
    A --> B
    A --> C
    B --> D
    C --> D

After node 1 completes, node 2 and node 3 are triggered in the same super‑step. LangGraph cannot guarantee which runs first. It could be node 2 first one time and node 3 first the next.

If both node 2 and node 3 write updates to the same state key, LangGraph must combine the two updates into a single value at the super‑step boundary. The logic that determines “how to combine” is the Reducer.

Reducer

A reducer is a mapping from multiple state values to a single output; it essentially embodies the state‑update logic. The operation can be understood as:

for k in update.keys():
    new_state[k] = reducer(current_state[k], update[k])

where k is a key in the state (assuming the state is a TypedDict).

If no reducer is specified, the default behavior is to replace the old value with the new one:

for k in update.keys():
    new_state[k] = update[k]

Accumulative Reducers

When a new value arrives but the old value still holds useful context, an accumulative approach is needed. Typical scenarios: the LLM or downstream nodes need to see “history”, not just the latest state snapshot.

Common techniques:

  • Monotonic accumulation: Annotated[list, operator.add]
  • Semantic accumulation: add_messages, custom sliding windows (e.g., retain the last 10 records), key‑based deduplication (upsert)

Here’s a native Python example of information accumulation:

from typing import Annotated
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

Aggregative Reducers

When multiple nodes attempt to write to the same key, we need logic to integrate the results. This kind of reducer is used when nodes contribute different parts of the final output.

Moreover, such reducers must satisfy commutativity:

reducer(reducer(init, update_a), update_b) == reducer(reducer(init, update_b), update_a)

In other words, the merged result must not depend on the order of execution.

Use Case: Parallel Schema Profiling

During EDA initialization, three sub‑agents profile the same data from different dimensions. Each agent writes to a different dimension of column_profile, without touching others’ output.

flowchart TD
    A[START]
    B["Assign profiling tasks"]
    C["dtype_agent
infer column data types"] D["missing_agent
compute missing rate"] E["cardinality_agent
compute unique value count"] F["Merge (via reducer) & END"] A --> B B --> C B --> D B --> E C --> F D --> F E --> F

The three agents belong to the same super‑step and fan‑in to the same state key:

class EDAState(MessagesState):
    column_profile: Annotated[dict, ???]  # pending reducer

The updates from the three agents are:

A = {"age": {"dtype": "int32"}, "city": {"dtype": "string"}}
B = {"age": {"missing_rate": 0.03}, "city": {"missing_rate": 0.00}}
C = {"age": {"n_unique": 87}, "city": {"n_unique": 142}}

Expected merged result:

{
    "age": {"dtype": "int32", "missing_rate": 0.03, "n_unique": 87},
    "city": {"dtype": "string", "missing_rate": 0.00, "n_unique": 142},
}
Incorrect Attempt: Shallow Merge

Intuitively, dictionary merging uses | or {**a, **b}:

def shallow_merge(old: dict, new: dict) -> dict:
    return {**old, **new}

Because LangGraph does not guarantee the merge order of A/B/C, testing two arrival sequences manually:

# Order 1: A → B → C
shallow_merge(shallow_merge(shallow_merge({}, A), B), C)
# {"age": {"n_unique": 87}, "city": {"n_unique": 142}}

# Order 2: C → A → B
shallow_merge(shallow_merge(shallow_merge({}, C), A), B)
# {"age": {"missing_rate": 0.03}, "city": {"missing_rate": 0.00}}

Two problems emerge:

  • Data loss: {**a, **b} overwrites at the leaf level, replacing entire subtrees.
  • Order‑dependent results: which fields are lost changes with each run—this is a classic Heisenbug. Local tests may pass while production fails intermittently. Order‑dependent loss is more dangerous than stable data loss because unit tests can pass, integration tests may fail sporadically, and debugging costs are high.
Correct Implementation: Deep Merge
def deep_merge(old: dict, new: dict) -> dict:
    result = dict(old)
    for k, v in new.items():
        if isinstance(v, dict) and isinstance(result.get(k), dict):
            result[k] = deep_merge(result[k], v)
        else:
            result[k] = v
    return result

class EDAState(MessagesState):
    column_profile: Annotated[dict, deep_merge]

No matter the arrival order (A→B→C, C→A→B, B→C→A, etc.), the merged result is the expected complete profile; commutativity holds.

Key Insight: Merge Granularity Must Align with Business Responsibility Boundaries

The failure of shallow merge and success of deep merge are not fundamentally about recursion depth. The difference lies in whether the reducer’s merge granularity matches the actual responsibility boundaries of the business logic:

  • All three agents overlap at the top‑level keys ("age", "city").
  • But they do not overlap at the leaf level (dtype / missing_rate / n_unique).

Shallow merge stops recursion at the top level, incorrectly treating the overlap as a conflict and using last‑write‑wins, creating order dependence. Deep merge continues recursion to the leaf level, correctly recognizing that responsibility boundaries are non‑overlapping deeper down, thus merging conflict‑free and satisfying commutativity.

Therefore, the core design principle for aggregative reducers is to align merge granularity with the real responsibility boundaries of the business. If boundaries are correct, concurrency is safe; if boundaries are wrong (overwriting too early when non‑conflicting merge space still exists), commutativity is broken.

Arbitrative Reducers

When multiple nodes produce truly conflicting writes for the same value, business rules determine which one to keep. Typical scenarios:

  • HITL where human decisions override agent decisions.
  • Multi‑model aggregation where confidence selects the action.
  • Time‑series where the latest value wins.

⚠️ Complexity of Arbitrative Reducers

An arbitrative reducer’s function body may contain only a few lines of if-else, but its actual complexity stems from the business logic itself. Here’s a comparison with aggregative reducers:

DimensionAggregativeArbitrative
Decision rule sourceData structureBusiness semantics (must be manually coded)
Reusability across projectsHighNearly zero
Structural requirementsRaw data sufficesMetadata required
Commutativity guaranteeClear boundariesDepends on precision of manual coding
Operated objectsNo constraints, overlap allowedInevitable overlap and conflict

Use Case: HITL Override for Data Cleaning Sub‑Agent

Consider a data cleaning sub‑agent:

flowchart TD
    A[START]
    B["Route by column"]
    C["missing_value_agent"]
    D["type_inference_agent"]
    H["human_review (HITL)"]
    E["Merge (via reducer) & Next step"]
    A --> B
    B --> C
    B --> D
    B -.HITL interrupt.-> H
    C --> E
    D --> E
    H --> E

Business rules:

  • Once a human makes a decision, agents must not override that decision.
  • If multiple agents give different suggestions for the same column, a deterministic rule must determine the winner.

State value design:

class CleanerState(MessagesState):
    column_decisions: Annotated[dict[str, Decision], ]

column_decisions is a dictionary containing processing decisions per column. The arbitration rule also operates per column:

def arbitrate_decisions(old: dict, new: dict) -> dict:
    result = dict(old)
    for col, decision in new.items():
        result[col] = arbitrate_single(result.get(col), decision)
    return result

Below are two possible implementations of arbitrate_single.

Incorrect Attempt: Naive Human‑First

The simplest idea is to return whichever decision comes from a human:

def arbitrate_single(old, new):
    if new.get("source") == "human":
        return new
    if old and old.get("source") == "human":
        return old
    return new

Consider test data:

A = {"source": "agent", "op": "fill_mean"}
B = {"source": "agent", "op": "fill_median"}
C = {"source": "human", "op": "drop"}

Check commutativity:

  • When C is involved, logic is correct: always returns {"source": "human"}.
  • When C is not involved, you get: arbitrate_single(A, B) -> B while arbitrate_single(B, A) -> A — different results, commutative property is broken.

So behavior is non‑deterministic when only agents are present.

Correct Approach: Introduce Confidence to Explicitly Model Business Logic

Update the field structure:

A = {"source": "agent", "op": "fill_mean", "confidence": 0.7}
B = {"source": "agent", "op": "fill_median", "confidence": 0.9}
C = {"source": "human", "op": "drop"}  # human does not need confidence

Update the reducer implementation:

def arbitrate_single(old, new):
    # Rule 1: Human takes precedence
    if new.get("source") == "human":
        return new
    if old and old.get("source") == "human":
        return old

    # Rule 2: Without human intervention, higher confidence wins
    if old is None:
        return new
    if new["confidence"] != old["confidence"]:
        return new if new["confidence"] > old["confidence"] else old

    # Rule 3: When confidence ties, enforce a deterministic preference
    return new if new["op"] < old["op"] else old

Now the result is identical regardless of arrival order—commutativity is guaranteed, and system behavior becomes predictable.

来源:互联网

免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

同类文章推荐

相关文章推荐

更多