精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

使用 RAG、LangChain、FastAPI 與 Streamlit 構建 Text-to-SQL 聊天機器人

發布于 2025-11-5 07:37
瀏覽
0收藏

在這個項目中,我構建了一個由 AI 驅動的聊天機器人,它可以將自然語言問題轉換為 SQL 查詢,并直接從真實的 SQLite 數據庫中檢索答案。借助 LangChain、Hugging Face Embeddings 和 Chroma 向量存儲,這個應用展示了如何通過 Retrieval-Augmented Generation(RAG,檢索增強生成)工作流,把非結構化的用戶輸入與結構化數據連接起來——配有 FastAPI 后端與 Streamlit 前端界面。

引言:為什么是 Text-to-SQL?

想象一下: 你在會議上,經理突然問道:

“我們能看到上個月加入的所有客戶嗎?”

你看了一眼 SQL 編輯器……發現你得現寫查詢、確認表名,甚至可能還要調試一個缺失的 JOIN。與此同時,另一個人直接問了 AI 聊天機器人同樣的問題——立刻拿到整齊格式化的結果。

這就是 Text-to-SQL 的魔力——把自然語言變成數據庫查詢。

問題所在

SQL(Structured Query Language)是數據分析與數據工程的基石。它強大、靈活、精確——但對很多人來說并不“友好”。 大多數業務用戶、分析師,甚至一些開發者都會覺得 SQL 語法令人生畏,或者在快速獲取洞察時效率不高。

在多數團隊里,這會造成一個鴻溝:

  • 數據是可用的,但不容易“開口就問”。
  • 洞察是存在的,但被 SQL 專業技能“上鎖”。

解決方案:Text-to-SQL

Text-to-SQL 用來彌合這個鴻溝。 它讓任何人——無論技術背景如何——都能直接發問:

“上個季度我們最暢銷的 5 款產品是什么?”

并直接從數據庫拿到答案,AI 會在背后完成所有翻譯工作。

在幕后,一個 Text-to-SQL 系統通常要做四件關鍵的事:

  • 檢索相關的 schema 與上下文(知道有哪些表)。
  • 從自然語言問題生成有效的 SQL 查詢。
  • 安全地校驗和執行 SQL。
  • 以易讀的格式返回結果。

理解 RAG 方法

我們已經看到,Text-to-SQL 讓用戶能用自然語言提問并獲取數據庫結果——但 AI 到底是如何“知道”你的表、列和關系的呢?

答案是:RAG(Retrieval-Augmented Generation,檢索增強生成)。

什么是 RAG?

本質上,RAG 是一種結合“檢索(Retrieval)”與“生成(Generation)”的混合式 AI 方法:

  • 檢索——系統抓取相關信息(這里指數據庫 schema、表名、表間關系等)。
  • 生成——LLM 根據檢索到的上下文生成準確且有根據的輸出(即 SQL 查詢)。

你可以把 RAG 理解為:在 LLM 開始寫 SQL 之前,先給它一張“備考小抄”——它需要看到的確切 schema。

為什么不直接用普通的 LLM?

如果你直接問一個大語言模型(LLM),比如 GPT 或 Claude:

“列出上個月加入的所有客戶。”

它可能會生成這樣的 SQL:

SELECT * FROM users WHERE signup_date >= '2025-09-01';

看起來沒問題——直到你意識到你的數據庫里壓根沒有叫做 ??users??? 的表。也許你的是 ??customer_data??? 或 ??crm_clients??。

這就是問題所在:當缺少上下文時,LLM 容易“幻覺”。 它會猜表名、漏掉 JOIN 關系,或用錯列名——因為默認情況下,它并不知道你的數據庫 schema。

RAG 如何解決這個問題

RAG 通過以“真實、可檢索的知識”為依據,讓模型“落地”。

在一個 Text-to-SQL 的 RAG 回路中,會發生以下事情:

  • 檢索 Schema 上下文
    系統在你的 schema 索引(向量數據庫或內存索引)中搜索表名、列描述、關系等信息。
  • 生成 SQL
    LLM 同時接收你的自然語言問題和檢索到的 schema,生成有效的 SQL 查詢。
  • 執行并返回結果
    查詢被校驗后,在真實數據庫中執行,并把結果返回給用戶。

這能確保每個 SQL 查詢都基于“了解 schema 的推理”,而不是模型的“拍腦袋”。

RAG 回路的實戰流程

一個包含以下循環的簡單流程圖:

[User Question]
      ↓
[Retrieve Schema Info]
      ↓
[Generate SQL (LLM)]
      ↓
[Validate + Execute]
      ↓
[Return Results]
      ?

每一輪都會確保模型在生成查詢前,擁有“最新且準確”的 schema 知識——減少幻覺并提升可靠性。

系統架構

理解了為什么 Text-to-SQL 依賴 RAG 方法后,我們來看看系統的內部結構。

為了更具體,假設你在構建一個聊天機器人,能回答關于公司客戶數據庫的自然語言問題——從“上個月誰加入了?”到“本季度我們的平均訂單金額是多少?”都不在話下。

下面是系統的大致結構圖 ??

1. SQLite 數據庫——結構化數據源

每個 Text-to-SQL 系統都始于一個數據源。 為簡化,我們使用 SQLite——一種輕量的、基于文件的數據庫,適合原型開發與測試。

這里存放著你的真實數據:比如 ??customers???、??orders???、??products?? 等表。

當用戶提問時,我們的目標是把問題轉換成能在這個數據庫上執行的有效 SQL 查詢。

2. Embedding 層——把 schema 轉成向量

在模型生成 SQL 之前,它需要“理解”數據庫結構——表名、列名,以及它們的含義。

我們通過 Embeddings(向量嵌入)來實現:它是對文本的數字化表示,能夠捕捉語義。 借助 Hugging Face Embeddings(如 ??all-MiniLM-L6-v2??),我們把 schema 元數據轉為向量:

import os
import sqlite3
import hashlib
from tqdm import tqdm
from dotenv import load_dotenv
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma

