使用 RAG、LangChain、FastAPI 與 Streamlit 構建 Text-to-SQL 聊天機器人
在這個項目中,我構建了一個由 AI 驅動的聊天機器人,它可以將自然語言問題轉換為 SQL 查詢,并直接從真實的 SQLite 數據庫中檢索答案。借助 LangChain、Hugging Face Embeddings 和 Chroma 向量存儲,這個應用展示了如何通過 Retrieval-Augmented Generation(RAG,檢索增強生成)工作流,把非結構化的用戶輸入與結構化數據連接起來——配有 FastAPI 后端與 Streamlit 前端界面。
引言:為什么是 Text-to-SQL?
想象一下: 你在會議上,經理突然問道:
“我們能看到上個月加入的所有客戶嗎?”
你看了一眼 SQL 編輯器……發現你得現寫查詢、確認表名,甚至可能還要調試一個缺失的 JOIN。與此同時,另一個人直接問了 AI 聊天機器人同樣的問題——立刻拿到整齊格式化的結果。
這就是 Text-to-SQL 的魔力——把自然語言變成數據庫查詢。
問題所在
SQL(Structured Query Language)是數據分析與數據工程的基石。它強大、靈活、精確——但對很多人來說并不“友好”。 大多數業務用戶、分析師,甚至一些開發者都會覺得 SQL 語法令人生畏,或者在快速獲取洞察時效率不高。
在多數團隊里,這會造成一個鴻溝:
- 數據是可用的,但不容易“開口就問”。
- 洞察是存在的,但被 SQL 專業技能“上鎖”。
解決方案:Text-to-SQL
Text-to-SQL 用來彌合這個鴻溝。 它讓任何人——無論技術背景如何——都能直接發問:
“上個季度我們最暢銷的 5 款產品是什么?”
并直接從數據庫拿到答案,AI 會在背后完成所有翻譯工作。
在幕后,一個 Text-to-SQL 系統通常要做四件關鍵的事:
- 檢索相關的 schema 與上下文(知道有哪些表)。
- 從自然語言問題生成有效的 SQL 查詢。
- 安全地校驗和執行 SQL。
- 以易讀的格式返回結果。
理解 RAG 方法
我們已經看到,Text-to-SQL 讓用戶能用自然語言提問并獲取數據庫結果——但 AI 到底是如何“知道”你的表、列和關系的呢?
答案是:RAG(Retrieval-Augmented Generation,檢索增強生成)。
什么是 RAG?
本質上,RAG 是一種結合“檢索(Retrieval)”與“生成(Generation)”的混合式 AI 方法:
- 檢索——系統抓取相關信息(這里指數據庫 schema、表名、表間關系等)。
- 生成——LLM 根據檢索到的上下文生成準確且有根據的輸出(即 SQL 查詢)。
你可以把 RAG 理解為:在 LLM 開始寫 SQL 之前,先給它一張“備考小抄”——它需要看到的確切 schema。
為什么不直接用普通的 LLM?
如果你直接問一個大語言模型(LLM),比如 GPT 或 Claude:
“列出上個月加入的所有客戶。”
它可能會生成這樣的 SQL:
SELECT * FROM users WHERE signup_date >= '2025-09-01';看起來沒問題——直到你意識到你的數據庫里壓根沒有叫做 ??users??? 的表。也許你的是 ??customer_data??? 或 ??crm_clients??。
這就是問題所在:當缺少上下文時,LLM 容易“幻覺”。 它會猜表名、漏掉 JOIN 關系,或用錯列名——因為默認情況下,它并不知道你的數據庫 schema。
RAG 如何解決這個問題
RAG 通過以“真實、可檢索的知識”為依據,讓模型“落地”。
在一個 Text-to-SQL 的 RAG 回路中,會發生以下事情:
- 檢索 Schema 上下文
系統在你的 schema 索引(向量數據庫或內存索引)中搜索表名、列描述、關系等信息。 - 生成 SQL
LLM 同時接收你的自然語言問題和檢索到的 schema,生成有效的 SQL 查詢。 - 執行并返回結果
查詢被校驗后,在真實數據庫中執行,并把結果返回給用戶。
這能確保每個 SQL 查詢都基于“了解 schema 的推理”,而不是模型的“拍腦袋”。
RAG 回路的實戰流程
一個包含以下循環的簡單流程圖:
[User Question]
↓
[Retrieve Schema Info]
↓
[Generate SQL (LLM)]
↓
[Validate + Execute]
↓
[Return Results]
?每一輪都會確保模型在生成查詢前,擁有“最新且準確”的 schema 知識——減少幻覺并提升可靠性。
系統架構
理解了為什么 Text-to-SQL 依賴 RAG 方法后,我們來看看系統的內部結構。
為了更具體,假設你在構建一個聊天機器人,能回答關于公司客戶數據庫的自然語言問題——從“上個月誰加入了?”到“本季度我們的平均訂單金額是多少?”都不在話下。
下面是系統的大致結構圖 ??
1. SQLite 數據庫——結構化數據源
每個 Text-to-SQL 系統都始于一個數據源。 為簡化,我們使用 SQLite——一種輕量的、基于文件的數據庫,適合原型開發與測試。
這里存放著你的真實數據:比如 ??customers???、??orders???、??products?? 等表。
當用戶提問時,我們的目標是把問題轉換成能在這個數據庫上執行的有效 SQL 查詢。
2. Embedding 層——把 schema 轉成向量
在模型生成 SQL 之前,它需要“理解”數據庫結構——表名、列名,以及它們的含義。
我們通過 Embeddings(向量嵌入)來實現:它是對文本的數字化表示,能夠捕捉語義。 借助 Hugging Face Embeddings(如 ??all-MiniLM-L6-v2??),我們把 schema 元數據轉為向量:
import os
import sqlite3
import hashlib
from tqdm import tqdm
from dotenv import load_dotenv
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
# Load environment variables
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
CHROMA_DIR = os.getenv("CHROMA_DIR", "./chroma_persist")
EMBED_MODEL = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
# ? Initialize HuggingFace embedding model
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
# ? Create / Load Chroma vector store
vectorstore = Chroma(
collection_name="sqlite_docs",
persist_directory=CHROMA_DIR,
embedding_function=embeddings
)
def row_hash(values):
"""Generate unique hash for a row."""
return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()
def row_to_text(table, cols, row):
"""Convert SQLite row into a readable text chunk."""
return f"Table: {table}\n" + "\n".join([f"{c}: {v}" for c, v in zip(cols, row)])
def index_table(conn, table):
"""Index a single table into the vector store."""
cur = conn.cursor()
cur.execute(f"PRAGMA table_info({table});")
cols = [c[1] for c in cur.fetchall()]
cur.execute(f"SELECT {', '.join(cols)} FROM {table}")
rows = cur.fetchall()
docs, ids, metas = [], [], []
for r in rows:
txt = row_to_text(table, cols, r)
pk = str(r[0])
hid = row_hash(r)
ids.append(f"{table}:{pk}")
docs.append(txt)
metas.append({"table": table, "pk": pk, "hash": hid})
# Add to Chroma vector store
vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)
def main():
"""Main indexing pipeline."""
conn = sqlite3.connect(SQLITE_PATH)
cur = conn.cursor()
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [t[0] for t in cur.fetchall()]
for t in tqdm(tables, desc="Indexing tables"):
index_table(conn, t)
conn.close()
print("Indexing complete and persisted in Chroma.")
if __name__ == "__main__":
main()現在,每張表和每個字段都在高維空間中擁有一個向量表示——為后續的智能檢索打下基礎。
3. Vector Store(Chroma)——定位相關 schema
接著,我們把這些 embeddings 存進向量數據庫——這里使用 Chroma。
當用戶提問時,我們會:
- 使用同一個 embedding 模型對問題進行向量化。
- 在 Chroma 中檢索與之“最相近”的 schema 元素。
例如,當用戶問“最近有哪些用戶注冊?”時,檢索器可能會找到類似 ??customers.signup_date??? 和 ??customers.name?? 的 schema 項。
這樣能確保模型只看到相關的 schema 上下文——讓 SQL 生成“腳踏實地”。
4. LangChain 組件——系統“大腦”
LangChain 把所有環節串起來,提供檢索、推理與 SQL 生成的模塊化組件:
- Retrievers:從 Chroma 拉取相關的 schema 片段。
- Chains:定義步驟序列——檢索 → 生成 → 校驗 → 執行。
- LLMs:基于用戶問題和檢索到的 schema 生成 SQL。
示例代碼:
from typing import TypedDict, List
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from dotenv import load_dotenv
import os
load_dotenv()
# state type
class RAGState(TypedDict, total=False):
question: str
retrieved_docs: List[str]
generated_sql: str
validated_sql: str
sql_result: List[dict]
messages: List[dict]
# init vectorstore & models
CHROMA_DIR = os.getenv("CHROMA_DIR", "./chroma_persist")
EMBED_MODEL = os.getenv("EMBED_MODEL", "text-embedding-3-small")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini")
TOP_K = int(os.getenv("TOP_K", "8"))
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
vectordb = Chroma(persist_directory=CHROMA_DIR, embedding_function=embeddings, collection_name="sqlite_docs")
retriever = vectordb.as_retriever(search_kwargs={"k": TOP_K})
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
sql_prompt = PromptTemplate.from_template("""
You are a SQL generator. Based on the following context, generate a SINGLE READ-ONLY SQLite SELECT query (no semicolons, no multiple statements).
Context:
{context}
Question:
{question}
Return only the SQL SELECT statement.
""")
async def retriever_node(state: RAGState) -> RAGState:
docs = await retriever.ainvoke(state["question"])
state["retrieved_docs"] = [d.page_content for d in docs]
return state
import re
from typing import Any
async def sql_generator_node(state: RAGState) -> RAGState:
"""
Generate SQL from the retrieved documents and user question.
Cleans LLM output, removes markdown/code fences, and ensures only SELECT statements remain.
"""
# 1. Combine retrieved documents
context = "\n\n".join(state.get("retrieved_docs", []))
# 2. Format the prompt
prompt_text = sql_prompt.format(context=context, question=state["question"])
# 3. Call the LLM asynchronously
out = await llm.ainvoke(prompt_text)
# 4. Extract text content if output is an AIMessage or ChatResult
if hasattr(out, "content"):
out = out.content
out = str(out).strip()
# 5. Remove code fences ``` or ```sql and any leading/trailing whitespace
out = re.sub(r"```(?:sql)?\n?", "", out, flags=re.IGNORECASE).replace("```", "").strip()
# 6. Ensure the SQL starts with SELECT (case-insensitive)
match = re.search(r"(select\b.*)", out, flags=re.IGNORECASE | re.DOTALL)
if match:
out = match.group(1).strip()
else:
# fallback if no SELECT found
out = ""
# 7. Optional: remove trailing semicolon if present
out = out.rstrip(";").strip()
# 8. Save cleaned SQL back to state
state["generated_sql"] = out
return stateLangChain 作為“編排器”,確保每個問題都能順暢地走完完整的 RAG 回路。
5. FastAPI 后端——封裝管道
為了讓系統可交互,我們用 FastAPI 封裝整個流程。
FastAPI 能以簡潔且高性能的方式,把 RAG 管道暴露為 API。
當用戶用 POST 請求提交問題時,API 會:
- 檢索 schema 信息。
- 生成并校驗 SQL。
- 執行查詢。
- 返回格式化結果。
這一層是你的 Text-to-SQL 聊天機器人的“引擎室”。
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
from server.utils import allowed_tables_from_db
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
ALLOWED_TABLES = allowed_tables_from_db(SQLITE_PATH)
app = FastAPI(title="RAG Text->SQL API")
class QueryRequest(BaseModel):
question: str
show_sql: bool = True
@app.post("/query")
async def query(req: QueryRequest):
state = {"question": req.question, "messages": []}
# 1. retrieve
state = await retriever_node(state)
# 2. generate SQL
state = await sql_generator_node(state)
sql = state["generated_sql"]
ok, reason = validate_sql(sql, ALLOWED_TABLES)
if not ok:
raise HTTPException(status_code=400, detail=f"SQL validation failed: {reason}. SQL: {sql}")
# 3. execute
cols, rows = execute_sql(sql)
# format result rows
result = [dict(zip(cols, r)) for r in rows]
return {"sql": sql if req.show_sql else None, "cols": cols, "rows": result}6. Streamlit 前端——聊天界面
最后,我們用 Streamlit 構建一個前端界面——一個輕量的 Web 界面供用戶交互。
用戶可以輸入問題、選擇是否查看生成的 SQL,并實時查看結果。
Streamlit 簡單易用,非常適合原型驗證 AI 工具: 幾行 Python 代碼,你就能得到一個能“對話數據”的交互式儀表盤。

