Python + FAISS: Build a RAG System in Five Minutes
Here's the problem I ran into: dozens (okay, actually hundreds) of PDFs, including research papers, API docs, and white papers, scattered across folders. Searching them was painfully slow, and browsing was worse. So I built a PDF Q&A engine: it ingests the files, chunks them, embeds them, indexes them with FAISS, finds the best passages, and writes a concise answer (with a fallback that needs no API). This post gives you the whole thing, end-to-end code explained in plain language.
What you get
- Local PDF loading (no cloud)
- Smarter chunking (preserves context)
- Embeddings with sentence-transformers
- Vector search with FAISS (cosine) + SQLite for metadata
- Retriever → Answerer with an optional LLM (extractive summary fallback)
- A clean single-page app with Gradio
Keywords (so this post is easier to find): AI document search, PDF search, vector database, FAISS, embeddings, Sentence Transformers, RAG, Gradio, OpenAI (optional).
Project structure (copy these straight into files)
pdfqa/
  settings.py
  loader.py
  chunker.py
  embedder.py
  store.py
  searcher.py
  answerer.py
  app.py
  build_index.py
  requirements.txt
0) Requirements
# requirements.txt
pdfplumber>=0.11.0
sentence-transformers>=3.0.1
faiss-cpu>=1.8.0
numpy>=1.26.4
scikit-learn>=1.5.1
tqdm>=4.66.4
gradio>=4.40.0
python-dotenv>=1.0.1
nltk>=3.9.1
rank-bm25>=0.2.2
openai>=1.40.0  # optional; everything runs without an API key
Install the dependencies and prepare NLTK (one-time step):
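Assuming a standard pip workflow (a virtualenv is up to you), the install itself is just:
pip install -r requirements.txt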
python -c "import nltk; nltk.download('punkt_tab')" || python -c "import nltk; nltk.download('punkt')"
1) Settings
# settings.py
from pathlib import Path
from dataclasses import dataclass
@dataclass(frozen=True)
class Config:
    PDF_DIR: Path = Path("./pdfs")             # put your PDFs here
    DB_PATH: Path = Path("./chunks.sqlite")    # SQLite store for chunk metadata
    INDEX_PATH: Path = Path("./index.faiss")   # FAISS index file
    MODEL_NAME: str = "sentence-transformers/all-MiniLM-L12-v2"
    CHUNK_SIZE: int = 1000                     # target characters per chunk
    CHUNK_OVERLAP: int = 200
    TOP_K: int = 4                             # number of passages to retrieve
    MAX_ANSWER_TOKENS: int = 500               # for the LLM

CFG = Config()
2) Load PDFs (local, fast)
# loader.py
import pdfplumber
from pathlib import Path
from typing import List, Dict
from settings import CFG

def load_pdfs(pdf_dir: Path = CFG.PDF_DIR) -> List[Dict]:
    pdf_dir.mkdir(parents=True, exist_ok=True)
    docs = []
    for pdf_path in sorted(pdf_dir.glob("*.pdf")):
        text_parts = []
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                # more robust than a plain get_text; tweak as needed
                text_parts.append(page.extract_text() or "")
        text = "\n".join(text_parts).strip()
        if text:
            docs.append({"filename": pdf_path.name, "text": text})
            print(f"Loaded {pdf_path.name} ({len(text)} chars)")
        else:
            print(f"Empty or not extractable: {pdf_path.name}")
    return docs

if __name__ == "__main__":
    load_pdfs()
3) Chunking that keeps context (paragraph-aware)
# chunker.py
from typing import List, Dict
from settings import CFG

def _paragraphs(txt: str) -> List[str]:
    # split on blank lines; keeps the structure lightweight
    blocks = [b.strip() for b in txt.split("\n\n") if b.strip()]
    return blocks or [txt]

def chunk_document(doc: Dict, size: int = CFG.CHUNK_SIZE, overlap: int = CFG.CHUNK_OVERLAP) -> List[Dict]:
    paras = _paragraphs(doc["text"])
    chunks = []
    buf, start_char = [], 0
    cur_len = 0
    for p in paras:
        if cur_len + len(p) + 1 <= size:
            buf.append(p)
            cur_len += len(p) + 1
            continue
        # flush the buffer
        block = "\n\n".join(buf).strip()
        if block:
            chunks.append({
                "filename": doc["filename"],
                "start": start_char,
                "end": start_char + len(block),
                "text": block
            })
        # carry an overlap from the tail of the previous block
        tail = block[-overlap:] if overlap > 0 and len(block) > overlap else ""
        buf = [tail, p] if tail else [p]
        start_char += max(0, len(block) - overlap)
        cur_len = len("\n\n".join(buf))
    # final chunk
    block = "\n\n".join(buf).strip()
    if block:
        chunks.append({
            "filename": doc["filename"],
            "start": start_char,
            "end": start_char + len(block),
            "text": block
        })
    return chunks

def chunk_all(docs: List[Dict]) -> List[Dict]:
    out = []
    for d in docs:
        out.extend(chunk_document(d))
    print(f"Created {len(out)} chunks")
    return out

if __name__ == "__main__":
    from loader import load_pdfs
    all_chunks = chunk_all(load_pdfs())
4) Embeddings (cosine-ready)
# embedder.py
import numpy as np
from typing import List, Dict
from sentence_transformers import SentenceTransformer
from settings import CFG
from tqdm import tqdm
from sklearn.preprocessing import normalize

_model = None

def get_model() -> SentenceTransformer:
    global _model
    if _model is None:
        _model = SentenceTransformer(CFG.MODEL_NAME)
    return _model

def embed_texts(chunks: List[Dict]) -> np.ndarray:
    model = get_model()
    texts = [c["text"] for c in chunks]
    # encode, then L2-normalize so inner product == cosine similarity
    vecs = model.encode(texts, show_progress_bar=True, convert_to_numpy=True)
    return normalize(vecs)  # important for IndexFlatIP
5) Store vectors + metadata (FAISS + SQLite)
# store.py
import sqlite3
import faiss
import numpy as np
from typing import List, Dict
from settings import CFG

def init_db():
    con = sqlite3.connect(CFG.DB_PATH)
    cur = con.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS chunks (
            id INTEGER PRIMARY KEY,
            filename TEXT,
            start INTEGER,
            end INTEGER,
            text TEXT
        )
    """)
    con.commit()
    con.close()

def save_chunks(chunks: List[Dict]):
    con = sqlite3.connect(CFG.DB_PATH)
    cur = con.cursor()
    cur.execute("DELETE FROM chunks")
    cur.executemany(
        "INSERT INTO chunks (filename, start, end, text) VALUES (?,?,?,?)",
        [(c["filename"], c["start"], c["end"], c["text"]) for c in chunks]
    )
    con.commit()
    con.close()

def build_faiss_index(vecs: np.ndarray):
    dim = vecs.shape[1]
    index = faiss.IndexFlatIP(dim)  # cosine, because the vectors are L2-normalized
    index.add(vecs.astype(np.float32))
    faiss.write_index(index, str(CFG.INDEX_PATH))
    print(f"FAISS index saved to {CFG.INDEX_PATH}")

def read_faiss_index() -> faiss.Index:
    return faiss.read_index(str(CFG.INDEX_PATH))

def get_chunk_by_ids(ids: List[int]) -> List[Dict]:
    con = sqlite3.connect(CFG.DB_PATH)
    cur = con.cursor()
    rows = []
    for i in ids:
        cur.execute("SELECT id, filename, start, end, text FROM chunks WHERE id=?", (i + 1,))
        r = cur.fetchone()
        if r:
            rows.append({
                "id": r[0] - 1, "filename": r[1], "start": r[2], "end": r[3], "text": r[4]
            })
    con.close()
    return rows
Note: SQLite row IDs start at 1, while FAISS vectors are indexed from 0. Chunks are stored in insertion order, so a lookup uses (faiss_id + 1).
6) Search (embed the query → nearest neighbors)
# searcher.py
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import normalize
from settings import CFG
from store import read_faiss_index, get_chunk_by_ids
_qmodel = None
def _qembed(q: str) -> np.ndarray:
    global _qmodel
    if _qmodel is None:
        _qmodel = SentenceTransformer(CFG.MODEL_NAME)
    qv = _qmodel.encode([q], convert_to_numpy=True)
    return normalize(qv)  # cosine, consistent with the corpus

def search(query: str, k: int = CFG.TOP_K):
    index: faiss.Index = read_faiss_index()
    qv = _qembed(query)
    D, I = index.search(qv.astype(np.float32), k)
    ids = I[0].tolist()
    return get_chunk_by_ids(ids)
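Once the index is built, a one-liner like this (the query is only an example) is a quick way to eyeball what the retriever returns before wiring in the answerer:
python -c "from searcher import search; [print(r['filename'], r['text'][:80]) for r in search('attention mechanism')]"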
7) Answer generation (optional LLM + extractive fallback)
# answerer.py
import os, re
from typing import List, Dict
from rank_bm25 import BM25Okapi

SYSTEM_PROMPT = (
    "Answer only from the provided context.\n"
    "Cite passages as [1], [2], ... in snippet order.\n"
    "If the information is insufficient, say so briefly.\n"
)

def _try_openai(question: str, snippets: List[str]) -> str:
    try:
        from openai import OpenAI
        client = OpenAI()  # requires the OPENAI_API_KEY environment variable
        ctx = "\n\n".join(f"[{i+1}] {s}" for i, s in enumerate(snippets))
        prompt = f"{SYSTEM_PROMPT}\nContext:\n{ctx}\n\nQuestion: {question}\nAnswer:"
        resp = client.chat.completions.create(
            model=os.getenv("LLM_MODEL", "gpt-4o-mini"),
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,
            max_tokens=500
        )
        return resp.choices[0].message.content
    except Exception:
        return ""

def _extractive_fallback(question: str, snippets: List[str]) -> str:
    # score sentences from the snippets with BM25 and stitch them into a short summary
    sents, source_ids = [], []
    for i, s in enumerate(snippets):
        for sent in re.split(r"(?<=[.!?])\s+", s):
            if sent.strip():
                sents.append(sent.strip())
                source_ids.append(i)
    tokenized = [st.lower().split() for st in sents]
    bm25 = BM25Okapi(tokenized)
    scores = bm25.get_scores(question.lower().split())
    ranked = sorted(zip(sents, source_ids, scores), key=lambda x: x[2], reverse=True)[:6]
    stitched = []
    used_sources = set()
    for sent, sid, _ in ranked:
        stitched.append(sent + f" [{sid+1}]")
        used_sources.add(sid + 1)
    return " ".join(stitched) or "I don't have enough information to answer that."

def answer(question: str, passages: List[Dict]) -> str:
    snippets = [p["text"] for p in passages]
    ans = _try_openai(question, snippets)
    return ans if ans.strip() else _extractive_fallback(question, snippets)
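The LLM path is optional; if you want it, export the key (the value below is a placeholder) and, if you like, override the model name that answerer.py reads from LLM_MODEL:
export OPENAI_API_KEY="sk-..."     # placeholder; needed only for the LLM path
export LLM_MODEL="gpt-4o-mini"     # optional; this is already the default in answerer.py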
8) Build script (one-time indexing)
# build_index.py
from loader import load_pdfs
from chunker import chunk_all
from embedder import embed_texts
from store import init_db, save_chunks, build_faiss_index
if __name__ == "__main__":
    docs = load_pdfs()
    chunks = chunk_all(docs)
    init_db()
    save_chunks(chunks)
    vecs = embed_texts(chunks)
    build_faiss_index(vecs)
    print("Indexing done! You can start asking questions.")
Run it:
python build_index.py
9) A clean web app (Gradio)
# app.py
import gradio as gr
from searcher import search
from answerer import answer
def ask(query: str):
    if not query.strip():
        return "Type a question to get started.", ""
    results = search(query, k=4)
    ctx = "\n\n---\n\n".join([r["text"] for r in results])
    ans = answer(query, results)
    cites = "\n".join(f"[{i+1}] {r['filename']} ({r['start']}–{r['end']})" for i, r in enumerate(results))
    return ans, cites

with gr.Blocks(title="PDF Q&A") as demo:
    gr.Markdown("## PDF Q&A: ask anything about your documents")
    inp = gr.Textbox(label="Your question", placeholder="e.g. explain attention in transformers")
    btn = gr.Button("Search & answer")
    out = gr.Markdown(label="Answer")
    refs = gr.Markdown(label="Citations")
    btn.click(fn=ask, inputs=inp, outputs=[out, refs])

if __name__ == "__main__":
    demo.launch()
Run it:
python app.py
Lessons I learned (so you don't repeat my mistakes)
- Retrieval quality = answer quality. An accurate top-k matters more than a fancy prompt.
- Chunking is a balancing act. Paragraph-aware merging with a small overlap stays readable and keeps context.
- Cosine + normalization matters. L2-normalize the embeddings and use IndexFlatIP so FAISS gives you true cosine scores (see the short check after this list).
- Optional LLM, mandatory fallback. Don't make the tool depend on an API key; the extractive path with BM25 sentence ranking works surprisingly well.
- Metadata saves time. Store (filename, start, end) and you can deep-link or show citations instantly.
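To see why the normalization matters, here is a tiny standalone check on random vectors (not part of the project's files): with L2-normalized rows, inner product equals cosine similarity, so IndexFlatIP returns exact cosine scores.
import numpy as np
import faiss
from sklearn.preprocessing import normalize

rng = np.random.default_rng(0)
vecs = normalize(rng.normal(size=(100, 384))).astype(np.float32)  # unit-length rows
q = normalize(rng.normal(size=(1, 384))).astype(np.float32)

index = faiss.IndexFlatIP(vecs.shape[1])
index.add(vecs)
D, I = index.search(q, 5)

cos = (vecs @ q.T).ravel()                  # cosine similarity, since both sides are unit length
top5 = np.sort(cos)[::-1][:5]
assert np.allclose(D[0], top5, atol=1e-4)   # IndexFlatIP scores match the cosine scores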
Next upgrades
- Semantic chunking (heading- and TOC-aware)
- Rerankers (Cohere Rerank or a BGE cross-encoder) to polish the final list (a sketch follows this list)
- Conversation memory for follow-up questions
- Persistence in a vector DB (Weaviate, Qdrant, etc.)
- Server deployment (Docker + a small FastAPI wrapper)
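As a sketch of the reranking idea: over-retrieve with FAISS, then let a cross-encoder re-score the query/passage pairs. The helper name rerank, the pool size, and the checkpoint cross-encoder/ms-marco-MiniLM-L-6-v2 are illustrative choices, not part of the code above.
from sentence_transformers import CrossEncoder
from searcher import search

def rerank(query: str, k: int = 4, pool: int = 20):
    candidates = search(query, k=pool)  # over-retrieve with FAISS first
    ce = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
    scores = ce.predict([(query, c["text"]) for c in candidates])  # one score per (query, passage) pair
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    return [c for c, _ in ranked[:k]]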
A small FAQ
Can it run fully offline? Yes. Embeddings, FAISS, and the extractive fallback are all local; the LLM is optional.
Can it handle thousands of chunks? Yes. FAISS scales well on CPU. For much larger corpora, switch to an IVF or HNSW index (see the sketch after this FAQ).
Why Gradio instead of Streamlit? Gradio is lightweight and quick to wire up. Use whatever you prefer; porting is easy.
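If you do outgrow IndexFlatIP, a variant of build_faiss_index along these lines is the usual next step; build_ivf_index and the nlist/nprobe values here are illustrative, not part of the article's files.
import faiss
import numpy as np

def build_ivf_index(vecs: np.ndarray, nlist: int = 256) -> faiss.Index:
    dim = vecs.shape[1]
    quantizer = faiss.IndexFlatIP(dim)  # coarse quantizer used for clustering
    index = faiss.IndexIVFFlat(quantizer, dim, nlist, faiss.METRIC_INNER_PRODUCT)
    index.train(vecs.astype(np.float32))  # IVF needs a training pass over the vectors
    index.add(vecs.astype(np.float32))
    index.nprobe = 16                     # clusters probed per query (speed/recall knob)
    return index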
A few useful official docs
- Sentence Transformers: https://www.sbert.net/
- FAISS: https://github.com/facebookresearch/faiss
- Gradio: https://www.gradio.app/
This article is reproduced from PyTorch研習社; author: AI研究生.