LangGraph + 多Agent系統(tǒng)實(shí)戰(zhàn):上下文工程才是智能協(xié)作的核心基建 原創(chuàng)
隨著大語言模型(LLMs)和多智能體系統(tǒng)(Multi-Agent Systems, MAS)的快速發(fā)展,我們不再滿足于單個(gè)“聰明”的AI,而是追求多個(gè)AI代理(Agent)之間的高效協(xié)作、動(dòng)態(tài)推理與上下文感知。而這一切的核心,正是我們今天要探討的主題 —— Multi-Agent。 Context Engineering(多智能體上下文工程)
什么是 Multi-Agent Context Engineering?
Context Engineering(上下文工程) 是指通過設(shè)計(jì)、管理和優(yōu)化上下文信息,使AI系統(tǒng)能夠更準(zhǔn)確地理解當(dāng)前任務(wù)、環(huán)境與用戶意圖,從而做出更合理、更連貫的響應(yīng)。
而在多智能體系統(tǒng)中,每個(gè)Agent都有自己的目標(biāo)、記憶、工具和推理能力。Multi-Agent Context Engineering 就是:
在多個(gè)Agent之間,動(dòng)態(tài)構(gòu)建、共享、演化和優(yōu)化上下文信息,以支持協(xié)同決策、任務(wù)分解、角色分配與沖突解決。
簡(jiǎn)單來說:讓每個(gè)Agent都知道“現(xiàn)在在干嘛、誰在干、為什么干、怎么干”,并能根據(jù)全局上下文動(dòng)態(tài)調(diào)整自己的行為。
為什么需要 Multi-Agent Context Engineering?
想象一個(gè)由3個(gè)Agent組成的團(tuán)隊(duì):
- Planner Agent:負(fù)責(zé)拆解任務(wù)
- Researcher Agent:負(fù)責(zé)搜索信息
- Writer Agent:負(fù)責(zé)撰寫報(bào)告
如果沒有良好的上下文工程:
- Researcher 不知道 Planner 最關(guān)心哪部分?jǐn)?shù)據(jù)
- Writer 拿到的信息可能是過時(shí)的或與目標(biāo)無關(guān)
- Planner 無法評(píng)估任務(wù)是否真正完成
結(jié)果:效率低下、重復(fù)勞動(dòng)、輸出不一致。
而通過 Context Engineering:
- 每個(gè)Agent的輸入輸出都攜帶結(jié)構(gòu)化上下文
- 上下文隨任務(wù)推進(jìn)動(dòng)態(tài)更新
- Agent 之間共享“情境記憶”與“意圖圖譜”
- 系統(tǒng)具備“情境感知”能力,能自動(dòng)協(xié)調(diào)沖突或填補(bǔ)信息缺口
核心組件與設(shè)計(jì)模式
全局上下文存儲(chǔ)(Global Context Store)
類似一個(gè)“黑板系統(tǒng)”(Blackboard Architecture),所有Agent都可以讀寫一個(gè)共享的上下文空間。這個(gè)空間可以是:
- 一個(gè)結(jié)構(gòu)化的 JSON 對(duì)象
- 一個(gè)向量數(shù)據(jù)庫(用于語義檢索)
- 一個(gè)圖數(shù)據(jù)庫(用于關(guān)系建模)
# state.py
from typing import TypedDict, List, Dict, Optional, Any, Annotated
from langgraph.graph.message import add_messages
from datetime import datetime
class AgentState(TypedDict):
agent_id: str
role: str
last_action: str
last_updated: str
status: str # "idle", "working", "blocked", "done"
output: Optional[Any]
class SharedContext(TypedDict):
# 任務(wù)元信息
session_id: str
task_id: str
task_goal: str
task_constraints: List[str]
current_phase: str
phase_history: List[Dict[str, Any]]
# 共享數(shù)據(jù)區(qū)
shared_memory: Dict[str, Any]
# Agent 狀態(tài)池
agent_states: Dict[str, AgentState]
# 上下文控制
context_version: Annotated[int, lambda x, y: x + 1] # 自動(dòng)遞增版本號(hào)
event_queue: Annotated[List[Dict], add_messages] # 事件隊(duì)列(LangGraph 原生支持)
# 時(shí)間戳
created_at: str
updated_at: strAnnotated[int, lambda x, y: x + 1]:每次狀態(tài)更新自動(dòng)遞增版本號(hào) Annotated[List, add_messages]:利用 LangGraph 內(nèi)置的 add_messages 實(shí)現(xiàn)事件隊(duì)列累加
上下文協(xié)議(Context Protocol)
定義Agent之間交換上下文的格式與語義。例如:
- 使用自定義 Schema 描述上下文字段
- 定義“上下文變更事件”(Context Update Event)的發(fā)布/訂閱機(jī)制
- 支持增量更新(Delta Update)而非全量覆蓋
當(dāng) Researcher 找到新數(shù)據(jù)時(shí),它發(fā)布一個(gè) ?
?context.update??? 事件,附帶 ??data_snippet??? 和 ??confidence_score??,Writer 訂閱該事件并決定是否采納。
[Researcher Agent]
│
▼
[更新 state.event_queue] → LangGraph 內(nèi)部流轉(zhuǎn)(Writer 下一步消費(fèi))
│
▼
[發(fā)布到 Redis "agent_events"] → 外部系統(tǒng)實(shí)時(shí)響應(yīng)
│
├→ [前端儀表盤] 實(shí)時(shí)顯示“研究員已獲取數(shù)據(jù)”
├→ [日志服務(wù)] 寫入結(jié)構(gòu)化日志
└→ [告警服務(wù)] 檢測(cè)異常事件(如 confidence < 0.5)
# agents/researcher.py
from event_bus.redis_adapter import RedisEventBus
# 全局 EventBus 實(shí)例(生產(chǎn)環(huán)境建議用依賴注入或 Singleton)
redis_bus = RedisEventBus()
def researcher_agent(state: SharedContext) -> Dict[str, Any]:
data_snippet = "AI市場(chǎng)規(guī)模2025年預(yù)計(jì)達(dá)$3000億"
confidence_score = 0.92
# 寫入 LangGraph 內(nèi)部 event_queue(用于狀態(tài)機(jī)流轉(zhuǎn))
internal_event = {
"event_id": str(uuid.uuid4()),
"event_type": "context.update",
"source": "researcher",
"target": "writer",
"payload": {
"key": "market_data",
"value": data_snippet,
"confidence": confidence_score
}
}
# 同時(shí)發(fā)布到 Redis(用于外部系統(tǒng)監(jiān)聽)
redis_bus.publish("agent_events", {
"event_type": "researcher.data_fetched",
"source_agent": "researcher",
"payload": {
"data_snippet": data_snippet,
"confidence": confidence_score,
"task_id": state["task_id"]
},
"context_version": state["context_version"]
})
return {
"shared_memory": {**state["shared_memory"], "latest_data": data_snippet},
"event_queue": [internal_event], # 供 LangGraph 內(nèi)部消費(fèi)
"agent_states": { ... }
}
# monitor/realtime_logger.py
import json
from event_bus.redis_adapter import RedisEventBus
def on_agent_event(message):
data = json.loads(message['data'])
event_type = data["event_type"]
if event_type == "researcher.data_fetched":
print(f" 實(shí)時(shí)監(jiān)控: 研究員獲取數(shù)據(jù): {data['payload']['data_snippet']}")
elif event_type == "report.draft_ready":
print(f" 報(bào)告已生成,長度: {data['payload']['length']} 字符")
# 啟動(dòng)監(jiān)聽
bus = RedisEventBus()
bus.subscribe("agent_events", on_agent_event)我們也可以在狀態(tài)更新時(shí)自動(dòng)發(fā)布快照事件。
# checkpoint/event_trigger.py
from langgraph.checkpoint.base import BaseCheckpointSaver
class EventEmittingCheckpointer(BaseCheckpointSaver):
def put(self, config, checkpoint, metadata):
# 先調(diào)用原生保存邏輯
super().put(config, checkpoint, metadata)
# 發(fā)布“狀態(tài)已保存”事件到 EventBus
redis_bus.publish("system_events", {
"event_type": "checkpoint.saved",
"thread_id": config["configurable"]["thread_id"],
"version": checkpoint["channel_versions"]["__root__"],
"timestamp": metadata["write_timestamp"]
})注意:LangGraph 的 ??BaseCheckpointSaver?? 是底層接口,自定義要謹(jǐn)慎。更推薦在節(jié)點(diǎn)返回后手動(dòng)發(fā)布。
角色感知上下文(Role-Aware Context)
不同角色的Agent關(guān)注的上下文維度不同:
- Planner 關(guān)注任務(wù)結(jié)構(gòu)與依賴關(guān)系
- Researcher 關(guān)注查詢意圖與數(shù)據(jù)新鮮度
因此,上下文引擎應(yīng)支持:
- 視圖隔離(每個(gè)Agent看到自己需要的上下文子集)
- 權(quán)限控制(某些上下文字段只讀/可寫)
- 個(gè)性化摘要(自動(dòng)為每個(gè)Agent生成“上下文簡(jiǎn)報(bào)”)
我們?cè)诠?jié)點(diǎn)內(nèi)實(shí)現(xiàn)“視圖生成器”:
def get_role_view(state: SharedContext, role: str) -> Dict[str, Any]:
view_templates = {
"planner": ["task_goal", "current_phase", "agent_states"],
"researcher": ["task_goal", "shared_memory.queries", "task_constraints"],
"writer": ["shared_memory.latest_data", "task_goal"]
}
view = {}
for key in view_templates.get(role, []):
if '.' in key:
parts = key.split('.')
val = state
for part in parts:
val = val.get(part, {}) if isinstance(val, dict) else {}
view[key] = val
else:
view[key] = state.get(key)
return view
# 在 researcher_agent 中調(diào)用
view = get_role_view(state, "researcher")
print(f"[Researcher View] {view}")上下文演化與版本控制
上下文不是靜態(tài)的 —— 它隨時(shí)間推移、任務(wù)推進(jìn)、外部輸入而演化。我們需要:
- 記錄上下文變更歷史(類似 Git)
- 支持“上下文快照”用于回滾或?qū)Ρ?/li>
- 檢測(cè)上下文漂移(Context Drift)并觸發(fā)重校準(zhǔn)
例如:當(dāng)用戶中途修改了報(bào)告目標(biāo),系統(tǒng)應(yīng)自動(dòng)通知所有Agent,并觸發(fā) Planner 重新規(guī)劃。
LangGraph 的 ??Checkpointer?? 是天然解決方案。
# graph_builder.py
from langgraph.graph import StateGraph
from langgraph.checkpoint.sqlite import SqliteSaver
from .state import SharedContext
from .agents import planner_agent, researcher_agent, writer_agent
# 初始化檢查點(diǎn)(持久化 + 版本歷史)
memory = SqliteSaver.from_conn_string("checkpoints.db")
def build_graph():
workflow = StateGraph(SharedContext)
workflow.add_node("planner", planner_agent)
workflow.add_node("researcher", researcher_agent)
workflow.add_node("writer", writer_agent)
workflow.set_entry_point("planner")
workflow.add_edge("planner", "researcher")
workflow.add_edge("researcher", "writer")
# 編譯時(shí)綁定檢查點(diǎn)
return workflow.compile(checkpointer=memory)手動(dòng)觸發(fā)快照(當(dāng)關(guān)鍵字段變更時(shí))
# agents/planner.py
def planner_agent(state: SharedContext) -> Dict[str, Any]:
new_phase = "data_collection"
# 當(dāng) phase 改變時(shí),強(qiáng)制創(chuàng)建快照(用于回滾)
if state["current_phase"] != new_phase:
# 在返回值中不直接支持“創(chuàng)建快照”,但可通過外部機(jī)制或自定義 reducer 實(shí)現(xiàn)
# 這里我們通過 event 通知外部系統(tǒng)創(chuàng)建快照
snapshot_event = {
"event_id": str(uuid.uuid4()),
"event_type": "snapshot.request",
"reason": f"phase_changed: {state['current_phase']} → {new_phase}",
"manual_trigger": True
}
return {
"current_phase": new_phase,
"event_queue": [snapshot_event],
# ... 其他字段
}回滾到歷史版本(外部調(diào)用)
# rollback_manager.py
from .graph_builder import app
def rollback_to_version(thread_id: str, target_version: int):
"""回滾到指定版本"""
# 獲取所有歷史狀態(tài)
history = list(app.get_state_history({"configurable": {"thread_id": thread_id}}))
for snapshot in reversed(history):
if snapshot.values["context_version"] == target_version:
# 執(zhí)行回滾
app.update_state(
{"configurable": {"thread_id": thread_id}},
snapshot.values, # 完整狀態(tài)覆蓋
as_node="system_rollback" # 虛擬節(jié)點(diǎn)名
)
print(f"已回滾到版本 {target_version}")
return True
return False檢測(cè)上下文漂移(例如用戶修改目標(biāo))
def detect_context_drift(current_state: SharedContext, baseline: Dict) -> bool:
"""檢測(cè)關(guān)鍵字段是否被外部修改"""
drift_fields = ["task_goal", "task_constraints"]
for field in drift_fields:
if current_state.get(field) != baseline.get(field):
return True
return False
if detect_context_drift(latest_state, original_baseline):
# 觸發(fā) Planner 重新規(guī)劃
app.update_state(
{"configurable": {"thread_id": thread_id}},
{"event_queue": [{
"event_id": "drift_detected",
"event_type": "system.reset",
"payload": {"reason": "task_goal_modified"}
}]},
as_node="planner" # 強(qiáng)制在 planner 上下文中處理
)最佳實(shí)踐
上下文壓縮與摘要
當(dāng)上下文過長時(shí)(如超過 LLM 的上下文窗口),使用:
- Map-Reduce 摘要
- 關(guān)鍵信息提取(Key-Value Extraction)
- 向量化 + 相似度檢索(只注入最相關(guān)的上下文)
上下文反思(Context Reflection)
定期讓一個(gè)“Meta-Agent”或“Critic Agent”審查當(dāng)前上下文:
- 是否偏離原始目標(biāo)?
- 是否存在矛盾信息?
- 是否需要人類介入?
外部上下文注入
從外部系統(tǒng)(數(shù)據(jù)庫、API、用戶反饋)動(dòng)態(tài)注入上下文
context["user_feedback"] = get_latest_feedback()
context["market_data"] = fetch_real_time_data()最近建了langgraph & langgraph 智能體開發(fā)交流群,感興趣的朋友可以點(diǎn)贊關(guān)注后入群交流
本文轉(zhuǎn)載自??AI 博物院?? 作者:longyunfeigu

