# Load environment variables
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
CHROMA_DIR = os.getenv("CHROMA_DIR", "./chroma_persist")
EMBED_MODEL = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")

# ? Initialize HuggingFace embedding model
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)

# ? Create / Load Chroma vector store
vectorstore = Chroma(
    collection_name="sqlite_docs",
    persist_directory=CHROMA_DIR,
    embedding_function=embeddings
)

def row_hash(values):
    """Generate unique hash for a row."""
    return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()

def row_to_text(table, cols, row):
    """Convert SQLite row into a readable text chunk."""
    return f"Table: {table}\n" + "\n".join([f"{c}: {v}" for c, v in zip(cols, row)])

def index_table(conn, table):
    """Index a single table into the vector store."""
    cur = conn.cursor()
    cur.execute(f"PRAGMA table_info({table});")
    cols = [c[1] for c in cur.fetchall()]
    cur.execute(f"SELECT {', '.join(cols)} FROM {table}")
    rows = cur.fetchall()

    docs, ids, metas = [], [], []
    for r in rows:
        txt = row_to_text(table, cols, r)
        pk = str(r[0])
        hid = row_hash(r)
        ids.append(f"{table}:{pk}")
        docs.append(txt)
        metas.append({"table": table, "pk": pk, "hash": hid})

    # Add to Chroma vector store
    vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)

def main():
    """Main indexing pipeline."""
    conn = sqlite3.connect(SQLITE_PATH)
    cur = conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
    tables = [t[0] for t in cur.fetchall()]

    for t in tqdm(tables, desc="Indexing tables"):
        index_table(conn, t)

    conn.close()
    print("Indexing complete and persisted in Chroma.")

if __name__ == "__main__":
    main()

現在,每張表和每個字段都在高維空間中擁有一個向量表示——為后續的智能檢索打下基礎。

3. Vector Store(Chroma)——定位相關 schema

接著,我們把這些 embeddings 存進向量數據庫——這里使用 Chroma。

當用戶提問時,我們會:

  1. 使用同一個 embedding 模型對問題進行向量化。
  2. 在 Chroma 中檢索與之“最相近”的 schema 元素。

例如,當用戶問“最近有哪些用戶注冊?”時,檢索器可能會找到類似 ??customers.signup_date??? 和 ??customers.name?? 的 schema 項。

這樣能確保模型只看到相關的 schema 上下文——讓 SQL 生成“腳踏實地”。

4. LangChain 組件——系統“大腦”

LangChain 把所有環節串起來,提供檢索、推理與 SQL 生成的模塊化組件:

  • Retrievers:從 Chroma 拉取相關的 schema 片段。
  • Chains:定義步驟序列——檢索 → 生成 → 校驗 → 執行。
  • LLMs:基于用戶問題和檢索到的 schema 生成 SQL。

示例代碼:

from typing import TypedDict, List
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from dotenv import load_dotenv
import os

load_dotenv()

# state type
class RAGState(TypedDict, total=False):
    question: str
    retrieved_docs: List[str]
    generated_sql: str
    validated_sql: str
    sql_result: List[dict]
    messages: List[dict]


# init vectorstore & models
CHROMA_DIR = os.getenv("CHROMA_DIR", "./chroma_persist")
EMBED_MODEL = os.getenv("EMBED_MODEL", "text-embedding-3-small")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini")
TOP_K = int(os.getenv("TOP_K", "8"))

embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
vectordb = Chroma(persist_directory=CHROMA_DIR, embedding_function=embeddings, collection_name="sqlite_docs")
retriever = vectordb.as_retriever(search_kwargs={"k": TOP_K})

llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)

sql_prompt = PromptTemplate.from_template("""
        You are a SQL generator. Based on the following context, generate a SINGLE READ-ONLY SQLite SELECT query (no semicolons, no multiple statements).
        Context:
        {context}
        
        Question:
        {question}
        
        Return only the SQL SELECT statement.
        """)


async def retriever_node(state: RAGState) -> RAGState:
    docs = await retriever.ainvoke(state["question"])
    state["retrieved_docs"] = [d.page_content for d in docs]
    return state



import re
from typing import Any

async def sql_generator_node(state: RAGState) -> RAGState:
    """
    Generate SQL from the retrieved documents and user question.
    Cleans LLM output, removes markdown/code fences, and ensures only SELECT statements remain.
    """
    # 1. Combine retrieved documents
    context = "\n\n".join(state.get("retrieved_docs", []))

    # 2. Format the prompt
    prompt_text = sql_prompt.format(context=context, question=state["question"])

    # 3. Call the LLM asynchronously
    out = await llm.ainvoke(prompt_text)

    # 4. Extract text content if output is an AIMessage or ChatResult
    if hasattr(out, "content"):
        out = out.content
    out = str(out).strip()

    # 5. Remove code fences ``` or ```sql and any leading/trailing whitespace
    out = re.sub(r"```(?:sql)?\n?", "", out, flags=re.IGNORECASE).replace("```", "").strip()

    # 6. Ensure the SQL starts with SELECT (case-insensitive)
    match = re.search(r"(select\b.*)", out, flags=re.IGNORECASE | re.DOTALL)
    if match:
        out = match.group(1).strip()
    else:
        # fallback if no SELECT found
        out = ""

    # 7. Optional: remove trailing semicolon if present
    out = out.rstrip(";").strip()

    # 8. Save cleaned SQL back to state
    state["generated_sql"] = out
    return state

LangChain 作為“編排器”,確保每個問題都能順暢地走完完整的 RAG 回路。

5. FastAPI 后端——封裝管道

為了讓系統可交互,我們用 FastAPI 封裝整個流程。

FastAPI 能以簡潔且高性能的方式,把 RAG 管道暴露為 API。

當用戶用 POST 請求提交問題時,API 會:

  • 檢索 schema 信息。
  • 生成并校驗 SQL。
  • 執行查詢。
  • 返回格式化結果。

這一層是你的 Text-to-SQL 聊天機器人的“引擎室”。