全流程整合
端到端架構圖(示意):
User (Streamlit UI)
↓
FastAPI Backend
↓
LangChain RAG Pipeline
├── Retriever (Chroma Vector DB)
├── Embedding Layer (Hugging Face)
├── LLM (SQL Generator)
↓
SQLite Database (Execute SQL)
↓
Results → Back to UI這個流程涵蓋了一條查詢的整個生命周期——從純英文到執行 SQL,再到可讀的結果。
文末有代碼倉庫
環境準備
- 安裝依賴:
pip install -r requirements.txtSQLite 數據庫準備
- 說明 schema(例如 ?
?customers???、??orders??)。
os.makedirs("sample_db", exist_ok=True)
DB = "sample_db/sample.db"
conn = sqlite3.connect(DB)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT,
created_at TEXT
);
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY,
customer_id INTEGER,
total_amount REAL,
status TEXT,
created_at TEXT,
notes TEXT,
FOREIGN KEY(customer_id) REFERENCES customers(id)
);
""")
# seed data
customers = [
(1, "Alice Johnson", "alice@example.com", "2024-12-01"),
(2, "Bob Lee", "bob@example.com", "2024-12-05"),
(3, "Carol Singh", "carol@example.com", "2024-12-10"),
(4, "David Kim", "david.kim@example.com", "2024-12-12"),
.......
]
orders = [
(1, 1, 120.50, "completed", "2025-01-03", "First order"),
(2, 1, 15.00, "pending", "2025-01-07", "Gift wrap"),
(3, 2, 250.00, "completed", "2025-02-10", "Bulk order"),
..........
]
cur.executemany("INSERT OR REPLACE INTO customers VALUES (?,?,?,?)", customers)
cur.executemany("INSERT OR REPLACE INTO orders VALUES (?,?,?,?,?,?)", orders)
conn.commit()
conn.close()運行以下命令創建并插入 SQLite 數據:
python sample_db/create_sample_db.pySchema 的 Embedding 與索引
在 AI 能生成準確 SQL 之前,它需要“理解”數據庫結構——有哪些表、每個表包含哪些列,以及數據大概長什么樣。
這就是 Embeddings(向量嵌入)與語義索引發揮作用的地方。
什么是 Embeddings?
Embedding 是文本的數字表示(向量),位于高維空間,能捕捉語義。 例如:
“customer name” 和 “client full name” 這兩句話的向量會在該空間中彼此靠近,因為它們表達的含義大致相同。
通過為每個“表名”“列名”甚至“樣本數據”建立向量表示,模型就能在用戶提問時檢索到相關上下文——例如,“顯示最近注冊”會在語義上匹配 ??signup_date???、??created_at??? 或 ??join_date?? 這樣的列。
我們將用到的工具
- Hugging Face Embeddings:把文本轉為向量。
- Chroma Vector Store:高效存儲與檢索這些向量。
- SQLite:作為被索引的真實數據庫。
以下是索引腳本代碼 ??
# ? Initialize HuggingFace embedding model
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
# ? Create / Load Chroma vector store
vectorstore = Chroma(
collection_name="sqlite_docs",
persist_directory=CHROMA_DIR,
embedding_functinotallow=embeddings
)
def row_hash(values):
"""Generate unique hash for a row."""
return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()
def row_to_text(table, cols, row):
"""Convert SQLite row into a readable text chunk."""
return f"Table: {table}\n" + "\n".join([f"{c}: {v}" for c, v in zip(cols, row)])
def index_table(conn, table):
"""Index a single table into the vector store."""
cur = conn.cursor()
cur.execute(f"PRAGMA table_info({table});")
cols = [c[1] for c in cur.fetchall()]
cur.execute(f"SELECT {', '.join(cols)} FROM {table}")
rows = cur.fetchall()
docs, ids, metas = [], [], []
for r in rows:
txt = row_to_text(table, cols, r)
pk = str(r[0])
hid = row_hash(r)
ids.append(f"{table}:{pk}")
docs.append(txt)
metas.append({"table": table, "pk": pk, "hash": hid})
# Add to Chroma vector store
vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)
def main():
"""Main indexing pipeline."""
conn = sqlite3.connect(SQLITE_PATH)
cur = conn.cursor()
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [t[0] for t in cur.fetchall()]
for t in tqdm(tables, desc="Indexing tables"):
index_table(conn, t)
conn.close()
print("Indexing complete and persisted in Chroma.")生成 embeddings 并保存到向量數據庫:
python ingestion/index_sqlite.py步驟說明
- 提取 schema 和數據
對每張表抓取列名和若干行樣本數據。每一行都會被組織成描述該表內容的小“文檔”。 - 文本轉向量
使用 ???HuggingFaceEmbeddings???,將每個文本片段(如 ??"Table: customers\nname: John\nsignup_date: 2025-09-10"??)轉換為數值向量。 - 存入 Chroma
把這些 embeddings 存入 Chroma 向量存儲,同時保存元數據(如表名、主鍵、hash ID)。 - 啟用語義搜索
當用戶提問時,我們會把問題嵌入到同一向量空間,用 Chroma 檢索“最相關”的 schema 或數據片段。
檢索給 LLM 精準的上下文,幫助其寫出準確、可靠的 SQL。
為什么索引很重要
沒有 embeddings,模型就是“摸黑前行”——它不知道有哪些列、各自數據類型是什么。 通過對 schema 做 embedding 與索引,你為模型建立了一套“語義記憶”,它能在查詢時動態參考,從而始終生成有效且具備上下文意識的 SQL。
SQLite Database
↓
Extract Tables + Columns
↓
Convert to Embeddings (Hugging Face)
↓
Store in Chroma Vector DB
↓
Semantic Search During Query TimeRAG 查詢流程
我們已經構建了 schema 索引。現在開始讓它發揮作用。 RAG 查詢流程就是“魔法”發生的地方——系統接收用戶問題、檢索相關 schema 上下文、生成 SQL、在 SQLite 上執行,并返回結果。
逐步拆解如下:
第 1 步:檢索相關 schema
當用戶提問——比如:
“顯示上個月加入的所有客戶”
系統會把這個問題嵌入到與你的數據庫 schema 相同的向量空間(感謝之前創建的 embeddings)。利用 Chroma 的 retriever,它會找到最相關的 schema 片段(例如與 ??customers???、??signup_date?? 相關的表與列)。
async def retriever_node(state: RAGState) -> RAGState:
docs = await retriever.ainvoke(state["question"])
state["retrieved_docs"] = [d.page_content for d in docs]
return state由此,模型具備了對真實 schema 的“認知”——一張數據庫“備忘單”。
第 2 步:通過 LLM 生成 SQL
接下來,我們把問題和檢索到的 schema 一并交給 LLM(如 OpenAI、Mistral 或 Gemini)。模型使用專門的 prompt,只生成一個有效的只讀 ??SELECT?? 查詢。
sql_prompt = PromptTemplate.from_template("""
You are a SQL generator. Based on the following context, generate a SINGLE READ-ONLY SQLite SELECT query (no semicolons, no multiple statements).
Context:
{context}
Question:
{question}
Return only the SQL SELECT statement.
""")LLM 的輸出會被清洗并校驗,確保它只輸出 SQL 語句本身:
async def sql_generator_node(state: RAGState) -> RAGState:
context = "\n\n".join(state.get("retrieved_docs", []))
prompt_text = sql_prompt.format(cnotallow=context, questinotallow=state["question"])
out = await llm.ainvoke(prompt_text)
out = str(getattr(out, "content", out)).strip()
out = re.sub(r"```(?:sql)?\n?", "", out, flags=re.I).replace("```", "").strip()
match = re.search(r"(select\b.*)", out, flags=re.I | re.DOTALL)
if match:
out = match.group(1).rstrip(";").strip()
state["generated_sql"] = out
return state到此,你的聊天機器人已經能實現“英文進、SQL 出”。
第 3 步:校驗 SQL(安全第一)
在執行前我們會驗證查詢,保護數據庫并確保正確性。
我們使用 ??sqlglot?? 來:
- 拒絕任何非 ?
?SELECT??? 語句(禁止 ??DELETE???、??UPDATE?? 等)。 - 檢查只引用允許的表。
- 安全解析語法。
def validate_sql(sql: str, allowed_tables: Set[str]) -> Tuple[bool, str]:
if ";" in sql:
return False, "semicolon or multiple statements not allowed"
for kw in DISALLOWED:
if f" {kw} " in f" {sql.lower()} ":
return False, f"disallowed keyword: {kw}"
try:
parsed = sqlglot.parse_one(sql, read="sqlite")
except Exception as e:
return False, f"sql parse error: {e}"
if parsed.key.lower() not in ALLOWED_STATEMENTS:
return False, "only SELECT statements allowed"
tables = extract_tables(sql)
if not tables.issubset(allowed_tables):
return False, f"disallowed tables used: {tables - allowed_tables}"
return True, "ok"這一層讓系統保持“只讀且安全”。
第 4 步:在 SQLite 上執行
通過校驗后,SQL 會在 SQLite 數據庫中執行。我們還加了行數限制等保護,避免重負載查詢。
def execute_sql(sql: str, row_limit: int = 1000, timeout=5.0) -> Tuple[List[str], List[Tuple[Any]]]:
sql = enforce_limit(sql, row_limit)
conn = open_ro_conn()
conn.execute(f"PRAGMA busy_timeout = {int(timeout*1000)};")
cur = conn.cursor()
cur.execute(sql)
cols = [c[0] for c in cur.description] if cur.description else []
rows = cur.fetchmany(row_limit)
conn.close()
return cols, rows然后把結果封裝成 JSON 返回給用戶。
FastAPI 后端
當 RAG 管道構建并通過測試后,下一步就是把它“暴露成 API”,讓用戶和前端能實時發送問題、獲取 SQL 結果并交互。
FastAPI 正適合此類需求——它快速、類型安全、原生支持異步,非常適合服務 AI 工作流。
為什么選 FastAPI?
FastAPI 提供:
- 速度:異步 I/O + 自動文檔(Swagger / ReDoc)
- 易集成:便于與 LangChain、Chroma 或本地模型對接
- 校驗:Pydantic 確保入參規范
- 可擴展:可部署到 Docker、Serverless 或本地私有化
簡言之:它是對你的 RAG Text-to-SQL 邏輯的理想封裝器。
API 設計概述
我們暴露一個端點:
POST /query請求體:
{
"question": "Show me all customers who joined last month",
"show_sql": true
}響應示例:
{
"sql": "SELECT * FROM customers WHERE join_date >= '2025-09-01'",
"cols": ["id", "name", "join_date"],
"rows": [
{"id": 1, "name": "Alice", "join_date": "2025-09-05"},
{"id": 2, "name": "Bob", "join_date": "2025-09-12"}
]
}完整代碼:
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
# import the core RAG components
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
from server.utils import allowed_tables_from_db
# Load environment variables
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
ALLOWED_TABLES = allowed_tables_from_db(SQLITE_PATH)
app = FastAPI(title="RAG Text->SQL API")
class QueryRequest(BaseModel):
question: str
show_sql: bool = True
@app.post("/query")
async def query(req: QueryRequest):
# Initialize conversation state
state = {"question": req.question, "messages": []}
# 1?? Retrieve relevant schema info
state = await retriever_node(state)
# 2?? Generate SQL query using LLM
state = await sql_generator_node(state)
sql = state["generated_sql"]
# 3?? Validate SQL
ok, reason = validate_sql(sql, ALLOWED_TABLES)
if not ok:
raise HTTPException(status_code=400, detail=f"SQL validation failed: {reason}. SQL: {sql}")
# 4?? Execute SQL safely
cols, rows = execute_sql(sql)
result = [dict(zip(cols, r)) for r in rows]
# 5?? Return JSON response
return {
"sql": sql if req.show_sql else None,
"cols": cols,
"rows": result
}流程分解
- 接收問題
用戶通過 POST 發送自然語言問題。 - 檢索 schema 上下文
???retriever_node?? 從 Chroma 拉取最相關的表與列。 - 生成 SQL
???sql_generator_node??? 使用檢索上下文和問題,生成 ??SELECT?? 語句。 - 安全校驗
???validate_sql()?? 檢查查詢格式,并確保只訪問允許的表。 - 執行并返回
???execute_sql()?? 在 SQLite(只讀模式)上執行,并返回格式化的結果。
為什么這種設計有效
- 模塊化:檢索、生成、校驗、執行都可替換或擴展。
- 安全:只允許 ?
?SELECT??,并對表做白名單控制。 - 異步:整條管道可并發運行,易于隨用戶負載擴展。
- 易部署:Docker 打包,上云(AWS/GCP)或 Hugging Face Spaces 都很快。
[User / Streamlit UI]
↓ (POST /query)
FastAPI Backend
├── retriever_node() → Chroma Vector DB
├── sql_generator_node() → LLM (OpenAI / Gemini / Mistral)
├── validate_sql() → SQLGlot Parser
└── execute_sql() → SQLite Database
↓
JSON Response這樣的模塊化架構讓你的后端健壯、透明、且具備生產就緒度。
Streamlit 前端
FastAPI 后端跑起來后,我們再把它做成“可交互”的應用。 目標:構建一個簡單的網頁界面,讓任何人都能輸入自然語言問題、查看生成的 SQL,并實時看到查詢結果——幾秒鐘之內完成閉環。
完整代碼:
import streamlit as st
import requests
API_URL = "http://localhost:8000/query"
st.set_page_config(page_title="RAG Text→SQL Demo", layout="centered")
st.title("RAG Text → SQL (SQLite replica)")
with st.form("query_form"):
question = st.text_input(
"Ask a natural language question (about the DB)",
value="Show total orders per customer"
)
show_sql = st.checkbox("Show generated SQL", value=True)
submitted = st.form_submit_button("Submit")
if submitted:
# Ensure types are correct
question = str(question).strip()
show_sql = bool(show_sql)
if not question:
st.warning("Please enter a question.")
else:
payload = {"question": question, "show_sql": show_sql}
with st.spinner("Querying..."):
try:
resp = requests.post(API_URL, jsnotallow=payload, timeout=60)
resp.raise_for_status()
data = resp.json()
if show_sql:
st.subheader("?? Generated SQL")
st.code(data.get("sql", ""), language="sql")
st.subheader("?? Query Results")
rows = data.get("rows", [])
if rows:
st.dataframe(rows)
else:
st.info("No rows returned for this query.")
except requests.exceptions.HTTPError as http_err:
st.error(f"HTTP error occurred: {http_err} - {resp.text}")
except requests.exceptions.ConnectionError:
st.error("Could not connect to the API. Make sure FastAPI is running on localhost:8000.")
except requests.exceptions.Timeout:
st.error("Request timed out. Try again later.")
except Exception as e:
st.error(f"Unexpected error: {e}")工作原理
- 用戶輸入
用戶在 Streamlit 文本框中輸入問題。 - 表單提交
Streamlit 把問題(以及 ???show_sql??? 標志)作為 JSON 發送到 FastAPI 的 ??/query?? 端點。 - 后端處理
FastAPI 運行完整的 RAG 管道:
- 檢索相關 schema
- 用 LLM 生成 SQL
- 在 SQLite 上校驗并執行
- 結果展示
Streamlit 接收 JSON 響應,并:
- 顯示生成的 SQL(當 ?
?show_sql=True??) - 用交互式表格展示結果行
+----------------------------------------------------------+
| ?? RAG Text → SQL (SQLite Replica) |
| ------------------------------------------------------ |
| Ask a natural language question: |
| [ Show total orders per customer ] [Submit] |
| |
| ?? Generated SQL |
| SELECT customer_id, COUNT(*) AS total_orders |
| FROM orders GROUP BY customer_id |
| |
| ?? Query Results |
| ┌──────────────┬───────────────┐ |
| │ customer_id │ total_orders │ |
| ├──────────────┼───────────────┤ |
| │ 1 │ 12 │ |
| │ 2 │ 8 │ |
| └──────────────┴───────────────┘ |
+----------------------------------------------------------+這給用戶帶來一個“即時反饋”閉環——從自然語言到 SQL,再到數據可視化。
運行應用:
uvicorn main:app --reload
streamlit run app.py演示:
問題:Show total orders per customer

問題:Total revenue from completed orders

下一步:增量 Embeddings 與可擴展更新
此時你已經構建了一個完整的 RAG 驅動的 Text-to-SQL 管道——從 schema 的 embedding 和檢索,到 SQL 生成、校驗和執行。 但在真實世界里,數據庫會持續演變。表會變化、新記錄會插入、列定義也會隨時間調整。
那么,如何保持 embedding 索引(從而保持 RAG 系統)是最新的呢?
下面是把項目推進到生產環境的一些方向:
1. 處理動態數據庫
在原型中,我們在啟動時只索引了一次數據庫。 這對靜態數據集很好用,但生產系統需要在以下情況進行“增量 embedding 更新”:
- 表結構變化(新增/刪除列)。
- 新增行并對語義有實質性影響。
- schema 文檔或元數據發生變化。
與其全量重建索引,不如“只嵌入變化的部分”。
增量索引策略
- 用時間戳或行 hash 跟蹤更新。
- 與 Chroma 中已存的元數據(如 hash ID)比較。
- 僅對修改的行或新增的表做重新 embedding。
- 使用 Chroma 的增量 ?
?add_texts()?? API 持久化更新。
這種方式最小化成本、時間與冗余,面對千萬級數據時尤為關鍵。
2. 用后臺任務自動化
重新索引或更新 embeddings 不應阻塞用戶查詢。 你可以把這些操作交給后臺 worker:
- Celery(配合 Redis/RabbitMQ):分布式任務調度。
- FastAPI BackgroundTasks:輕量異步更新。
使用 FastAPI 內置后臺任務的示例:
from fastapi import BackgroundTasks
@app.post("/update_embeddings")
async def update_embeddings(background_tasks: BackgroundTasks):
background_tasks.add_task(reindex_changed_tables)
return {"status": "update scheduled"}這樣,你的主 API(??/query??)可以保持響應靈敏,而索引在后臺更新。
3. 未來增強方向
- 基于公司查詢日志微調 SQL 生成器。
- 為常見問題添加緩存。
- 在 Streamlit 中集成可視化圖表,動態展示結果。
- 面向大規模數據集,切換到更大的向量數據庫(如 Pinecone 或 Qdrant)。
- 添加反饋回路:允許用戶糾正 SQL,并據此改進模型。
這些改進將把你的系統推向“能自我學習”的數據助理,更懂你的演進中的 schema 與業務邏輯。
關鍵收獲
你已經構建了一個強大的系統——一個基于 RAG 的完整 Text-to-SQL 方案,把純英文轉化為可操作的數據庫洞察。回顧你完成的內容:
1. 掌握了用于 Text-to-SQL 的 RAG
你學會了如何用 RAG 彌合自然語言與結構化數據庫 schema 之間的差距,從而顯著提升準確性并降低 SQL 生成的幻覺。
- 嵌入數據庫 schema 與樣本行,提供上下文。
- 在生成查詢前先檢索最相關片段。
- 把檢索與 LLM 推理結合,獲得可靠的 SQL 輸出。
2. 構建了模塊化、可擴展的架構
你使用 LangChain 與 FastAPI,把系統搭成了一個具備生產級結構的 AI 服務:
- SQLite 作為結構化數據源。
- Hugging Face Embeddings + Chroma 做語義檢索。
- LangChain 編排 retriever、prompt 與模型。
- FastAPI 提供輕量、異步的后端。
- Streamlit 提供簡潔、友好的前端界面。
各層都模塊化——可替換模型或數據庫,而不破壞其他環節。
3. 部署了交互式聊天界面
你用 Streamlit 把所有能力封裝起來,讓用戶可以:
- 直接輸入自然語言問題。
- 查看生成的 SQL。
- 即時在動態表格中查看結果。
你把原本屬于“開發者專屬”的工作(寫 SQL),變成了“與數據的自然對話”。
最后的想法
這個項目體現了我們與數據交互方式的一次強大轉變:AI 正在讓數據庫訪問民主化,讓任何人都能在不懂 SQL 的情況下提出復雜問題。
通過把檢索增強生成與現代 embeddings 和對話界面結合,我們正在打造把數據直接交到人們手中的工具——讓洞察更快、更容易、更觸手可及。
歡迎交流你的反饋、想法,以及你構建 Text-to-SQL 系統的經驗。
在 GitHub 查看完整代碼并開始動手嘗試:???https://github.com/dharampatel/ConvertQueryToSQL/tree/master??
本文轉載自??AI大模型觀察站??,作者:AI研究生

















