采用LangGraph集成多個MCP服務器的應用
原創現代人工智能應用程序通常需要對不同的語言模型和專門的服務器進行復雜的編排,每個服務器在更大的工作流中處理特定的任務。然而,這種分布式方法引入了一個關鍵的挑戰:保持上下文的連續性。
當會話或任務在不同的模型或服務器之間轉換時,上下文信息很容易丟失。用戶體驗到的是,人工智能系統似乎 “忘記” 了對話的早期部分,導致體驗支離破碎,效用降低。
1. MCP 的核心
模型上下文協議(Model Context Protocol, MCP)為在分布式人工智能系統中各組件之間一致地保存和高效傳輸豐富的上下文信息,提供了一個標準化的框架。MCP 的核心設計圍繞以下三大要素展開:
- 它定義了一種統一的上下文容器格式,用于封裝包括會話歷史記錄、用戶偏好設置以及任務相關元數據在內的多種上下文信息。這種結構化的容器使得上下文具備良好的可讀性和可操作性,能夠準確反映交互過程中的狀態與意圖。
- MCP 提供了一套規范的序列化標準,確保上下文能夠在不同的系統、服務或 Agent 之間高效、可靠地傳輸。該標準不僅支持多種數據格式的轉換與解析,還兼顧了性能與兼容性,適用于從本地調用到跨網絡通信的多種場景。
- MCP 還包含一套完整的上下文管理操作,支持上下文的創建、更新、合并與裁剪等關鍵操作。這些操作使得系統可以根據實際需求動態調整上下文內容,在保障信息完整性的同時避免冗余,提升處理效率與資源利用率。
MCP 架構的核心由上下文容器組成:
class ContextContainer:
"""Container for model context information."""
def __init__(self, conversation_id=None):
self.conversation_id = conversation_id or str(uuid.uuid4())
self.messages = []
self.metadata = {}
self.created_at = datetime.now()
self.updated_at = datetime.now()
def add_message(self, role, content, metadata=None):
"""Add a message to the context."""
message = {
"id": str(uuid.uuid4()),
"role": role,
"content": content,
"timestamp": datetime.now().isoformat(),
"metadata": metadata or {}
}
self.messages.append(message)
self.updated_at = datetime.now()
def to_dict(self):
"""Serialize the container to a dictionary."""
return {
"conversation_id": self.conversation_id,
"messages": self.messages,
"metadata": self.metadata,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat()
}
@classmethod
def from_dict(cls, data):
"""Create a container from a dictionary."""
container = cls(data.get("conversation_id"))
container.messages = data.get("messages", [])
container.metadata = data.get("metadata", {})
container.created_at = datetime.fromisoformat(data.get("created_at"))
container.updated_at = datetime.fromisoformat(data.get("updated_at"))
return container2. LangGraph 集成
LangGraph 已逐漸成為構建復雜人工智能工作流的首選框架,尤其適用于需要多步驟推理、狀態管理以及多 Agent 協作的場景。通過將模型上下文協議(MCP)與 LangGraph 深度集成,開發者能夠構建起結構更復雜、分布更廣泛的多服務系統,同時保持上下文在各組件之間的無縫流轉與一致性。
以下是 MCP 在 LangGraph 環境中的實現方式:
import os
from typing import Dict, List, Any
import requests
from langchain_core.language_models import BaseLLM
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode
# MCP Context Manager
class MCPContextManager:
def __init__(self, mcp_server_url):
self.mcp_server_url = mcp_server_url
def create_context(self, initial_data=None):
"""Create a new context container."""
response = requests.post(
f"{self.mcp_server_url}/contexts",
json=initial_data or {}
)
return response.json()
def get_context(self, context_id):
"""Retrieve a context container."""
response = requests.get(
f"{self.mcp_server_url}/contexts/{context_id}"
)
return response.json()
def update_context(self, context_id, data):
"""Update a context container."""
response = requests.patch(
f"{self.mcp_server_url}/contexts/{context_id}",
json=data
)
return response.json()
# MCP-aware LLM Node
class MCPLLMNode:
def __init__(self, llm: BaseLLM, context_manager: MCPContextManager):
self.llm = llm
self.context_manager = context_manager
def __call__(self, state: Dict[str, Any]) -> Dict[str, Any]:
# Extract the context ID from state
context_id = state.get("context_id")
if not context_id:
# Create new context if none exists
context = self.context_manager.create_context()
context_id = context["conversation_id"]
state["context_id"] = context_id
else:
# Retrieve existing context
context = self.context_manager.get_context(context_id)
# Convert MCP messages to LangChain messages
messages = []
for msg in context.get("messages", []):
if msg["role"] == "user":
messages.append(HumanMessage(content=msg["content"]))
elif msg["role"] == "assistant":
messages.append(AIMessage(content=msg["content"]))
# Add the current message if present
if "current_input" in state:
messages.append(HumanMessage(content=state["current_input"]))
# Update MCP context with the user message
self.context_manager.update_context(
context_id,
{"add_message": {
"role": "user",
"content": state["current_input"]
}}
)
# Generate response
response = self.llm.generate_response(messages)
# Update MCP context with the assistant response
self.context_manager.update_context(
context_id,
{"add_message": {
"role": "assistant",
"content": response.content
}}
)
# Update state
state["current_output"] = response.content
return state
# Building a multi-server LangGraph with MCP
def build_mcp_graph(llm_servers: List[Dict], mcp_server_url: str):
"""
Build a LangGraph using multiple LLM servers with MCP integration.
llm_servers: List of server configurations with name, url, and routing_criteria
mcp_server_url: URL of the MCP server
"""
context_manager = MCPContextManager(mcp_server_url)
# Create LLM nodes for each server
llm_nodes = {}
for server in llm_servers:
llm = RemoteLLM(server["url"])
llm_nodes[server["name"]] = MCPLLMNode(llm, context_manager)
# Router function to determine which LLM to use
def router(state: Dict[str, Any]) -> str:
input_text = state.get("current_input", "")
for server in llm_servers:
criteria = server.get("routing_criteria", "")
if criteria in input_text:
return server["name"]
# Default to the first server
return llm_servers[0]["name"]
# Build the graph
workflow = StateGraph()
# Add nodes
workflow.add_node("router", router)
for name, node in llm_nodes.items():
workflow.add_node(name, node)
# Add edges
workflow.add_edge("router", dynamic=True)
for name in llm_nodes.keys():
workflow.add_edge(name, END)
# Set the entry point
workflow.set_entry_point("router")
return workflow.compile()這段代碼實現了一個基于 LangGraph 和 MCP(Model Context Protocol,模型上下文協議)的多服務器 AI 工作流系統。它展示了如何將 MCP 與 LangGraph 集成,從而在多個 LLM(語言模型)服務之間協調任務,并保持統一的會話上下文狀態。
3. 創建多服務器部署
現在,讓我們看看如何在現實世界中部署這個架構:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
app = FastAPI()
# Configuration for our LLM servers
llm_servers = [
{
"name": "general_llm",
"url": "http://general-llm-server:8000/generate",
"routing_criteria": "" # Default server
},
{
"name": "code_llm",
"url": "http://code-llm-server:8001/generate",
"routing_criteria": "code"
},
{
"name": "creative_llm",
"url": "http://creative-llm-server:8002/generate",
"routing_criteria": "write"
}
]
# MCP server URL
mcp_server_url = "http://mcp-server:9000"
# Build our graph
graph = build_mcp_graph(llm_servers, mcp_server_url)
class ChatRequest(BaseModel):
message: str
context_id: str = None
class ChatResponse(BaseModel):
response: str
context_id: str
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
# Initialize state
state = {
"current_input": request.message
}
# Add context ID if provided
if request.context_id:
state["context_id"] = request.context_id
# Process through the graph
result = graph.invoke(state)
return ChatResponse(
response=result["current_output"],
context_id=result["context_id"]
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)4. MCP 服務器的實現
這種架構的一個關鍵組件是管理上下文容器的專用 MCP 服務器:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
from typing import Dict, List, Optional, Any
import uuid
from datetime import datetime
import json
import redis
app = FastAPI()
# Use Redis for persistence
redis_client = redis.Redis(host="redis", port=6379, db=0)
class Message(BaseModel):
role: str
content: str
metadata: Optional[Dict[str, Any]] = {}
class CreateContextRequest(BaseModel):
metadata: Optional[Dict[str, Any]] = {}
messages: Optional[List[Dict[str, Any]]] = []
class UpdateContextRequest(BaseModel):
metadata: Optional[Dict[str, Any]] = None
add_message: Optional[Dict[str, Any]] = None
replace_messages: Optional[List[Dict[str, Any]]] = None
@app.post("/contexts")
async def create_context(request: CreateContextRequest):
context_id = str(uuid.uuid4())
now = datetime.now().isoformat()
context = {
"conversation_id": context_id,
"messages": request.messages,
"metadata": request.metadata,
"created_at": now,
"updated_at": now
}
redis_client.set(f"context:{context_id}", json.dumps(context))
return context
@app.get("/contexts/{context_id}")
async def get_context(context_id: str):
context_data = redis_client.get(f"context:{context_id}")
if not context_data:
raise HTTPException(status_code=404, detail="Context not found")
return json.loads(context_data)
@app.patch("/contexts/{context_id}")
async def update_context(context_id: str, request: UpdateContextRequest):
context_data = redis_client.get(f"context:{context_id}")
if not context_data:
raise HTTPException(status_code=404, detail="Context not found")
context = json.loads(context_data)
context["updated_at"] = datetime.now().isoformat()
# Update metadata if provided
if request.metadata is not None:
context["metadata"].update(request.metadata)
# Add a message if provided
if request.add_message is not None:
message = {
"id": str(uuid.uuid4()),
"role": request.add_message["role"],
"content": request.add_message["content"],
"timestamp": datetime.now().isoformat(),
"metadata": request.add_message.get("metadata", {})
}
context["messages"].append(message)
# Replace messages if provided
if request.replace_messages is not None:
context["messages"] = request.replace_messages
redis_client.set(f"context:{context_id}", json.dumps(context))
return context
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=9000)5. 為部署撰寫 Docker
為了把一切聯系起來,可以使用 Docker Compose 來部署我們的多服務器系統:
version: '3'
services:
# Main API Gateway
api_gateway:
build: ./api_gateway
ports:
- "8080:8080"
environment:
- MCP_SERVER_URL=http://mcp_server:9000
depends_on:
- mcp_server
- general_llm
- code_llm
- creative_llm
# MCP Server
mcp_server:
build: ./mcp_server
ports:
- "9000:9000"
depends_on:
- redis
# Redis for MCP persistence
redis:
image: redis:alpine
ports:
- "6379:6379"
# LLM Servers
general_llm:
build: ./llm_servers/general
ports:
- "8000:8000"
code_llm:
build: ./llm_servers/code
ports:
- "8001:8000"
creative_llm:
build: ./llm_servers/creative
ports:
- "8002:8000"6. 性能關注
在生產環境中部署模型上下文協議(MCP)時,性能優化是確保系統高效穩定運行的關鍵。以下是幾個需要重點關注的性能考量及應對策略:
上下文大小管理隨著會話持續進行,上下文容器可能會不斷增長,導致存儲開銷和傳輸延遲增加。為避免這一問題,應實施有效的剪枝策略,例如僅保留最近的幾輪對話或通過語義分析提取關鍵信息,從而保持上下文的精簡與相關性。
序列化效率在高吞吐量的系統中,頻繁的上下文序列化與反序列化可能成為性能瓶頸。建議采用更高效的序列化格式,如 Protocol Buffers、MessagePack 或 FlatBuffers,以替代傳統的 JSON,從而減少數據體積并加快編解碼速度,提升整體處理效率。
緩存策略為了降低對后端 MCP 服務的訪問頻率,減少重復查詢帶來的延遲,應在多個層級(如客戶端、網關或服務端)引入緩存機制。可緩存當前活躍的上下文片段,僅在必要時同步更新或拉取最新狀態,從而顯著提升響應速度并減輕系統負載。
通過合理設計這些性能優化策略,可以有效保障 MCP 在大規模、高并發 AI 應用場景下的穩定性與可擴展性,使其更好地服務于復雜的智能工作流需求。
class CachedMCPContextManager(MCPContextManager):
def __init__(self, mcp_server_url, cache_ttl=300):
super().__init__(mcp_server_url)
self.cache = {}
self.cache_ttl = cache_ttl
self.last_access = {}
def get_context(self, context_id):
current_time = time.time()
# Check cache
if context_id in self.cache:
self.last_access[context_id] = current_time
return self.cache[context_id]
# Retrieve from server
context = super().get_context(context_id)
# Update cache
self.cache[context_id] = context
self.last_access[context_id] = current_time
# Prune cache if needed
self._prune_cache()
return context
def _prune_cache(self):
current_time = time.time()
expired_keys = [
k for k, v in self.last_access.items()
if current_time - v > self.cache_ttl
]
for key in expired_keys:
if key in self.cache:
del self.cache[key]
if key in self.last_access:
del self.last_access[key]7. 安全性考量
在實施模型上下文協議(MCP)的過程中,安全性是一個不可忽視的關鍵環節。由于上下文容器可能包含用戶的敏感信息,如對話歷史、個人偏好或業務數據,因此必須采取全面的安全措施來保障系統的可靠性和用戶隱私。
在數據保護方面,應確保上下文數據在存儲和傳輸過程中始終處于加密狀態。同時,建立嚴格的訪問控制機制,僅允許經過身份驗證和授權的實體讀取或修改上下文內容。此外,還需制定合理的數據保留策略,明確上下文信息的存儲時限,并在任務完成后及時清理或歸檔,以降低數據泄露的風險。
身份驗證與授權是保障系統安全的核心環節。所有試圖訪問或更新上下文的請求都必須通過可信的身份認證流程,例如使用 API 密鑰、OAuth 令牌或基于角色的訪問控制(RBAC)。這可以有效防止未經授權的系統或用戶篡改上下文內容,從而保障整個 MCP 系統的數據完整性和操作合法性。
為防范潛在的攻擊行為,還必須重視輸入驗證與內容消毒。任何進入上下文的數據都應經過嚴格校驗,避免惡意構造的內容引發注入攻擊或其他安全漏洞。建議在數據寫入上下文之前進行格式檢查、轉義處理或使用沙箱機制,從源頭上杜絕安全隱患。
通過在數據保護、訪問控制和輸入過濾等方面構建多層次的安全防護體系,可以有效提升 MCP 在實際應用中的安全性,為其在復雜、高要求的生產環境中的部署提供堅實保障。
class SecureMCPContextManager(MCPContextManager):
def __init__(self, mcp_server_url, api_key):
super().__init__(mcp_server_url)
self.api_key = api_key
def _get_headers(self):
return {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
def create_context(self, initial_data=None):
"""Create a new context container with authentication."""
response = requests.post(
f"{self.mcp_server_url}/contexts",
headers=self._get_headers(),
json=initial_data or {}
)
return response.json()
# Similarly update other methods to use authentication小結
MCP的出現,標志著分布式人工智能系統體系架構取得了重大的突破與進展。它提供了一種標準化的方法,用于管理和傳輸不同組件之間的上下文信息,從而為創建能夠保持上下文連續性的復雜多服務器系統提供了有力支持。
在當今時代,人工智能系統正變得愈發復雜。在這樣的背景下,像MCP這樣的協議對于打造無縫的用戶體驗而言,其重要性將日益凸顯。開發人員通過實現本文所闡述的模式和代碼,便能夠構建出健壯的分布式AI系統,進而確保跨服務器和模型的上下文得以有效維護。
MCP的真正優勢在于它的簡潔性與靈活性。它將上下文管理與單個AI組件相分離,這使得更模塊化、易于維護且具備可伸縮性的體系架構成為可能。隨著人工智能生態系統的持續發展,我們有理由期待看到更多類似的標準化協議出現。






