import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
from server.utils import allowed_tables_from_db

load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
ALLOWED_TABLES = allowed_tables_from_db(SQLITE_PATH)

app = FastAPI(title="RAG Text->SQL API")

class QueryRequest(BaseModel):
    question: str
    show_sql: bool = True

@app.post("/query")
async def query(req: QueryRequest):
    state = {"question": req.question, "messages": []}
    # 1. retrieve
    state = await retriever_node(state)
    # 2. generate SQL
    state = await sql_generator_node(state)
    sql = state["generated_sql"]
    ok, reason = validate_sql(sql, ALLOWED_TABLES)
    if not ok:
        raise HTTPException(status_code=400, detail=f"SQL validation failed: {reason}. SQL: {sql}")
    # 3. execute
    cols, rows = execute_sql(sql)
    # format result rows
    result = [dict(zip(cols, r)) for r in rows]
    return {"sql": sql if req.show_sql else None, "cols": cols, "rows": result}

6. Streamlit 前端——聊天界面

最后,我們用 Streamlit 構建一個前端界面——一個輕量的 Web 界面供用戶交互。

用戶可以輸入問題、選擇是否查看生成的 SQL,并實時查看結果。

Streamlit 簡單易用,非常適合原型驗證 AI 工具: 幾行 Python 代碼,你就能得到一個能“對話數據”的交互式儀表盤。

使用 RAG、LangChain、FastAPI 與 Streamlit 構建 Text-to-SQL 聊天機器人-AI.x社區

全流程整合

端到端架構圖(示意):

User (Streamlit UI)
        ↓
FastAPI Backend
        ↓
LangChain RAG Pipeline
   ├── Retriever (Chroma Vector DB)
   ├── Embedding Layer (Hugging Face)
   ├── LLM (SQL Generator)
        ↓
SQLite Database (Execute SQL)
        ↓
Results → Back to UI

這個流程涵蓋了一條查詢的整個生命周期——從純英文到執行 SQL,再到可讀的結果。

文末有代碼倉庫


環境準備

  • 安裝依賴:

pip install -r requirements.txt

SQLite 數據庫準備

  • 說明 schema(例如 ??customers???、??orders??)。

os.makedirs("sample_db", exist_ok=True)
DB = "sample_db/sample.db"
conn = sqlite3.connect(DB)
cur = conn.cursor()

cur.execute("""
CREATE TABLE IF NOT EXISTS customers (
  id INTEGER PRIMARY KEY,
  name TEXT,
  email TEXT,
  created_at TEXT
);
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
  id INTEGER PRIMARY KEY,
  customer_id INTEGER,
  total_amount REAL,
  status TEXT,
  created_at TEXT,
  notes TEXT,
  FOREIGN KEY(customer_id) REFERENCES customers(id)
);
""")
# seed data
customers = [
    (1, "Alice Johnson", "alice@example.com", "2024-12-01"),
    (2, "Bob Lee", "bob@example.com", "2024-12-05"),
    (3, "Carol Singh", "carol@example.com", "2024-12-10"),
    (4, "David Kim", "david.kim@example.com", "2024-12-12"),
.......
]

orders = [
    (1, 1, 120.50, "completed", "2025-01-03", "First order"),
    (2, 1, 15.00, "pending", "2025-01-07", "Gift wrap"),
    (3, 2, 250.00, "completed", "2025-02-10", "Bulk order"),
..........
]

cur.executemany("INSERT OR REPLACE INTO customers VALUES (?,?,?,?)", customers)
cur.executemany("INSERT OR REPLACE INTO orders VALUES (?,?,?,?,?,?)", orders)
conn.commit()
conn.close()

運行以下命令創建并插入 SQLite 數據:

python sample_db/create_sample_db.py

Schema 的 Embedding 與索引

在 AI 能生成準確 SQL 之前,它需要“理解”數據庫結構——有哪些表、每個表包含哪些列,以及數據大概長什么樣。

這就是 Embeddings(向量嵌入)與語義索引發揮作用的地方。

什么是 Embeddings?

Embedding 是文本的數字表示(向量),位于高維空間,能捕捉語義。 例如:

“customer name” 和 “client full name” 這兩句話的向量會在該空間中彼此靠近,因為它們表達的含義大致相同。

通過為每個“表名”“列名”甚至“樣本數據”建立向量表示,模型就能在用戶提問時檢索到相關上下文——例如,“顯示最近注冊”會在語義上匹配 ??signup_date???、??created_at??? 或 ??join_date?? 這樣的列。

我們將用到的工具

  • Hugging Face Embeddings:把文本轉為向量。
  • Chroma Vector Store:高效存儲與檢索這些向量。
  • SQLite:作為被索引的真實數據庫。

以下是索引腳本代碼 ??

# ? Initialize HuggingFace embedding model
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)

# ? Create / Load Chroma vector store
vectorstore = Chroma(
    collection_name="sqlite_docs",
    persist_directory=CHROMA_DIR,
    embedding_functinotallow=embeddings
)

def row_hash(values):
    """Generate unique hash for a row."""
    return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()

def row_to_text(table, cols, row):
    """Convert SQLite row into a readable text chunk."""
    return f"Table: {table}\n" + "\n".join([f"{c}: {v}" for c, v in zip(cols, row)])

def index_table(conn, table):
    """Index a single table into the vector store."""
    cur = conn.cursor()
    cur.execute(f"PRAGMA table_info({table});")
    cols = [c[1] for c in cur.fetchall()]
    cur.execute(f"SELECT {', '.join(cols)} FROM {table}")
    rows = cur.fetchall()

    docs, ids, metas = [], [], []
    for r in rows:
        txt = row_to_text(table, cols, r)
        pk = str(r[0])
        hid = row_hash(r)
        ids.append(f"{table}:{pk}")
        docs.append(txt)
        metas.append({"table": table, "pk": pk, "hash": hid})

    # Add to Chroma vector store
    vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)

