LangGraph的stream_mode到底怎么選?我調了一下午終于搞明白了 原創
最近在重構我們的AI對話系統,從簡單的請求-響應模式升級到實時流式處理。過程中發現LangGraph的stream_mode遠比文檔上寫的復雜,今天把對應的實踐經驗分享出來。
stream_mode到底是什么
簡單說,stream_mode就是控制你在流式處理時能拿到什么數據。簡單理解就是你的Graph在執行時,每完成一個節點都會產生輸出。stream_mode決定你能看到什么:
- 是看到完整的狀態快照?
- 還是只看變化的部分?
- 或者只關心LLM的輸出?
4種模式
values
這是默認的,每次返回完整的狀態。說實話,大部分時候用這個就夠了:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
import time
# 定義狀態類型
class GraphState(TypedDict):
"""
Graph的狀態定義
- messages: 存儲對話消息列表
- step_count: 記錄執行步驟數
- result: 最終結果
"""
messages: Annotated[list, operator.add] # 使用operator.add來合并列表
step_count: int
result: str
def step_1(state: GraphState) -> GraphState:
"""
第一個處理步驟:初始化和數據準備
"""
print("?? 執行步驟1: 數據準備階段")
time.sleep(1) # 模擬處理時間
return {
"messages": ["步驟1: 開始數據準備"],
"step_count": state.get("step_count", 0) + 1,
"result": "數據準備完成"
}
def step_2(state: GraphState) -> GraphState:
"""
第二個處理步驟:數據處理
"""
print("?? 執行步驟2: 數據處理階段")
time.sleep(1.5) # 模擬處理時間
return {
"messages": ["步驟2: 正在處理數據"],
"step_count": state.get("step_count", 0) + 1,
"result": "數據處理完成,準備分析"
}
def step_3(state: GraphState) -> GraphState:
"""
第三個處理步驟:數據分析和生成結果
"""
print("?? 執行步驟3: 數據分析階段")
time.sleep(2) # 模擬處理時間
total_messages = len(state.get("messages", []))
return {
"messages": ["步驟3: 分析完成,生成最終結果"],
"step_count": state.get("step_count", 0) + 1,
"result": f"分析完成!總共處理了 {total_messages + 1} 條消息,執行了 {state.get('step_count', 0) + 1} 個步驟"
}
def create_workflow():
"""
創建LangGraph工作流
"""
# 創建狀態圖
workflow = StateGraph(GraphState)
# 添加節點
workflow.add_node("step_1", step_1)
workflow.add_node("step_2", step_2)
workflow.add_node("step_3", step_3)
# 定義邊:step_1 -> step_2 -> step_3 -> END
workflow.set_entry_point("step_1")
workflow.add_edge("step_1", "step_2")
workflow.add_edge("step_2", "step_3")
workflow.add_edge("step_3", END)
# 編譯圖
app = workflow.compile()
return app
def demo_stream_values():
"""
演示 stream_mode="values" 的用法
stream_mode="values" 會返回每個步驟完成后的完整狀態值
這讓我們可以實時監控整個graph的狀態變化
"""
print("=" * 60)
print("?? LangGraph Stream Mode Demo - stream_mode='values'")
print("=" * 60)
# 創建工作流
app = create_workflow()
# 初始狀態
initial_state = {
"messages": [],
"step_count": 0,
"result": ""
}
print("\n?? 開始流式執行,實時顯示每個步驟的狀態變化:")
print("-" * 60)
# 使用stream方法,設置stream_mode="values"
for i, output in enumerate(app.stream(initial_state, stream_mode="values")):
print(f"\n?? 步驟 {i} 完成后的狀態:")
print(f" ?? 消息列表: {output.get('messages', [])}")
print(f" ?? 步驟計數: {output.get('step_count', 0)}")
print(f" ? 當前結果: {output.get('result', '')}")
print("-" * 40)
print("\n? 工作流執行完成!")我一開始就是用的這個,結果發現數據量特別大。比如我們有個處理報表的流程,狀態里存了一個幾千行的DataFrame,每個節點都要傳輸這么大的數據,難怪客戶端卡。
updates
后來改成updates模式,立馬快了很多:
def demo_update_values():
"""
對比不同stream_mode的效果
"""
print("\n" + "=" * 60)
print("?? Stream Mode 對比演示")
print("=" * 60)
app = create_workflow()
initial_state = {"messages": [], "step_count": 0, "result": ""}
# 演示 stream_mode="updates"
print("\n?? stream_mode='updates' - 只顯示每步的更新內容:")
for output in app.stream(initial_state, stream_mode="updates"):
for node_name, updates in output.items():
print(f"{node_name} 更新了: {updates}")這個模式特別適合生產環境。比如你的狀態里有個huge_data字段一直不變,用values模式每次都傳,用updates就只傳真正變化的部分。
不過要注意,你拿到的是增量更新,需要自己維護完整狀態:
# 自己維護狀態
current_state = {}
for chunk in app.stream(input_data, stream_mode="updates"):
for node_name, updates in chunk.items():
current_state.update(updates)
# 現在current_state是最新的完整狀態debug
這個模式我只在開發時用,信息特別詳細:
for chunk in app.stream(input_data, stream_mode="debug"):
print(f"Debug info: {chunk}")會輸出類似這樣的信息:
- 節點開始執行
- 節點執行結束
- 狀態變化
- 錯誤信息
- 執行時間
有一次一個節點莫名其妙執行了兩次,就是用debug模式發現的,原來是我的條件邊寫錯了。
messages
如果你在做聊天機器人,這個模式能省很多事.
from typing import TypedDict, List
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage
class SimpleState(TypedDict):
topic: str
joke: str
# 注意:這里沒有 messages 字段!
model = ChatOpenAI(model="gpt-4o-mini")
def call_model(state: SimpleState):
"""調用 LLM 生成笑話"""
# 這里調用了 LLM
llm_response = model.invoke([
{"role": "user", "content": f"Generate a joke about {state['topic']}"}
])
# 返回的是 joke 字段,不是 messages
return {"joke": llm_response.content}
graph1 = (
StateGraph(SimpleState)
.add_node("call_model", call_model)
.add_edge(START, "call_model")
.compile()
)
# stream_mode="messages" 仍然可以工作!
# 因為它攔截的是 model.invoke() 調用時產生的 tokens
for msg, metadata in graph1.stream({"topic": "cats"}, stream_mode="messages"):
if msg.content:
print(msg.content, end="|")
# 輸出: Why| did| the| cat|...(流式輸出)你可能會覺得State里并沒有messages字段,為什么stream_mode="messages" 仍舊能工作呢?這是因為:
當您使用 stream_mode="messages" 時,LangGraph 做了以下事情:
1. **Hook 機制**:
- LangGraph 在底層使用回調(callbacks)系統
- 當檢測到 stream_mode="messages" 時,它會自動將 LLM 的 invoke
方法切換到 stream 模式
2. **事件監聽**:
- 監聽所有 LangChain 模型的 on_llm_new_token 事件
- 這些事件在 LLM 生成 tokens 時觸發
3. **數據流**:用戶代碼調用 model.invoke() ↓ LangGraph 檢測到 stream_mode="messages" ↓ 自動將 invoke 轉換為 stream 調用 ↓ 捕獲 on_llm_new_token 事件 ↓ 將 tokens 作為 (message_chunk, metadata) 流式返回
4. **獨立于 State**:
- stream_mode="messages" 工作在更底層
- 它不關心 State 的結構
- 只要有 LLM 調用,就能捕獲 tokens
"""
# stream_mode="messages" 會捕獲所有節點中的 LLM 調用
for msg, metadata in graph3.stream(
{"input_text": "AI development"},
stream_mode="messages"
):
if msg.content:
node = metadata.get("langgraph_node", "unknown")
print(f"[{node}] {msg.content[:20]}...")不同模式的區別如下:
print("\n不同 stream_mode 的區別:")
# 1. stream_mode="values" - 返回完整的 State
for chunk in graph1.stream({"topic": "cats"}, stream_mode="values"):
print(f"Values mode - State: {chunk}")
# 輸出: {'topic': 'cats', 'joke': '完整的笑話內容'}
# 2. stream_mode="updates" - 返回 State 的更新
for chunk in graph1.stream({"topic": "dogs"}, stream_mode="updates"):
print(f"Updates mode - Updates: {chunk}")
# 輸出: {'call_model': {'joke': '完整的笑話內容'}}
# 3. stream_mode="messages" - 返回 LLM tokens
for msg, metadata in graph1.stream({"topic": "birds"}, stream_mode="messages"):
if msg.content:
print(f"Messages mode - Token: {msg.content[:10]}...")
# 輸出: 流式的 tokens消息增強的處理類
class EnhancedMessageProcessor:
"""增強的消息處理器"""
def __init__(self, verbose: bool = True, show_tools: bool = True):
self.verbose = verbose
self.show_tools = show_tools
self.message_buffer = []
self.tool_calls_buffer = []
self.current_node = None
self.stats = {
"total_messages": 0,
"ai_messages": 0,
"tool_messages": 0,
"total_tokens": 0,
"tool_calls": 0
}
def process(self, msg: BaseMessage, metadata: dict) -> None:
"""處理單個消息"""
self.stats["total_messages"] += 1
node = metadata.get("langgraph_node", "unknown")
if node != self.current_node:
if self.current_node:
self._flush_buffer()
self.current_node = node
if self.verbose:
print(f"\n?? [{node}]", flush=True)
# 處理不同類型的消息
if isinstance(msg, AIMessageChunk):
self._process_ai_chunk(msg, metadata)
elif isinstance(msg, AIMessage):
self._process_ai_message(msg, metadata)
elif isinstance(msg, ToolMessage):
self._process_tool_message(msg, metadata)
elif isinstance(msg, HumanMessage):
self._process_human_message(msg, metadata)
else:
self._process_other_message(msg, metadata)
def _process_ai_chunk(self, msg: AIMessageChunk, metadata: dict):
"""處理 AI 消息塊"""
self.stats["ai_messages"] += 1
# 處理文本內容
if msg.content:
self.message_buffer.append(msg.content)
if self.verbose:
print(msg.content, end="", flush=True)
self.stats["total_tokens"] += len(msg.content.split())
# 處理工具調用塊
if hasattr(msg, 'tool_call_chunks') and msg.tool_call_chunks:
for chunk in msg.tool_call_chunks:
self.tool_calls_buffer.append(chunk)
if self.verbose and self.show_tools:
if chunk.get('name'):
print(f"\n?? 準備調用: {chunk['name']}", end="")
if chunk.get('args'):
print(f" {chunk['args']}", end="")
# 處理完整的工具調用
if hasattr(msg, 'tool_calls') and msg.tool_calls:
self.stats["tool_calls"] += len(msg.tool_calls)
if self.verbose and self.show_tools:
print(f"\n?? 工具調用檢測到:")
for tc in msg.tool_calls:
print(f" ? {tc['name']}: {tc.get('args', {})}")
def _process_ai_message(self, msg: AIMessage, metadata: dict):
"""處理完整的 AI 消息"""
if msg.content and self.verbose:
print(f"\n? AI完整響應: {msg.content[:100]}...")
if hasattr(msg, 'tool_calls') and msg.tool_calls and self.show_tools:
print(f"\n?? 即將執行工具:")
for tc in msg.tool_calls:
print(f" ? {tc['name']}({tc.get('args', {})})")
def _process_tool_message(self, msg: ToolMessage, metadata: dict):
"""處理工具消息"""
self.stats["tool_messages"] += 1
if self.verbose and self.show_tools:
try:
# 嘗試解析 JSON 結果
result = json.loads(msg.content) if msg.content else {}
print(f"\n?? 工具結果:")
for key, value in result.items():
print(f" ? {key}: {value}")
except:
print(f"\n?? 工具結果: {msg.content}")
def _process_human_message(self, msg: HumanMessage, metadata: dict):
"""處理人類消息"""
if self.verbose:
print(f"\n?? 用戶: {msg.content}")
def _process_other_message(self, msg: BaseMessage, metadata: dict):
"""處理其他類型消息"""
if hasattr(msg, 'content') and msg.content and self.verbose:
print(f"\n?? {type(msg).__name__}: {msg.content}")
def _flush_buffer(self):
"""清空緩沖區"""
if self.message_buffer:
full_message = "".join(self.message_buffer)
self.message_buffer = []
if self.tool_calls_buffer:
self.tool_calls_buffer = []
def get_stats(self) -> dict:
"""獲取統計信息"""
return self.stats實際案例
分享一個真實的優化案例。我們有個數據分析的工作流:
class AnalysisState(TypedDict):
raw_data: pd.DataFrame # 原始數據,很大
processed_data: dict # 處理后的數據
summary: str # 分析總結
step_info: str # 當前步驟信息
# 之前的代碼(慢)
asyncfor chunk in app.astream(initial_state): # 默認values模式
# 每次都傳輸完整的DataFrame
print(f"當前步驟: {chunk.get('step_info')}")
# 客戶端:為啥這么卡?
# 優化后(快)
asyncfor chunk in app.astream(initial_state, stream_mode="updates"):
for node_name, updates in chunk.items():
# 只傳輸變化的部分
if"step_info"in updates:
print(f"當前步驟: {updates['step_info']}")
if"summary"in updates:
print(f"分析結果: {updates['summary']}")效果立竿見影,傳輸的數據量少了90%。
選擇建議
開發調試階段:
- 用debug模式,能看到所有細節
- 出問題時方便定位
生產環境:
- 優先用updates模式,性能最好
- 只有真的需要完整狀態時才用values
聊天應用:
- 直接用messages模式,別自己解析了
性能敏感場景:
- 一定要用updates
- 我們測過,數據量大的時候updates比values快3-5倍
模式組合
最后貼個不同模式組合的例子:
for stream_mode, chunk in agent.stream(
{"messages": [{"role": "user", "content": "book a hotel"}]},
config,
stream_mode=["messages", "updates"],
):
if stream_mode == "messages":
print(chunk)
if isinstance(chunk, tuple) and len(chunk) == 2:
message_chunk, metadata = chunk
if hasattr(message_chunk, 'content') and message_chunk.content:
print(message_chunk.content, end="", flush=True)
# messages.append(message_chunk.content)
elif stream_mode == "updates":
# Check for interrupt signal in updates
if isinstance(chunk, dict) and"__interrupt__"in chunk:
is_interrupted = True
interrupt_info = chunk["__interrupt__"]
print(f"\n\n?? INTERRUPT DETECTED!")
print(f" Info: {interrupt_info}")
# Don't break - let it finish streaming current content
# Also check for tool calls that might trigger interrupts
if isinstance(chunk, dict):
for key, value in chunk.items():
if isinstance(value, dict) and"messages"in value:
for msg in value.get("messages", []):
if hasattr(msg, "tool_calls") and msg.tool_calls:
print(f"\n?? Tool call detected: {msg.tool_calls[0].get('name', 'unknown')}")總結
stream_mode這個參數看起來簡單,但選對了能省很多事:
- 別無腦用默認的values,根據場景選擇
- 生產環境首選updates,真的快很多
- debug只在開發時用
- messages是給聊天應用的特供
?
本文轉載自??AI 博物院???? 作者:longyunfeigu

















