下一代 RAG 系統實戰:用 LangGraph + Neo4j 打造智能體級 GraphRAG
在這篇文章中,我將介紹一個基于 LangGraph 構建的全面 GraphRAG 多智能體系統,它作為一個智能的食物助手。雖然我選擇了膳食規劃作為演示領域,但這個架構是一個多功能的框架,適用于需要復雜、多維度查詢的結構化知識檢索的眾多行業。
這個系統能處理三種關鍵領域的復雜場景:
? 根據飲食限制發現食譜
? 為特定食譜生成購物清單
? 在超市內映射商店產品的位置
通過結合語義搜索(semantic search)進行模糊匹配和精確的 Cypher 查詢進行結構化數據檢索,這個助手能在 Neo4j 知識圖譜上執行多步驟推理,為復雜的查詢提供語境相關的回答。
Github 倉庫地址: https://github.com/PulsarPioneers/meal-planner-graphrag
1. 引言 — Naive RAG vs Graph RAG
為了這個項目,Naive RAG 方法不夠用,原因如下:
?缺乏結構化關系建模:Naive RAG 從非結構化文本中檢索信息,無法表示和推理實體之間的明確關系。這限制了它處理需要理解信息之間連接的查詢的效果。
?有限的多步推理:它僅在單一層面處理查詢,難以回答需要遍歷多個數據點或結合結構化語境中各種來源信息的復雜問題。
?缺乏可解釋性:由于檢索僅基于文本相似度,很難追蹤答案是如何構建的,也難以提供透明的推理路徑。
因此,我們實現了 Graph RAG 系統來解決這些問題。基于圖的框架具有以下優勢:
?明確的實體關系表示:實體及其連接直接在知識圖譜中建模,使系統能夠理解和利用數據的結構。
?多跳和語境推理:系統可以遍歷圖譜,執行多步驟推理,結合相關節點的信息來回答復雜查詢。
?基于模式的檢索:通過利用圖譜的模式(schema),可以精確地制定查詢,檢索結果與底層數據模型一致。
?提升的可解釋性:每個答案的推理路徑都可以通過圖譜追蹤,提供清晰的解釋和更高的透明度。
這些功能使 Graph RAG 系統成為需要結構化數據、復雜關系和可解釋性的應用的更強大且可靠的解決方案。
項目概覽
Agentic Graph RAG 圖示