def main():
    """Main indexing pipeline."""
    conn = sqlite3.connect(SQLITE_PATH)
    cur = conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
    tables = [t[0] for t in cur.fetchall()]

    for t in tqdm(tables, desc="Indexing tables"):
        index_table(conn, t)

    conn.close()
    print("Indexing complete and persisted in Chroma.")

生成 embeddings 并保存到向量數據庫:

python ingestion/index_sqlite.py

步驟說明

  1. 提取 schema 和數據
    對每張表抓取列名和若干行樣本數據。每一行都會被組織成描述該表內容的小“文檔”。
  2. 文本轉向量
    使用 ???HuggingFaceEmbeddings???,將每個文本片段(如 ??"Table: customers\nname: John\nsignup_date: 2025-09-10"??)轉換為數值向量。
  3. 存入 Chroma
    把這些 embeddings 存入 Chroma 向量存儲,同時保存元數據(如表名、主鍵、hash ID)。
  4. 啟用語義搜索
    當用戶提問時,我們會把問題嵌入到同一向量空間,用 Chroma 檢索“最相關”的 schema 或數據片段。

檢索給 LLM 精準的上下文,幫助其寫出準確、可靠的 SQL。

為什么索引很重要

沒有 embeddings,模型就是“摸黑前行”——它不知道有哪些列、各自數據類型是什么。 通過對 schema 做 embedding 與索引,你為模型建立了一套“語義記憶”,它能在查詢時動態參考,從而始終生成有效且具備上下文意識的 SQL。

SQLite Database
   ↓
Extract Tables + Columns
   ↓
Convert to Embeddings (Hugging Face)
   ↓
Store in Chroma Vector DB
   ↓
Semantic Search During Query Time

RAG 查詢流程

我們已經構建了 schema 索引。現在開始讓它發揮作用。 RAG 查詢流程就是“魔法”發生的地方——系統接收用戶問題、檢索相關 schema 上下文、生成 SQL、在 SQLite 上執行,并返回結果。

逐步拆解如下:

第 1 步:檢索相關 schema

當用戶提問——比如:

“顯示上個月加入的所有客戶”

系統會把這個問題嵌入到與你的數據庫 schema 相同的向量空間(感謝之前創建的 embeddings)。利用 Chroma 的 retriever,它會找到最相關的 schema 片段(例如與 ??customers???、??signup_date?? 相關的表與列)。

async def retriever_node(state: RAGState) -> RAGState:
    docs = await retriever.ainvoke(state["question"])
    state["retrieved_docs"] = [d.page_content for d in docs]
    return state

由此,模型具備了對真實 schema 的“認知”——一張數據庫“備忘單”。

第 2 步:通過 LLM 生成 SQL

接下來,我們把問題和檢索到的 schema 一并交給 LLM(如 OpenAI、Mistral 或 Gemini)。模型使用專門的 prompt,只生成一個有效的只讀 ??SELECT?? 查詢。

sql_prompt = PromptTemplate.from_template("""
You are a SQL generator. Based on the following context, generate a SINGLE READ-ONLY SQLite SELECT query (no semicolons, no multiple statements).

Context:
{context}

Question:
{question}

Return only the SQL SELECT statement.
""")

LLM 的輸出會被清洗并校驗,確保它只輸出 SQL 語句本身:

async def sql_generator_node(state: RAGState) -> RAGState:
    context = "\n\n".join(state.get("retrieved_docs", []))
    prompt_text = sql_prompt.format(cnotallow=context, questinotallow=state["question"])
    out = await llm.ainvoke(prompt_text)

    out = str(getattr(out, "content", out)).strip()
    out = re.sub(r"```(?:sql)?\n?", "", out, flags=re.I).replace("```", "").strip()

    match = re.search(r"(select\b.*)", out, flags=re.I | re.DOTALL)
    if match:
        out = match.group(1).rstrip(";").strip()

    state["generated_sql"] = out
    return state

到此,你的聊天機器人已經能實現“英文進、SQL 出”。

第 3 步:校驗 SQL(安全第一)

在執行前我們會驗證查詢,保護數據庫并確保正確性。

我們使用 ??sqlglot?? 來:

  • 拒絕任何非 ??SELECT??? 語句(禁止 ??DELETE???、??UPDATE?? 等)。
  • 檢查只引用允許的表。
  • 安全解析語法。

def validate_sql(sql: str, allowed_tables: Set[str]) -> Tuple[bool, str]:
    if ";" in sql:
        return False, "semicolon or multiple statements not allowed"
    for kw in DISALLOWED:
        if f" {kw} " in f" {sql.lower()} ":
            return False, f"disallowed keyword: {kw}"

    try:
        parsed = sqlglot.parse_one(sql, read="sqlite")
    except Exception as e:
        return False, f"sql parse error: {e}"

    if parsed.key.lower() not in ALLOWED_STATEMENTS:
        return False, "only SELECT statements allowed"

    tables = extract_tables(sql)
    if not tables.issubset(allowed_tables):
        return False, f"disallowed tables used: {tables - allowed_tables}"
    return True, "ok"

這一層讓系統保持“只讀且安全”。

第 4 步:在 SQLite 上執行

通過校驗后,SQL 會在 SQLite 數據庫中執行。我們還加了行數限制等保護,避免重負載查詢。

def execute_sql(sql: str, row_limit: int = 1000, timeout=5.0) -> Tuple[List[str], List[Tuple[Any]]]:
    sql = enforce_limit(sql, row_limit)
    conn = open_ro_conn()
    conn.execute(f"PRAGMA busy_timeout = {int(timeout*1000)};")
    cur = conn.cursor()
    cur.execute(sql)
    cols = [c[0] for c in cur.description] if cur.description else []
    rows = cur.fetchmany(row_limit)
    conn.close()
    return cols, rows

然后把結果封裝成 JSON 返回給用戶。

FastAPI 后端

當 RAG 管道構建并通過測試后,下一步就是把它“暴露成 API”,讓用戶和前端能實時發送問題、獲取 SQL 結果并交互。

FastAPI 正適合此類需求——它快速、類型安全、原生支持異步,非常適合服務 AI 工作流。

為什么選 FastAPI?

FastAPI 提供:

  • 速度:異步 I/O + 自動文檔(Swagger / ReDoc)
  • 易集成:便于與 LangChain、Chroma 或本地模型對接
  • 校驗:Pydantic 確保入參規范
  • 可擴展:可部署到 Docker、Serverless 或本地私有化

簡言之:它是對你的 RAG Text-to-SQL 邏輯的理想封裝器。

API 設計概述

我們暴露一個端點:

POST /query

請求體:

{
  "question": "Show me all customers who joined last month",
  "show_sql": true
}

響應示例:

{
  "sql": "SELECT * FROM customers WHERE join_date >= '2025-09-01'",
  "cols": ["id", "name", "join_date"],
  "rows": [
    {"id": 1, "name": "Alice", "join_date": "2025-09-05"},
    {"id": 2, "name": "Bob", "join_date": "2025-09-12"}
  ]
}

完整代碼:

import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv

# import the core RAG components
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
from server.utils import allowed_tables_from_db

# Load environment variables
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
ALLOWED_TABLES = allowed_tables_from_db(SQLITE_PATH)

app = FastAPI(title="RAG Text->SQL API")

class QueryRequest(BaseModel):
    question: str
    show_sql: bool = True

@app.post("/query")
async def query(req: QueryRequest):
    # Initialize conversation state
    state = {"question": req.question, "messages": []}

    # 1?? Retrieve relevant schema info
    state = await retriever_node(state)

    # 2?? Generate SQL query using LLM
    state = await sql_generator_node(state)
    sql = state["generated_sql"]

    # 3?? Validate SQL
    ok, reason = validate_sql(sql, ALLOWED_TABLES)
    if not ok:
        raise HTTPException(status_code=400, detail=f"SQL validation failed: {reason}. SQL: {sql}")

    # 4?? Execute SQL safely
    cols, rows = execute_sql(sql)
    result = [dict(zip(cols, r)) for r in rows]

    # 5?? Return JSON response
    return {
        "sql": sql if req.show_sql else None,
        "cols": cols,
        "rows": result
    }

流程分解

  1. 接收問題
    用戶通過 POST 發送自然語言問題。
  2. 檢索 schema 上下文
    ???retriever_node?? 從 Chroma 拉取最相關的表與列。
  3. 生成 SQL
    ???sql_generator_node??? 使用檢索上下文和問題,生成 ??SELECT?? 語句。
  4. 安全校驗
    ???validate_sql()?? 檢查查詢格式,并確保只訪問允許的表。
  5. 執行并返回
    ???execute_sql()?? 在 SQLite(只讀模式)上執行,并返回格式化的結果。

為什么這種設計有效

  • 模塊化:檢索、生成、校驗、執行都可替換或擴展。
  • 安全:只允許 ??SELECT??,并對表做白名單控制。
  • 異步:整條管道可并發運行,易于隨用戶負載擴展。
  • 易部署:Docker 打包,上云(AWS/GCP)或 Hugging Face Spaces 都很快。

[User / Streamlit UI]
        ↓ (POST /query)
    FastAPI Backend
  ├── retriever_node()  →  Chroma Vector DB
  ├── sql_generator_node()  →  LLM (OpenAI / Gemini / Mistral)
  ├── validate_sql()  →  SQLGlot Parser
  └── execute_sql()  →  SQLite Database
        ↓
     JSON Response

這樣的模塊化架構讓你的后端健壯、透明、且具備生產就緒度。

Streamlit 前端

FastAPI 后端跑起來后,我們再把它做成“可交互”的應用。 目標:構建一個簡單的網頁界面,讓任何人都能輸入自然語言問題、查看生成的 SQL,并實時看到查詢結果——幾秒鐘之內完成閉環。

完整代碼:

import streamlit as st
import requests

API_URL = "http://localhost:8000/query"

st.set_page_config(page_title="RAG Text→SQL Demo", layout="centered")
st.title("RAG Text → SQL (SQLite replica)")

with st.form("query_form"):
    question = st.text_input(
        "Ask a natural language question (about the DB)",
        value="Show total orders per customer"
    )
    show_sql = st.checkbox("Show generated SQL", value=True)
    submitted = st.form_submit_button("Submit")

if submitted:
    # Ensure types are correct
    question = str(question).strip()
    show_sql = bool(show_sql)

    if not question:
        st.warning("Please enter a question.")
    else:
        payload = {"question": question, "show_sql": show_sql}

        with st.spinner("Querying..."):
            try:
                resp = requests.post(API_URL, jsnotallow=payload, timeout=60)
                resp.raise_for_status()
                data = resp.json()

                if show_sql:
                    st.subheader("?? Generated SQL")
                    st.code(data.get("sql", ""), language="sql")

                st.subheader("??  Query Results")
                rows = data.get("rows", [])
                if rows:
                    st.dataframe(rows)
                else:
                    st.info("No rows returned for this query.")

            except requests.exceptions.HTTPError as http_err:
                st.error(f"HTTP error occurred: {http_err} - {resp.text}")
            except requests.exceptions.ConnectionError:
                st.error("Could not connect to the API. Make sure FastAPI is running on localhost:8000.")
            except requests.exceptions.Timeout:
                st.error("Request timed out. Try again later.")
            except Exception as e:
                st.error(f"Unexpected error: {e}")

工作原理

  1. 用戶輸入
    用戶在 Streamlit 文本框中輸入問題。
  2. 表單提交
    Streamlit 把問題(以及 ???show_sql??? 標志)作為 JSON 發送到 FastAPI 的 ??/query?? 端點。
  3. 后端處理
    FastAPI 運行完整的 RAG 管道:
  • 檢索相關 schema
  • 用 LLM 生成 SQL
  • 在 SQLite 上校驗并執行
  1. 結果展示
    Streamlit 接收 JSON 響應,并:
  • 顯示生成的 SQL(當 ??show_sql=True??)
  • 用交互式表格展示結果行