GraphRAG 工作流程步驟:
1.查詢分析與路由:用戶的請求首先被分析和分類,系統會根據查詢將其路由到適當的工作流程節點。根據查詢內容,系統可能進入下一步(生成研究計劃)、提示用戶提供更多信息,或者如果請求超出范圍則立即回復。
2.研究計劃生成:系統會根據用戶查詢的復雜性,構建一個詳細的、逐步的研究計劃,列出滿足請求所需的具體行動。
3.研究圖譜執行:針對研究計劃中的每一步,系統會調用一個專門的子圖。通過 LLM 生成 Cypher 查詢,針對 Neo4j 知識圖譜進行檢索。使用語義搜索和結構化圖查詢的混合方法,檢索相關節點和關系,確保結果的廣度和精確度。
4.答案生成:利用檢索到的圖譜數據,系統通過 LLM 綜合生成全面的回答,根據需要整合多個來源的信息。
在創建圖譜時,可以根據需求選擇不同的方法。我為了加快速度,自己用樣本數據構建了圖譜,但也可以使用各種工具。下面我們來看一種使用 LLM 和 LangChain 構建 Neo4j 圖譜的技術。
使用 LLM 構建 Neo4j 圖譜
LLM 模型的選擇會顯著影響輸出的準確性和細微差別。
import os
from langchain_openai import ChatOpenAI
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
llm = ChatOpenAI(temperature=0, model_name="gpt-4o")??LLMGraphTransformer?? 通過 LLM 解析和分類實體及其關系,將文本文檔轉換為結構化的圖文檔。我們可以根據需求靈活定義需要提取的節點和關系類型。
例如,我們可能需要以下節點:
? Recipe
? Foodproduct
以及以下關系:
? CONTAINS
可以通過以下方式指定:
from langchain_experimental.graph_transformers import LLMGraphTransformer
llm_transformer_filtered = LLMGraphTransformer(
llm=llm,
allowed_nodes=["Recipe", "Foodproduct"],
allowed_relatinotallow=["CONTAINS"],
)現在,我們可以傳入示例文本并檢查結果:
from langchain_core.documents import Document
text = """
我最喜歡的烹飪創作是讓人無法抗拒的 Vegan Chocolate Cake Recipe。這個美味的甜點以其濃郁的可可風味和柔軟濕潤的口感而聞名。它完全是素食、無乳制品的,并且由于使用了特殊的無麩質面粉混合物,也是無麩質的。
要制作這個蛋糕,食譜包含以下食品及其相應數量:250克無麩質面粉混合物、80克高品質可可粉、200克砂糖和10克發酵粉。為了豐富口感和確保完美發酵,食譜還包含5克香草精。在液體成分中,需要240毫升杏仁奶和60毫升植物油。
這個食譜可以制作一個巧克力蛋糕,被視為類型為甜點的 Foodproduct。
"""
documents = [Document(page_cnotallow=text)]
graph_documents_filtered = await llm_transformer_filtered.aconvert_to_graph_documents(
documents
)
print(f"Nodes:{graph_documents_filtered[0].nodes}")
print(f"Relationships:{graph_documents_filtered[0].relationships}")輸出結果如下:
Nodes:[Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), Node(id='Gluten-Free Flour Blend', type='Foodproduct', properties={}), Node(id='High-Quality Cocoa Powder', type='Foodproduct', properties={}), Node(id='Granulated Sugar', type='Foodproduct', properties={}), Node(id='Baking Powder', type='Foodproduct', properties={}), Node(id='Vanilla Extract', type='Foodproduct', properties={}), Node(id='Almond Milk', type='Foodproduct', properties={}), Node(id='Vegetable Oil', type='Foodproduct', properties={}), Node(id='Chocolate Cake', type='Foodproduct', properties={})]
Relationships:[Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Gluten-Free Flour Blend', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='High-Quality Cocoa Powder', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Granulated Sugar', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Baking Powder', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Vanilla Extract', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Almond Milk', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Vegetable Oil', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Chocolate Cake', type='Foodproduct', properties={}), type='CONTAINS', properties={})]最后,生成的圖文檔可以存儲到 Neo4j 圖數據庫中,通過 ??Neo4jGraph??? 的 ??add_graph_documents?? 方法初始化:
import os
from langchain_neo4j import Neo4jGraph
os.environ["NEO4J_URI"] = "bolt://localhost:7687"
os.environ["NEO4J_USERNAME"] = "neo4j"
os.environ["NEO4J_PASSWORD"] = "password"
graph = Neo4jGraph(refresh_schema=False)
graph.add_graph_documents(graph_documents_filtered)然后,我們可以直接從 Neo4j 控制臺查詢圖譜內容:
MATCH p=(r:Recipe)-[:CONTAINS]->(fp:Foodproduct) RETURN p LIMIT 25;
添加節點嵌入
為了更好地理解和消除用戶輸入的歧義,我們可以在需要時通過語義搜索增強圖譜搜索。下面是一個使用 OpenAI 嵌入的示例。
例如,如果用戶問:“給我一個素食巧克力蛋糕食譜的所有原料”
我們需要找到圖譜中與查詢語義最接近的 Recipe 節點。為此,我們為每個 Recipe 節點存儲一個基于其 ID 計算的嵌入。
以下是如何在 Neo4j 中生成和存儲嵌入:
import openai
from neo4j import GraphDatabase
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
recipe_id = "Vegan Chocolate Cake Recipe"
recipe_embedding = openai.embeddings.create(model="text-embedding-3-small", input=recipe_id).data[0].embedding
with driver.session() as session:
# 創建嵌入字段
session.run(
"MATCH (r:Recipe {id: $recipe_id}) SET r.embedding = $embedding",
recipe_id=recipe_id,
embedding=recipe_embedding
)
# 創建向量索引
session.run(
"CREATE VECTOR INDEX recipe_index IF NOT EXISTS FOR (r:Recipe) ON (r.embedding) OPTIONS {indexConfig: {`vector.dimensions`: 1536, `vector.similarity_function`: 'cosine'}}"
)之后,我們就可以執行語義搜索:
query = "a chocolate cake recipe that is vegan"
query_embedding = openai.embeddings.create(
model="text-embedding-3-small",
input=query
).data[0].embedding
with driver.session() as session:
result = session.run(
"""
CALL db.index.vector.queryNodes('recipe_index', 1, $embedding)
YIELD node, score
RETURN node.id AS name, score
ORDER BY score DESC
""",
embedding=query_embedding
)
for record in result:
print(record["name"], "=>", record["score"])輸出:
Vegan Chocolate Cake Recipe => 0.9284169673919678這只是一個簡要概述,想了解更多技術細節,請查看 LangChain 文檔,或者探索其他工具,如官方的 Neo4j LLM Knowledge Graph Builder。
正如我所說,我通過迭代引入樣本數據創建了圖譜。你可以在 Github 倉庫中找到我使用的圖譜數據轉儲!
設計工作流程
實現系統包括兩個圖譜:
?研究子圖:負責生成多個 Cypher 查詢,用于從 Neo4j 知識圖譜中檢索相關節點和關系。
?主圖:包含主要工作流程,包括分析用戶查詢、生成完成任務所需的步驟,以及生成最終回答。
主圖結構

LangGraph 圖譜預覽LangGraph 的核心概念之一是狀態(state)。每次圖譜執行都會創建一個狀態,在圖譜節點執行時在節點之間傳遞,每個節點在執行后會用其返回值更新這個內部狀態。
讓我們從構建圖譜狀態開始。為此,我們定義了兩個類:
Router:包含用戶查詢的分類結果,分為“more-info”、“valid”或“general”。
from typing import Literal
from pydantic import BaseModel
class Router(BaseModel):
"""Classify user query."""
logic: str
type: Literal["more-info", "valid", "general"]定義的圖譜狀態包括:
InputState:包含用戶和智能體之間交換的消息列表。
from dataclasses import dataclass
from typing import Annotated
from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages
@dataclass(kw_notallow=True)
class InputState:
"""
表示包含消息列表的輸入狀態。
屬性:
messages (list[AnyMessage]):與狀態相關聯的消息列表,通過 add_messages 函數處理。
"""
messages: Annotated[list[AnyMessage], add_messages]AgentState:包含 Router 對用戶查詢的分類、研究計劃中要執行的步驟列表,以及智能體可以參考的檢索到的圖譜知識列表。
from dataclasses import dataclass, field
from typing import Annotated
from utils.utils import update_knowledge
from core.state_graph.states.main_graph.input_state import InputState
from core.state_graph.states.main_graph.router import Router
from core.state_graph.states.step import Step
@dataclass(kw_notallow=True)
class AgentState(InputState):
"""
表示主狀態圖中智能體的狀態。
屬性:
router (Router):智能體的路由邏輯。
steps (list[Step]):智能體執行的步驟序列。
knowledge (list[dict]):智能體累積的知識,通過 update_knowledge 函數更新。
"""
router: Router = field(default_factory=lambda: Router(type="general", logic=""))
steps: list[Step] = field(default_factory=list)
knowledge: Annotated[list[dict], update_knowledge] = field(default_factory=list)步驟 1:分析和路由查詢
??analyze_and_route_query??? 函數返回并更新狀態 ??AgentState??? 的 ??router??? 變量。??route_query?? 函數根據之前的查詢分類決定下一步。
具體來說,這一步會用一個 Router 對象更新狀態,該對象的 ??type?? 變量包含以下值之一:“more-info”、“valid”或“general”。根據這些信息,工作流程將被路由到相應的節點(“create_research_plan”、“ask_for_more_info”或“respond_to_general_query”之一)。
async defanalyze_and_route_query(state: AgentState, *, config: RunnableConfig) -> dict[str, Router]:
"""
分析當前智能體狀態并確定下一步的路由邏輯。
參數:
state (AgentState):智能體的當前狀態,包括消息和上下文。
config (RunnableConfig):運行配置。
返回:
dict[str, Router]:包含更新后的路由對象的字典。
"""
model = init_chat_model(
name="analyze_and_route_query", **app_config["inference_model_params"]
)
messages = [{"role": "system", "content": ROUTER_SYSTEM_PROMPT}] + state.messages
print("---ANALYZE AND ROUTE QUERY---")
print(f"MESSAGES: {state.messages}")
response = cast(
Router, await model.with_structured_output(Router).ainvoke(messages)
)
return {"router": response}
defroute_query(state: AgentState) -> Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]:
"""
根據當前狀態的路由類型確定智能體的下一步行動。
參數:
state (AgentState):智能體的當前狀態,包括路由類型。
返回:
Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]:
狀態圖中要執行的下一個節點/行動。
拋出:
ValueError:如果路由類型未知。
"""
_type = state.router.type
if _type == "valid":
return"create_research_plan"
elif _type == "more-info":
return"ask_for_more_info"
elif _type == "general":
return"respond_to_general_query"
else:
raise ValueError(f"Unknown router type {_type}")對問題“推薦一些甜的食譜!”的輸出示例:
{
"logic": "雖然提供了‘甜’的口味信息,但缺少其他強制性約束(飲食要求、用餐時間、食譜復雜性、餐點類型、烹飪時間和熱量含量)。因此,需要更多信息才能推薦食譜。",
"type": "more-info"
}請求被分類為“more-info”,因為它不包含提示中插入的所有強制性約束。
步驟 2:超出范圍/需要更多信息
我們定義了 ??ask_for_more_info??? 和 ??respond_to_general_query??? 函數,它們通過調用 LLM 直接為用戶生成回答:第一個函數在路由器確定需要更多用戶信息時執行,第二個函數則為與主題無關的一般查詢生成回答。在這種情況下,需要將生成的回答連接到消息列表中,更新狀態中的 ??messages?? 變量。
async defask_for_more_info(state: AgentState, *, config: RunnableConfig) -> dict[str, list[BaseMessage]]:
"""
根據當前路由邏輯向用戶請求更多信息。
參數:
state (AgentState):智能體的當前狀態,包括路由邏輯和消息。
config (RunnableConfig):運行配置。
返回:
dict[str, list[BaseMessage]]:包含請求更多信息的新消息的字典。
"""
model = init_chat_model(
name="ask_for_more_info", **app_config["inference_model_params"]
)
system_prompt = MORE_INFO_SYSTEM_PROMPT.format(logic=state.router.logic)
messages = [{"role": "system", "content": system_prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}
asyncdefrespond_to_general_query(state: AgentState, *, config: RunnableConfig) -> dict[str, list[BaseMessage]]:
"""
根據智能體的當前狀態和路由邏輯,為一般用戶查詢生成回答。
參數:
state (AgentState):智能體的當前狀態,包括路由邏輯和消息。
config (RunnableConfig):運行配置。
返回:
dict[str, list[BaseMessage]]:包含生成的回答消息的字典。
"""
model = init_chat_model(
name="respond_to_general_query", **app_config["inference_model_params"]
)
system_prompt = GENERAL_SYSTEM_PROMPT.format(logic=state.router.logic)
print("---RESPONSE GENERATION---")
messages = [{"role": "system", "content": system_prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}對問題“慕尼黑的天氣如何?”的輸出示例:
{
"logic": "請求是關于慕尼黑當前天氣的,與食譜、購物清單或超市產品位置用例無關。因此被分類為一般問題。",
"type": "general"
}
# ---RESPONSE GENERATION---
“我知道你想了解慕尼黑的天氣,但我只能幫助處理食譜、食譜購物清單和超市中產品的位置。”步驟 3:創建研究計劃
如果查詢分類返回“valid”,用戶的請求與文檔范圍一致,工作流程將到達 ??create_research_plan?? 節點,該節點的函數會為與食物相關的查詢創建一個逐步研究計劃。
???review_research_plan??:檢查并改進研究計劃的質量和相關性。
???reduce_research_plan??:簡化或壓縮計劃步驟,使其更高效。
???create_research_plan??:協調整個過程,生成計劃、壓縮計劃、審查計劃并返回最終步驟。
async defreview_research_plan(plan: Plan) -> Plan:
"""
審查研究計劃以確保其質量和相關性。
參數:
plan (Plan):要審查的研究計劃。
返回:
Plan:審查并可能修改后的研究計劃。
"""
formatted_plan = ""
for i, step inenumerate(plan["steps"]):
formatted_plan += f"{i+1}. ({step['type']}): {step['question']}\n"
model = init_chat_model(
name="create_research_plan", **app_config["inference_model_params"]
)
system_prompt = REVIEW_RESEARCH_PLAN_SYSTEM_PROMPT.format(
schema=neo4j_graph.get_structured_schema, plan=formatted_plan
)
reviewed_plan = cast(
Plan, await model.with_structured_output(Plan).ainvoke(system_prompt)
)
return reviewed_plan
asyncdefreduce_research_plan(plan: Plan) -> Plan:
"""
通過簡化或壓縮步驟來減少研究計劃。
參數:
plan (Plan):要減少的研究計劃。
返回:
Plan:減少后的研究計劃。
"""
formatted_plan = ""
for i, step inenumerate(plan["steps"]):
formatted_plan += f"{i+1}. ({step['type']}): {step['question']}\n"
model = init_chat_model(
name="reduce_research_plan", **app_config["inference_model_params"]
)
system_prompt = REDUCE_RESEARCH_PLAN_SYSTEM_PROMPT.format(
schema=neo4j_graph.get_structured_schema, plan=formatted_plan
)
reduced_plan = cast(
Plan, await model.with_structured_output(Plan).ainvoke(system_prompt)
)
return reduced_plan
asyncdefcreate_research_plan(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[str] | str]:
"""
根據智能體的當前知識和消息創建、減少和審查研究計劃。
參數:
state (AgentState):智能體的當前狀態,包括知識和消息。
config (RunnableConfig):運行配置。
返回:
dict[str, list[str] | str]:包含審查計劃的最終步驟和空知識列表的字典。
"""
formatted_knowledge = "\n".join([item["content"] for item in state.knowledge])
model = init_chat_model(
name="create_research_plan", **app_config["inference_model_params"]
)
system_prompt = RESEARCH_PLAN_SYSTEM_PROMPT.format(
schema=neo4j_graph.get_structured_schema, cnotallow=formatted_knowledge
)
messages = [{"role": "system", "content": system_prompt}] + state.messages
print("---PLAN GENERATION---")
# 生成計劃
plan = cast(Plan, await model.with_structured_output(Plan).ainvoke(messages))
print("Plan")
for i, step inenumerate(plan["steps"]):
print(f"{i+1}. ({step['type']}): {step['question']}")
# 減少計劃
reduced_plan = cast(Plan, await reduce_research_plan(plan=plan))
print("Reduced Plan")
for i, step inenumerate(reduced_plan["steps"]):
print(f"{i+1}. ({step['type']}): {step['question']}")
# 審查計劃
reviewed_plan = cast(Plan, await review_research_plan(plan=reduced_plan))
print("Reviewed Plan")
for i, step inenumerate(reviewed_plan["steps"]):
print(f"{i+1}. ({step['type']}): {step['question']}")
return {"steps": reviewed_plan["steps"], "knowledge": []}對問題“推薦一些食譜。我是素食者,不知道早餐吃什么。熱量要低于1000卡路里。沒有其他偏好。”的輸出示例:
{
"steps":
[
{"type":"semantic-search","question":"通過在 Diet 節點的 name 屬性中搜索‘Vegetarian’來查找適合素食的食譜。"},
{"type":"semantic-search","question":"通過在 MealMoment 節點的 name 屬性中搜索‘Breakfast’來查找適合早餐的食譜。"},
{"type":"query-search","question":"檢索既是素食又在早餐時段提供的食譜,方法是取步驟1和步驟2結果的交集。過濾這些食譜,確保其包含的原料總熱量低于1000卡路里。使用 CONTAINS 關系計算 FoodProduct 節點的總熱量。限制50個。"}
]
}在這個例子中,用戶的請求需要三個步驟來檢索信息。
步驟 4:進行研究
這個函數從研究計劃中取第一個步驟并用它進行研究。研究過程中,函數調用 ??researcher_graph??? 子圖,返回所有新收集的知識,我們將在下一節探討。最后,我們通過移除剛執行的步驟來更新狀態中的 ??steps?? 變量。
async def conduct_research(state: AgentState) -> dict[str, Any]:
"""
使用研究圖執行研究步驟并更新智能體的知識。
參數:
state (AgentState):智能體的當前狀態,包括步驟和知識。
返回:
dict[str, Any]:包含更新后的知識和剩余步驟的字典。
"""
response = await research_graph.ainvoke(
{"step": state.steps[0], "knowledge": state.knowledge}
)
knowledge = response["knowledge"]
step = state.steps[0]
print(
f"\n{len(knowledge)} pieces of knowledge retrieved in total for the step: {step}."
)
return {"knowledge": knowledge, "steps": state.steps[1:]}步驟 5:構建研究子圖
研究圖示

如上圖所示,圖譜包括:
? 查詢生成和執行步驟,或
? 語義搜索步驟
與主圖一樣,我們繼續定義狀態 ??QueryState???(研究圖中 ??execute_query??? 節點的私有狀態)和 ??ResearcherState??(研究圖的狀態)。
@dataclass(kw_notallow=True)
classQueryState:
"""研究圖中管理研究查詢的狀態類。"""
query: str
classStep(TypedDict):
"""單個研究步驟"""
question: str
type: Literal["semantic_search", "query_search"]
@dataclass(kw_notallow=True)
classResearcherState:
"""研究圖的狀態。"""
step: Step
queries: list[str] = field(default_factory=list)
knowledge: Annotated[list[dict], update_knowledge] = field(default_factory=list)步驟 5.1:語義搜索
這一步驟在 Neo4j 圖數據庫上執行基于向量的語義搜索,根據相似性而非精確匹配來查找相關節點。
它由兩個函數組成:
???semantic_search??:使用 LLM 確定搜索參數并協調語義搜索的執行。
???execute_semantic_search??:使用 OpenAI 嵌入和 Neo4j 的向量索引執行實際的向量相似性搜索。
def execute_semantic_search(node_label: str, attribute_name: str, query: str):
"""在 Neo4j 向量索引上執行語義搜索。
此函數使用 OpenAI 嵌入執行基于向量的相似性搜索,查找與提供的查詢語義相似的 Neo4j 圖數據庫中的節點。它將查詢轉換為嵌入向量,并在相應的向量索引中搜索最相似的節點。
參數:
node_label (str):要搜索的節點類型標簽(例如,‘Recipe’,‘FoodProduct’)。
attribute_name (str):要在節點中搜索的屬性(例如,‘name’,‘description’)。
query (str):查找語義相似內容的搜索查詢。
返回:
list:包含匹配節點的屬性字典列表,按相似性得分排序(從高到低)。
"""
index_name = f"{node_label.lower()}_{attribute_name}_index"
top_k = 1
query_embedding = (
openai.embeddings.create(model=app_config["embedding_model"], input=query)
.data[0]
.embedding
)
nodes = (
f"node.name as name, node.{attribute_name} as {attribute_name}"
if attribute_name != "name"
elsef"node.{{attribute_name}} as name"
)
response = neo4j_graph.query(
f"""
CALL db.index.vector.queryNodes('{index_name}', {top_k}, {query_embedding})
YIELD node, score
RETURN {nodes}
ORDER BY score DESC"""
)
print(
f"Semantic Search Tool invoked with parameters: node_label: '{node_label}', attribute_name: '{attribute_name}', query: '{query}'"
)
print(f"Semantic Search response: {response}")
return response
asyncdefsemantic_search(state: ResearcherState, *, config: RunnableConfig):
"""在研究圖中執行語義搜索以查找相關節點。
此函數分析研究問題以確定最佳搜索參數,并在 Neo4j 圖數據庫上執行語義搜索。它使用 LLM 確定應搜索的節點類型和屬性,然后執行基于向量的相似性搜索,查找可以幫助回答問題的語義相關內容。
參數:
state (ResearcherState):當前研究者狀態,包含研究步驟問題和累積的知識。
config (RunnableConfig):運行配置。
返回:
dict[str, list]:包含語義搜索結果的“knowledge”鍵的字典,格式化為知識項。
"""
classResponse(TypedDict):
node_label: str
attribute_name: str
query: str
model = init_chat_model(
name="semantic_search", **app_config["inference_model_params"]
)
vector_indexes = neo4j_graph.query("SHOW VECTOR INDEXES YIELD name RETURN name;")
print(f"vector_indexes: {vector_indexes}")
system_prompt = SEMANTIC_SEARCH_SYSTEM_PROMPT.format(
schema=neo4j_graph.get_structured_schema,
vector_indexes=str(vector_indexes)
)
messages = [
{"role": "system", "content": system_prompt},
{"role": "human", "content": state.step["question"]},
]
response = cast(
Response, await model.with_structured_output(Response).ainvoke(messages)
)
sem_search_response = execute_semantic_search(
node_label=response["node_label"],
attribute_name=response["attribute_name"],
query=response["query"],
)
search_names = [f"'{record['name']}'"for record in sem_search_response]
joined_search_names = ", ".join(search_names)
knowledge = {
"id": new_uuid(),
"content": f"在 {response['node_label']}.{response['attribute_name']} 上執行語義搜索,查找與‘{response['query']}’相似的值\n結果:{joined_search_names}",
}
return {"knowledge": [knowledge]}對生成步驟的輸出示例:
[
{"type": "semantic_search", "question": "通過在 Diet 節點的 name 屬性中搜索‘Vegetarian’來查找適合素食的食譜。"},
{"type": "semantic_search", "question": "通過在 MealMoment 節點的 name 屬性中搜索‘Breakfast’來查找適合早餐的食譜。"}
]
# -- 新知識 --
Semantic Search Tool invoked with parameters: node_label: 'Diet', attribute_name: 'name', query: 'Vegetarian'
Semantic Search response: [{'name': 'Vegetarian'}]
Semantic Search Tool invoked with parameters: node_label: 'MealMoment', attribute_name: 'name', query: 'Breakfast'
Semantic Search response: [{'name': 'Breakfast'}]步驟 5.2:生成查詢
這一步驟根據研究計劃中的問題(一個步驟)生成搜索查詢。此函數使用 LLM 生成多樣化的 Cypher 查詢來幫助回答問題。它由三個函數組成:
???generate_queries??:主函數,生成初始查詢并應用兩種校正方法。
???correct_query_by_llm??:使用具有模式感知的語言模型校正 Cypher 查詢。
???correct_query_by_parser??:使用基于解析器的查詢校正器進行結構校正。
async defcorrect_query_by_llm(query: str) -> str:
"""使用語言模型校正 Cypher 查詢。
此函數使用 LLM 根據 Neo4j 圖譜模式審查和校正 Cypher 查詢。它提供模式感知校正,確保查詢格式正確并使用有效的關系和節點。
參數:
query (str):要校正的 Cypher 查詢。
返回:
str:校正后的 Cypher 查詢。
"""
model = init_chat_model(
name="correct_query_by_llm", **app_config["inference_model_params"]
)
system_prompt = FIX_QUERY_SYSTEM_PROMPT.format(
schema=neo4j_graph.get_structured_schema
)
messages = [
{"role": "system", "content": system_prompt},
{"role": "human", "content": query},
]
response = await model.ainvoke(messages)
return response.content
defcorrect_query_by_parser(query: str) -> str:
"""使用基于解析器的校正器校正 Cypher 查詢。
此函數使用 CypherQueryCorrector 基于圖譜模式解析和校正 Cypher 查詢。它從文本中提取 Cypher 查詢并應用結構校正。
參數:
query (str):包含要校正的 Cypher 查詢的文本。
返回:
str:校正后的 Cypher 查詢。
"""
corrector_schema = [
Schema(el["start"], el["type"], el["end"])
for el in neo4j_graph.get_structured_schema.get("relationships", [])
]
cypher_query_corrector = CypherQueryCorrector(corrector_schema)
extracted_query = extract_cypher(text=query)
corrected_query = cypher_query_corrector(extracted_query)
return corrected_query
asyncdefgenerate_queries(
state: ResearcherState, *, config: RunnableConfig
) -> dict[str, list[str]]:
"""為研究步驟生成和校正 Cypher 查詢。
此函數根據研究問題和現有知識上下文生成多個 Cypher 查詢。它使用 LLM 生成初始查詢,然后應用基于 LLM 和基于解析器的校正,確保查詢對 Neo4j 圖數據庫有效且格式正確。
參數:
state (ResearcherState):當前研究者狀態,包含研究步驟問題和累積的知識。
config (RunnableConfig):運行配置。
返回:
dict[str, list[str]]:包含校正后 Cypher 查詢列表的“queries”鍵的字典。
"""
classResponse(TypedDict):
queries: list[str]
print("---GENERATE QUERIES---")
formatted_knowledge = "\n\n".join(
[f"{i+1}. {item['content']}"for i, item inenumerate(state.knowledge)]
)
model = init_chat_model(
name="generate_queries", **app_config["inference_model_params"]
)
system_prompt = GENERATE_QUERIES_SYSTEM_PROMPT.format(
schema=neo4j_graph.get_schema, cnotallow=formatted_knowledge
)
messages = [
{"role": "system", "content": system_prompt},
{"role": "human", "content": state.step["question"]},
]
response = cast(
Response, await model.with_structured_output(Response).ainvoke(messages)
)
response["queries"] = [
await correct_query_by_llm(query=q) for q in response["queries"]
]
response["queries"] = [
correct_query_by_parser(query=q) for q in response["queries"]
]
print(f"Queries: {response['queries']}")
return {"queries": response["queries"]}對問題(在語義搜索查詢執行后)的輸出示例:
“推薦一些食譜。我是素食者,不知道早餐吃什么。熱量要低于1000卡路里。沒有其他偏好。”
MATCH (r:Recipe)-[:FITS_DIET]->(:Diet {name: 'Vegetarian'}),
(r)-[:SERVED_DURING]->(:MealMoment {name: 'Breakfast'}),
(r)-[c:CONTAINS]->(fp:FoodProduct)
WITH r, SUM(c.grams * (fp.calories / 100.0)) AS total_calories
WHERE total_calories < 1000
RETURN r.name AS recipe_name, total_calories
LIMIT 5執行后的輸出:
# -- 新知識 --
╒════════════════════════════╤══════════════════╕
│recipe_name │total_calories
╞════════════════════════════╪══════════════════╡
│"Mascarpone Dessert" │945.8000000000001
├────────────────────────────┼──────────────────┤
│"Buffalo Mozzarella Salad" │668.88
├────────────────────────────┼──────────────────┤
│"Raisin and Almond Snack" │374.69999999999993
├────────────────────────────┼──────────────────┤
│"Mozzarella and Basil Salad"│528.4
└────────────────────────────┴──────────────────┘步驟 5.3:構建子圖
def build_research_graph():
builder = StateGraph(ResearcherState)
builder.add_node(generate_queries)
builder.add_node(execute_query)
builder.add_node(semantic_search)
builder.add_conditional_edges(
START,
route_step,
{"generate_queries": "generate_queries", "semantic_search": "semantic_search"},
)
builder.add_conditional_edges(
"generate_queries",
query_in_parallel, # type: ignore
path_map=["execute_query"],
)
builder.add_edge("execute_query", END)
builder.add_edge("semantic_search", END)
return builder.compile()
research_graph = build_research_graph()步驟 6:檢查完成
使用條件邊(conditional_edge),我們構建了一個循環,其結束條件由 ??check_finished??? 函數的返回值決定。此函數檢查由 ??create_research_plan??? 節點創建的步驟列表中是否還有步驟需要處理。一旦所有步驟完成,流程將進入 ??respond?? 節點。
def check_finished(state: AgentState) -> Literal["respond", "conduct_research"]:
"""
根據已執行的步驟確定智能體是應該回答還是繼續研究。
參數:
state (AgentState):智能體的當前狀態,包括已執行的步驟。
返回:
Literal["respond", "conduct_research"]:
如果還有步驟,則為“conduct_research”,否則為“respond”。
"""
if len(state.steps or []) > 0:
return "conduct_research"
else:
return "respond"步驟 7:回答
根據進行的研究生成對用戶查詢的最終回答。此函數使用對話歷史和研究者智能體檢索的文檔,制定全面的回答。
async def respond(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
"""
根據智能體的累積知識和消息為用戶生成最終回答。
參數:
state (AgentState):智能體的當前狀態,包括知識和消息。
config (RunnableConfig):運行配置。
返回:
dict[str, list[BaseMessage]]:包含生成的回答消息的字典。
"""
print("--- RESPONSE GENERATION STEP ---")
model = init_chat_model(name="respond", **app_config["inference_model_params"])
formatted_knowledge = "\n\n".join([item["content"] for item in state.knowledge])
prompt = RESPONSE_SYSTEM_PROMPT.format(cnotallow=formatted_knowledge)
messages = [{"role": "system", "content": prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}步驟 8:構建主圖
def build_main_graph():
builder = StateGraph(AgentState, input=InputState)
builder.add_node(analyze_and_route_query)
builder.add_node(ask_for_more_info)
builder.add_node(respond_to_general_query)
builder.add_node(create_research_plan)
builder.add_node(conduct_research)
builder.add_node("respond", respond)
builder.add_edge("create_research_plan", "conduct_research")
builder.add_edge(START, "analyze_and_route_query")
builder.add_conditional_edges("analyze_and_route_query", route_query)
builder.add_conditional_edges("conduct_research", check_finished)
builder.add_edge("respond", END)
return builder.compile()結果
我們可以通過以下問題測試其性能:
“給我‘pasta alla carbonara’食譜的購物清單。”

控制臺日志從控制臺日志中可以看到,主圖創建了以下審查計劃:
1.semantic_search:通過在 Recipe 節點的‘name’屬性上進行語義搜索,查找名稱類似于‘pasta alla carbonara’的 Recipe 節點。
2.query_search:檢索通過步驟1識別的 Recipe 節點,通過‘CONTAINS’和‘IS_INSTANCE_OF’關系連接的 StoreProduct 節點,并列出其詳細信息,如名稱、品牌、價格和數量,形成購物清單。限制50個。
執行第一步后,我們得知與‘pasta alla carbonara’對應的 Recipe 節點的準確名稱是‘Classic Carbonara’。
app_service-1 | Semantic Search Tool invoked with parameters: node_label: 'Recipe', attribute_name: 'name', query: 'pasta alla carbonara'
app_service-1 | Semantic Search response: [{'name': 'Classic Carbonara'}]然后執行第二步,使用以下 Cypher 查詢:
MATCH (r:Recipe {name: 'Classic Carbonara'})-[:CONTAINS]->(fp:FoodProduct)<-[:IS_INSTANCE_OF]-(sp:StoreProduct)
RETURN sp.name, sp.brand, sp.price, sp.quantity, sp.quantity_unit
LIMIT 50然后我們得到最終回答。

實時演示 — 使用 Chainlit 制作的 UI
通過檢查圖譜內容,我們看到完整的結果是正確的。

結論
Graph RAG:技術挑戰與考慮
盡管性能有所提升,實施 Graph RAG 并非沒有挑戰:
?延遲:智能體交互的復雜性增加通常會導致響應時間更長。在速度和準確性之間找到平衡是一個關鍵挑戰。
?評估與可觀察性:隨著 Agentic RAG 系統變得更加復雜,持續的評估和可觀察性變得必要。
總之,Graph RAG 在 AI 領域標志著重大突破。通過將大語言模型的能力與自主推理和信息檢索相結合,Graph RAG 引入了新的智能和靈活性標準。隨著 AI 的持續發展,Graph RAG 將在各行各業中扮演重要角色,改變我們使用技術的方式。
本文轉載自??AI大模型觀察站??,作者:AI研究生

