+----------------------------------------------------------+
|  ?? RAG Text → SQL (SQLite Replica)                      |
|  ------------------------------------------------------  |
|  Ask a natural language question:                        |
|  [ Show total orders per customer             ] [Submit] |
|                                                          |
|  ?? Generated SQL                                         |
|  SELECT customer_id, COUNT(*) AS total_orders             |
|  FROM orders GROUP BY customer_id                         |
|                                                           |
|  ??  Query Results                                         |
|  ┌──────────────┬───────────────┐                         |
|  │ customer_id  │ total_orders  │                         |
|  ├──────────────┼───────────────┤                         |
|  │ 1            │ 12            │                         |
|  │ 2            │ 8             │                         |
|  └──────────────┴───────────────┘                         |
+----------------------------------------------------------+

這給用戶帶來一個“即時反饋”閉環——從自然語言到 SQL,再到數據可視化。

運行應用:

uvicorn main:app --reload
streamlit run app.py

演示:

問題:Show total orders per customer

使用 RAG、LangChain、FastAPI 與 Streamlit 構建 Text-to-SQL 聊天機器人-AI.x社區

問題:Total revenue from completed orders

使用 RAG、LangChain、FastAPI 與 Streamlit 構建 Text-to-SQL 聊天機器人-AI.x社區

下一步:增量 Embeddings 與可擴展更新

此時你已經構建了一個完整的 RAG 驅動的 Text-to-SQL 管道——從 schema 的 embedding 和檢索,到 SQL 生成、校驗和執行。 但在真實世界里,數據庫會持續演變。表會變化、新記錄會插入、列定義也會隨時間調整。

那么,如何保持 embedding 索引(從而保持 RAG 系統)是最新的呢?

下面是把項目推進到生產環境的一些方向:

1. 處理動態數據庫

在原型中,我們在啟動時只索引了一次數據庫。 這對靜態數據集很好用,但生產系統需要在以下情況進行“增量 embedding 更新”:

  • 表結構變化(新增/刪除列)。
  • 新增行并對語義有實質性影響。
  • schema 文檔或元數據發生變化。

與其全量重建索引,不如“只嵌入變化的部分”。

增量索引策略

  • 用時間戳或行 hash 跟蹤更新。
  • 與 Chroma 中已存的元數據(如 hash ID)比較。
  • 僅對修改的行或新增的表做重新 embedding。
  • 使用 Chroma 的增量 ??add_texts()?? API 持久化更新。

這種方式最小化成本、時間與冗余,面對千萬級數據時尤為關鍵。

2. 用后臺任務自動化

重新索引或更新 embeddings 不應阻塞用戶查詢。 你可以把這些操作交給后臺 worker:

  • Celery(配合 Redis/RabbitMQ):分布式任務調度。
  • FastAPI BackgroundTasks:輕量異步更新。

使用 FastAPI 內置后臺任務的示例:

from fastapi import BackgroundTasks

@app.post("/update_embeddings")
async def update_embeddings(background_tasks: BackgroundTasks):
    background_tasks.add_task(reindex_changed_tables)
    return {"status": "update scheduled"}

這樣,你的主 API(??/query??)可以保持響應靈敏,而索引在后臺更新。

3. 未來增強方向

  • 基于公司查詢日志微調 SQL 生成器。
  • 為常見問題添加緩存。
  • 在 Streamlit 中集成可視化圖表,動態展示結果。
  • 面向大規模數據集,切換到更大的向量數據庫(如 Pinecone 或 Qdrant)。
  • 添加反饋回路:允許用戶糾正 SQL,并據此改進模型。

這些改進將把你的系統推向“能自我學習”的數據助理,更懂你的演進中的 schema 與業務邏輯。

關鍵收獲

你已經構建了一個強大的系統——一個基于 RAG 的完整 Text-to-SQL 方案,把純英文轉化為可操作的數據庫洞察。回顧你完成的內容:

1. 掌握了用于 Text-to-SQL 的 RAG

你學會了如何用 RAG 彌合自然語言與結構化數據庫 schema 之間的差距,從而顯著提升準確性并降低 SQL 生成的幻覺。

  • 嵌入數據庫 schema 與樣本行,提供上下文。
  • 在生成查詢前先檢索最相關片段。
  • 把檢索與 LLM 推理結合,獲得可靠的 SQL 輸出。

2. 構建了模塊化、可擴展的架構

你使用 LangChain 與 FastAPI,把系統搭成了一個具備生產級結構的 AI 服務:

  • SQLite 作為結構化數據源。
  • Hugging Face Embeddings + Chroma 做語義檢索。
  • LangChain 編排 retriever、prompt 與模型。
  • FastAPI 提供輕量、異步的后端。
  • Streamlit 提供簡潔、友好的前端界面。

各層都模塊化——可替換模型或數據庫,而不破壞其他環節。

3. 部署了交互式聊天界面

你用 Streamlit 把所有能力封裝起來,讓用戶可以:

  • 直接輸入自然語言問題。
  • 查看生成的 SQL。
  • 即時在動態表格中查看結果。

你把原本屬于“開發者專屬”的工作(寫 SQL),變成了“與數據的自然對話”。

最后的想法

這個項目體現了我們與數據交互方式的一次強大轉變:AI 正在讓數據庫訪問民主化,讓任何人都能在不懂 SQL 的情況下提出復雜問題。

通過把檢索增強生成與現代 embeddings 和對話界面結合,我們正在打造把數據直接交到人們手中的工具——讓洞察更快、更容易、更觸手可及。

歡迎交流你的反饋、想法,以及你構建 Text-to-SQL 系統的經驗。

在 GitHub 查看完整代碼并開始動手嘗試:???https://github.com/dharampatel/ConvertQueryToSQL/tree/master??

本文轉載自??AI大模型觀察站??,作者:AI研究生

已于2025-11-5 07:37:22修改
收藏
回復
舉報
回復
相關推薦
成人综合在线观看| 国产日韩1区| 欧美mv日韩mv亚洲| 玩弄中年熟妇正在播放| 国产三级在线| 国产精品911| 欧美最顶级丰满的aⅴ艳星| 蜜桃久久精品成人无码av| 国产精品亚洲一区二区在线观看| 亚洲综合无码一区二区| 日本在线成人一区二区| www.桃色av嫩草.com| 久久久久久婷| 欧美福利在线观看| 亚洲图片第一页| 国产一区二区三区不卡av| 欧美性色黄大片| 久久精品国产sm调教网站演员| 国产高清一区在线观看| 成人免费毛片a| 国产在线久久久| 国产又爽又黄的视频| 91精品蜜臀一区二区三区在线| 精品亚洲一区二区三区| 香蕉视频色在线观看| 色婷婷综合久久久中字幕精品久久 | 午夜免费日韩视频| 成人在线观看小视频| 欧美精品momsxxx| 337p日本欧洲亚洲大胆色噜噜| 亚洲精品第三页| 成人看片网站| 欧美性xxxx极品高清hd直播| 波多野结衣 作品| 天堂а√在线资源在线| 久久精品人人做人人综合| 国产精品久久久久久免费观看| 中文字幕在线网站| 日韩高清一区二区| 国产不卡av在线免费观看| 国产成人亚洲精品自产在线| 国产精品www.| 精品少妇一区二区30p| 殴美一级黄色片| 成人久久一区| 一区二区日韩精品| 精品无码国产污污污免费网站| 牛牛视频精品一区二区不卡| 日韩精品一区二区三区中文精品| 69久久精品无码一区二区| 国产精品视频首页| 日韩亚洲欧美成人一区| 国产又粗又猛大又黄又爽| 欧美三级电影网址| 欧美日韩国产美女| www.久久91| 色综合视频一区二区三区日韩| 欧美日韩在线不卡| 日本黄色福利视频| 91嫩草国产线观看亚洲一区二区| 欧美精品国产精品| 亚洲一级片免费观看| 欧美2区3区4区| 欧美大胆一级视频| 国产精品成人99一区无码| 久久大胆人体视频| 国产视频自拍一区| 蜜乳av中文字幕| 色喇叭免费久久综合| 久久精品国产一区二区电影| 老熟妇高潮一区二区三区| 中国成人一区| 久久99精品视频一区97| 日本视频www| 奶水喷射视频一区| 国产精品永久免费视频| 国产免费不卡av| 成人午夜又粗又硬又大| 开心色怡人综合网站| 国产高清美女一级毛片久久| 国产精品久久久久一区| www.一区二区.com| 亚洲妇女成熟| 欧美日韩高清一区二区不卡| 亚洲精品一二三四| 网友自拍区视频精品| 在线观看久久久久久| 久草综合在线视频| 亚洲免费综合| 91亚洲国产精品| 天天摸夜夜添狠狠添婷婷| 国产午夜精品福利| 91视频成人免费| 中文字幕在线直播| 91精品国产欧美一区二区18| 日本黄色动态图| 欧美oldwomenvideos| 欧美精品成人91久久久久久久| 日韩欧美成人一区二区三区 | 欧美日韩国产中文字幕| 男人天堂成人在线| 午夜日韩影院| 中文字幕一区电影| 国产成人自拍视频在线| 精品一区二区三区的国产在线播放| 国产精品日韩一区二区免费视频| 国产天堂素人系列在线视频| 亚洲一级片在线观看| 美女一区二区三区视频| 国产精品jk白丝蜜臀av小说| 中文字幕亚洲字幕| 国产精品人人人人| 国产成人一级电影| 亚洲国产一区二区三区在线播| 9999精品成人免费毛片在线看| 欧美日韩色一区| 青青草视频播放| 欧美不卡在线| 国产欧美日韩专区发布| 毛片在线播放网址| 亚洲国产精品视频| www.五月天色| 欧美日韩伦理| 久久久免费高清电视剧观看| 97人人爽人人爽人人爽| 国产欧美在线观看一区| 3d动漫一区二区三区| 免费精品一区| 久久久www成人免费精品| 国产成人自拍偷拍| 国产亚洲综合性久久久影院| 欧美精品久久久久久久自慰| 精品一区二区三区中文字幕 | 久久久久久国产精品mv| 国产蜜臀av在线播放| 欧美精品第1页| 天堂网中文在线观看| 日本不卡免费在线视频| 欧美一级爱爱| 欧美第一视频| 亚洲欧美在线一区| 久久中文字幕免费| 久久综合一区二区| 韩国日本在线视频| 久久成人av| 国产97在线视频| 噜噜噜在线观看播放视频| 日韩欧美国产免费播放| 熟女高潮一区二区三区| 国产精品婷婷| 欧美在线一二三区| 91精品影视| 日韩中文字幕在线播放| 亚洲国产无线乱码在线观看| 国产精品无圣光一区二区| 天天爱天天操天天干| 波多野结衣一区| 成人黄色片在线| 91麻豆一二三四在线| 欧美变态凌虐bdsm| 中文字幕亚洲高清| 国产亚洲美州欧州综合国| 欧美精品成人网| 999国产精品视频| 99国产超薄丝袜足j在线观看| 欧美人与牲禽动交com| 精品福利视频一区二区三区| 日本网站免费观看| 国产婷婷精品av在线| 视色视频在线观看| 欧美99在线视频观看| 国产精品一区二区三区免费| 天堂中文av在线资源库| 在线视频日韩精品| 精品国产无码AV| 亚洲成人免费看| 免费看污片网站| 国内成人精品2018免费看| 日本中文字幕在线视频观看| 日韩av影院| 国产欧洲精品视频| 波多野结衣在线播放| 精品视频在线观看日韩| 91精品国自产| 亚洲国产一二三| 女人十八毛片嫩草av| 国产毛片精品视频| 人妻有码中文字幕| 亚洲91久久| 欧美日韩在线高清| 欧美黄色一级| 国产精品扒开腿做爽爽爽男男| 麻豆系列在线观看| 亚洲精品网址在线观看| 国产片高清在线观看| 欧美性猛交xxxx黑人| 四虎精品免费视频| 久久亚洲精精品中文字幕早川悠里 | 极品少妇一区二区三区精品视频| 欧美精品自拍视频| 国产国产精品| 欧美重口乱码一区二区| 亚洲精品观看| 成人春色激情网| 是的av在线| 欧美黑人巨大精品一区二区| 97最新国自产拍视频在线完整在线看| 欧美成人精品福利| 国产一区二区三区中文字幕 | 91欧美日韩麻豆精品| 天天影视色香欲综合网老头| 国产免费一区二区三区四区| 久久婷婷色综合| 亚洲成a人无码| 国模娜娜一区二区三区| 日韩一级理论片| 一本色道久久| 99在线观看视频免费| 午夜影院欧美| 视频一区国产精品| 久久99蜜桃| 久久99影院| 久久视频在线观看| 国产精品久久久久免费| 日本免费精品| 91丝袜美腿美女视频网站| 国产亚洲精彩久久| 国产精品久久久久77777| 国产美女高潮在线观看| 欧美国产极速在线| 高清免费电影在线观看| 日韩最新在线视频| 成人三级黄色免费网站| 亚洲午夜性刺激影院| 九色视频在线播放| 亚洲欧美在线免费观看| 男人av在线| 亚洲人免费视频| 国产三级在线看| 亚洲网站在线观看| www.av在线播放| 在线精品91av| 看黄网站在线| 久热精品视频在线| 超碰在线网址| 九色精品美女在线| 日韩激情av| 国内精品在线一区| 久久青草伊人| 日本老师69xxx| 色天使综合视频| 国产精品网站入口| 成人国产精品久久| 春色成人在线视频| 成人中文字幕视频| 久久精品99久久| 精品免费av| 制服诱惑一区| 亚洲欧美综合| 国产成人无码精品久久久性色| 国产女优一区| 黄色免费网址大全| 韩国精品免费视频| 丰满人妻一区二区三区免费视频棣| 成人在线视频一区| 黄瓜视频污在线观看| 国产欧美久久久精品影院| 免费高清在线观看电视| 亚洲国产日韩在线一区模特| 日韩欧美国产亚洲| 在线观看日韩电影| av中文字幕第一页| 日韩精品久久久久久福利| 国产系列在线观看| 欧美日韩电影在线观看| 69久成人做爰电影| 成人国产精品久久久久久亚洲| 中文字幕视频精品一区二区三区| 久久精品国产99精品国产亚洲性色| 国产一区二区三区四区大秀| 天天干天天操天天干天天操| 亚洲国产影院| 色戒在线免费观看| 不卡影院免费观看| 成人在线手机视频| 洋洋av久久久久久久一区| 亚洲天堂男人av| 91精品福利在线一区二区三区| 全国男人的天堂网| 日韩中文av在线| 女海盗2成人h版中文字幕| 国产精品视频在线观看| 红杏成人性视频免费看| 偷拍视频一区二区| 亚洲成人原创| 精品国产乱码久久久久久1区二区| 91在线视频网址| 少妇被躁爽到高潮无码文| 福利一区福利二区微拍刺激| 国产精品久久久久精| 精品视频在线播放免| 蜜臀av在线| 国产在线观看精品| 亚洲精品国模| 青青青在线观看视频| 日韩二区三区四区| 超碰男人的天堂| 一区二区三区四区乱视频| 国产第一页在线观看| 亚洲第一网中文字幕| 国产剧情在线| 国产精品激情av电影在线观看 | 亚洲精品福利视频| √天堂8在线网| 国产日韩一区在线| 经典一区二区| 日本a级片免费观看| 成人性生交大片免费看中文网站| 岛国片在线免费观看| 色94色欧美sute亚洲13| 天天干在线观看| 午夜精品一区二区三区视频免费看| 伊人久久一区| 亚洲欧美日韩国产yyy| 久久久久欧美精品| 蜜桃精品成人影片| 午夜精品成人在线| 亚洲男女视频在线观看| 欧美超级乱淫片喷水| 伊人久久一区| 一区二区三区一级片| 美国欧美日韩国产在线播放| 国产一区二区三区四区五区六区| 欧美日韩中文字幕日韩欧美| 日韩中文字幕综合| 久久久久国产精品免费网站| 日韩成人久久| 996这里只有精品| 国产精品1024久久| 国产精品成人免费观看| 欧美一区二区三区在线电影| 黄色网址在线免费播放| 91精品国产综合久久香蕉| 四虎国产精品免费观看| 三级一区二区三区| 综合电影一区二区三区| av av片在线看| 欧美精品一二区| 2021年精品国产福利在线| www.xxx麻豆| 91女神在线视频| 亚洲GV成人无码久久精品| 亚洲视频在线观看免费| 亚洲伦理影院| 亚洲一区二区三区免费看| 久久66热re国产| 欧美成人综合色| 日韩黄色av网站| 欧美韩国亚洲| 一区二区三区av在线| 国产一区在线精品| 九九热国产精品视频| 亚洲白虎美女被爆操| 中文字幕在线官网| 亚洲第一导航| 国产麻豆精品在线| 国产无码精品在线播放| 亚洲欧美日韩中文在线制服| 成人国产一区| 日本一本草久p| 97久久超碰精品国产| 成人小视频在线播放| 日韩在线国产精品| 成人爽a毛片| 88av.com| 夜夜夜精品看看| 国产大学生校花援交在线播放| 成人性教育视频在线观看| 激情综合亚洲| 亚洲ⅴ国产v天堂a无码二区| 欧美一级高清片| 久久毛片亚洲| a级片一区二区| 久久久精品tv| www.好吊色| 日本一区二区不卡| 在线成人直播| www亚洲色图| 精品国产乱码久久| 欧美性www| 欧美日韩亚洲一| 亚洲天堂成人网| 日韩porn| 97超级碰碰| 青青青爽久久午夜综合久久午夜| 久草视频免费播放| 中文字幕在线日韩| 秋霞影院一区二区三区| 日韩欧美理论片| 日韩欧美国产网站|