MUVERA: Making Multi-Vector Retrieval in RAG Systems as Efficient as Single-Vector Search
In vector databases and information retrieval, multi-vector embedding models such as ColBERT and ColPali are becoming a mainstream choice thanks to their strong semantic expressiveness. These models preserve token-level meaning in text, or capture the distinctive features of different regions of an image. They also bring significant performance challenges, however: a large memory footprint and slower retrieval. The MUVERA encoding algorithm, introduced in Weaviate 1.31, was designed to solve exactly these problems.
Strengths and Struggles of Multi-Vector Models
The core strength of multi-vector embeddings is fine-grained semantic representation. Where a single-vector model compresses an entire document into one fixed-length vector, a multi-vector model produces an independent vector for every token or image patch. This design captures much richer semantic information and delivers higher accuracy in retrieval tasks.
Single-vector vs. multi-vector comparison
But this finer-grained representation has an equally clear cost. Suppose we index one million documents averaging 100 tokens each. A traditional single-vector model (768 dimensions, 32-bit floats) needs about 3.1 GB of memory, while a multi-vector model (96 dimensions per token) can consume as much as 40 GB, more than a tenfold difference. At large deployment scale, that memory pressure turns into a very real cost burden.
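The arithmetic behind these figures is easy to check (the 100-tokens-per-document average is the article's working assumption):

```python
# Memory footprint estimate: 1M documents, float32 (4 bytes per value).
docs = 1_000_000
tokens_per_doc = 100          # average token count assumed in the text

single_vector = docs * 768 * 4                  # one 768-dim vector per document
multi_vector = docs * tokens_per_doc * 96 * 4   # one 96-dim vector per token

print(f"single-vector: {single_vector / 1e9:.1f} GB")  # 3.1 GB
print(f"multi-vector:  {multi_vector / 1e9:.1f} GB")   # 38.4 GB
print(f"ratio: {multi_vector / single_vector:.1f}x")   # 12.5x
```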
Multi-vector embedding memory comparison
The bottleneck is not limited to storage. At query time, multi-vector models score similarity with the MaxSim operator: for each query token, find its best match among all document tokens, then sum the matching scores. Mathematically:

$$\mathrm{MaxSim}(Q, D) = \sum_{q \in Q} \max_{d \in D} \langle q, d \rangle$$
This non-linear computation is far more expensive than a simple dot product, and it directly affects both query latency and data-import throughput.
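A direct NumPy rendering of MaxSim makes the cost visible: scoring one query-document pair requires the full matrix of pairwise token dot products (function and variable names here are illustrative):

```python
import numpy as np

def max_sim(query_vecs: np.ndarray, doc_vecs: np.ndarray) -> float:
    """MaxSim: for each query token, take its best dot product over all
    document tokens, then sum the per-token maxima."""
    # (num_query_tokens, num_doc_tokens) matrix of all pairwise dot products
    scores = query_vecs @ doc_vecs.T
    return float(scores.max(axis=1).sum())

rng = np.random.default_rng(0)
q = rng.normal(size=(32, 96)).astype(np.float32)   # 32 query token vectors
d = rng.normal(size=(100, 96)).astype(np.float32)  # 100 document token vectors
print(max_sim(q, d))  # one pair already costs a 32x100 score matrix
```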
Single-vector and multi-vector memory usage
The Core Idea of MUVERA
The design philosophy of MUVERA (Multi-Vector Retrieval via Fixed Dimensional Encodings) is to turn the complex multi-vector retrieval problem into single-vector maximum inner product search. The key is constructing a Fixed Dimensional Encoding (FDE): compressing a variable-length set of vectors into a single fixed-length vector representation.
The whole transformation can be written as a single, concise mapping that sends a variable-size vector set to one fixed-length vector:

$$\mathbf{F}: \{x_1, \ldots, x_m\} \subset \mathbb{R}^d \;\longmapsto\; \mathbf{F}(\{x_i\}) \in \mathbb{R}^{d_{\mathrm{FDE}}}$$

The central goal is for the dot product of the encoded single vectors to closely approximate the original multi-vector MaxSim similarity:

$$\langle \mathbf{F}_q(Q), \mathbf{F}_{\mathrm{doc}}(D) \rangle \approx \mathrm{MaxSim}(Q, D)$$
MUVERA high-level overview
The efficiency gain from this transformation is substantial. For a dataset of one million documents with 100 vectors each, a conventional approach must index 100 million vectors, while MUVERA only handles one million FDE vectors, shrinking the HNSW graph to 1% of its original size.
Algorithm Details
MUVERA performs its encoding in four carefully designed steps: space partitioning, dimensionality reduction, repetition, and a final projection. Each step has a clear mathematical basis and practical rationale.
Space partitioning strategy
The first step partitions the high-dimensional vector space into buckets. The algorithm does this with SimHash, a technique based on locality-sensitive hashing. Concretely, it samples $k_{\mathrm{sim}}$ Gaussian vectors $g_1, \ldots, g_{k_{\mathrm{sim}}}$ and derives each input vector's bucket index from the signs of its dot products with them:

$$\varphi(x) = \big(\mathbf{1}[\langle g_1, x \rangle > 0], \ldots, \mathbf{1}[\langle g_{k_{\mathrm{sim}}}, x \rangle > 0]\big) \in \{0, 1\}^{k_{\mathrm{sim}}}$$

yielding $2^{k_{\mathrm{sim}}}$ buckets in total.
The advantage of this partitioning is that it is independent of the data distribution: it needs no pre-training and does not degrade under data drift. Once partitioning is done, the vectors that fall into the same bucket are aggregated into a single representative vector.
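The partition-and-aggregate step can be sketched in a few lines of NumPy: sample k_sim Gaussian directions, take the sign pattern of each token vector's projections as its bucket id, and sum the vectors landing in the same bucket. This is a simplified sketch; the reference implementation below additionally routes the sign bits through a Gray code.

```python
import numpy as np

def bucket_and_aggregate(vectors: np.ndarray, k_sim: int, seed: int = 42):
    """Assign each vector to one of 2**k_sim SimHash buckets and sum per bucket."""
    dim = vectors.shape[1]
    rng = np.random.default_rng(seed)
    gaussians = rng.normal(size=(dim, k_sim)).astype(np.float32)
    bits = (vectors @ gaussians > 0).astype(np.uint32)  # sign pattern per vector
    # Interpret the k_sim sign bits as a bucket index in [0, 2**k_sim).
    bucket_ids = (bits << np.arange(k_sim - 1, -1, -1, dtype=np.uint32)).sum(axis=1)
    buckets = np.zeros((2**k_sim, dim), dtype=np.float32)
    np.add.at(buckets, bucket_ids, vectors)  # aggregate vectors per bucket
    return bucket_ids, buckets

rng = np.random.default_rng(0)
tokens = rng.normal(size=(100, 96)).astype(np.float32)
ids, buckets = bucket_and_aggregate(tokens, k_sim=4)
print(ids.shape, buckets.shape)  # (100,) (16, 96)
```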
MUVERA step 1 - space partitioning
MUVERA step 2 - filling empty clusters

Dimensionality reduction and repetition
After bucketing, each bucket's aggregated vector is reduced to d_proj dimensions with a random projection, and the whole partition-aggregate-project procedure is repeated r_reps times with independent random seeds; the per-repetition results are concatenated into the final fixed-dimensional encoding.

MUVERA step 3 - dimensionality reduction

Benchmarks and Real-World Results
The Weaviate team ran a detailed performance evaluation on the LoTTE benchmark dataset, which contains roughly 119,000 documents; encoded with ColBERT v2.0 these produce 15 million 128-dimensional vectors occupying about 8 GB of memory.

Heap allocation without and with MUVERA + SQ
The improvement in import speed is equally striking. In the baseline scenario, importing 110,000 objects takes more than 20 minutes, only about 100 objects per second; with MUVERA, that drops to 3 to 6 minutes. For production environments that update their indexes frequently, this efficiency gain matters a great deal.
Performance Trade-offs
No technical solution is perfect, and MUVERA has its price, chiefly in recall. Test data show that under identical search parameters, enabling MUVERA lowers recall. The drop can, however, be mitigated by raising the HNSW ef parameter.
With ef set to 512 or above, recall recovers to over 80%; at 2048 it can even exceed 90%. But a higher ef means examining a larger candidate set, which reduces query throughput. In practice you must strike a balance between recall quality and query speed.
MUVERA comparison
Experiments from the Google Research team further validate MUVERA's effectiveness. On the BEIR benchmark, compared with PLAID, a system built on single-vector heuristics, MUVERA improved recall by an average of 10% while cutting latency by 90%. The value of such gains in large-scale deployments speaks for itself.
When Does MUVERA Fit?
MUVERA is not a universal solution; it suits the following kinds of workloads best. First, large-scale deployments where memory cost matters: once a dataset reaches tens or hundreds of millions of items, the reduced memory footprint translates directly into savings of tens or even hundreds of thousands of dollars a year. Second, scenarios with demanding indexing speed, such as real-time systems that update frequently.
Another key consideration is tolerance for recall loss. If an application demands the utmost retrieval precision, the recall drop MUVERA introduces needs careful weighing. For many practical applications, though, a slight recall loss is acceptable, especially since much of it can be recovered by tuning the search parameters.
From an implementation standpoint, Weaviate's integration makes enabling MUVERA straightforward: just a few lines of configuration. The main tunable parameters are k_sim (the granularity of space partitioning), d_proj (the reduced dimensionality), and r_reps (the number of repetitions). The Weaviate team provides sensible defaults that work for most scenarios.
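These three parameters jointly determine the size of the encoded vector: the FDE has r_reps × 2^k_sim × d_proj dimensions, so modest-looking settings multiply quickly. A quick check, using illustrative values rather than Weaviate's documented defaults:

```python
def fde_dimension(k_sim: int, d_proj: int, r_reps: int) -> int:
    """Final FDE length: repetitions x number of buckets x reduced dimension."""
    return r_reps * (2 ** k_sim) * d_proj

# e.g. 4 SimHash projections -> 16 buckets, 16-dim projections, 20 repetitions
print(fde_dimension(k_sim=4, d_proj=16, r_reps=20))  # 5120
```

For comparison, the Python implementation shown later defaults to 6 SimHash projections, 10 repetitions, and an identity projection over 128-dim vectors, which yields 10 × 64 × 128 = 81,920 dimensions per FDE.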
Notably, MUVERA's fixed-dimensional encodings can be compressed further with techniques such as scalar quantization (SQ). Google's research shows that product quantization can shrink memory by another 32x with almost no loss in retrieval quality, leaving plenty of optimization headroom for very large deployments.
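Scalar quantization itself is simple to illustrate: each float32 component is mapped to an 8-bit integer over the observed value range, a 4x saving before any of the further compression mentioned above. This is a minimal sketch, not Weaviate's actual SQ implementation:

```python
import numpy as np

def scalar_quantize(x: np.ndarray):
    """Map float32 values onto 256 uint8 levels spanning [min, max]."""
    lo, hi = float(x.min()), float(x.max())
    scale = (hi - lo) / 255.0 or 1.0          # guard against constant input
    codes = np.round((x - lo) / scale).astype(np.uint8)
    return codes, lo, scale

def scalar_dequantize(codes: np.ndarray, lo: float, scale: float) -> np.ndarray:
    return codes.astype(np.float32) * scale + lo

rng = np.random.default_rng(0)
fde = rng.normal(size=10240).astype(np.float32)   # a mock FDE vector
codes, lo, scale = scalar_quantize(fde)
restored = scalar_dequantize(codes, lo, scale)
print(codes.nbytes / fde.nbytes)                  # 0.25: 4x smaller
print(float(np.abs(fde - restored).max()))        # small reconstruction error
```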
Implementation
https://github.com/sionic-ai/muvera-py/tree/master
This is a Python implementation of MUVERA I found on GitHub; feel free to try it out.
import logging
import time
import numpy as np
from dataclasses import dataclass, replace
from enum import Enum
from typing import Optional, List
class EncodingType(Enum):
    DEFAULT_SUM = 0
    AVERAGE = 1


class ProjectionType(Enum):
    DEFAULT_IDENTITY = 0
    AMS_SKETCH = 1


@dataclass
class FixedDimensionalEncodingConfig:
    dimension: int = 128
    num_repetitions: int = 10
    num_simhash_projections: int = 6
    seed: int = 42
    encoding_type: EncodingType = EncodingType.DEFAULT_SUM
    projection_type: ProjectionType = ProjectionType.DEFAULT_IDENTITY
    projection_dimension: Optional[int] = None
    fill_empty_partitions: bool = False
    final_projection_dimension: Optional[int] = None


def _append_to_gray_code(gray_code: int, bit: bool) -> int:
    return (gray_code << 1) + (int(bit) ^ (gray_code & 1))


def _gray_code_to_binary(num: int) -> int:
    mask = num >> 1
    while mask != 0:
        num = num ^ mask
        mask >>= 1
    return num
def _simhash_matrix_from_seed(
    dimension: int, num_projections: int, seed: int
) -> np.ndarray:
    rng = np.random.default_rng(seed)
    return rng.normal(loc=0.0, scale=1.0, size=(dimension, num_projections)).astype(
        np.float32
    )


def _ams_projection_matrix_from_seed(
    dimension: int, projection_dim: int, seed: int
) -> np.ndarray:
    rng = np.random.default_rng(seed)
    out = np.zeros((dimension, projection_dim), dtype=np.float32)
    indices = rng.integers(0, projection_dim, size=dimension)
    signs = rng.choice([-1.0, 1.0], size=dimension)
    out[np.arange(dimension), indices] = signs
    return out


def _apply_count_sketch_to_vector(
    input_vector: np.ndarray, final_dimension: int, seed: int
) -> np.ndarray:
    rng = np.random.default_rng(seed)
    out = np.zeros(final_dimension, dtype=np.float32)
    indices = rng.integers(0, final_dimension, size=input_vector.shape[0])
    signs = rng.choice([-1.0, 1.0], size=input_vector.shape[0])
    np.add.at(out, indices, signs * input_vector)
    return out


def _simhash_partition_index_gray(sketch_vector: np.ndarray) -> int:
    partition_index = 0
    for val in sketch_vector:
        partition_index = _append_to_gray_code(partition_index, val > 0)
    return partition_index


def _distance_to_simhash_partition(
    sketch_vector: np.ndarray, partition_index: int
) -> int:
    num_projections = sketch_vector.size
    binary_representation = _gray_code_to_binary(partition_index)
    sketch_bits = (sketch_vector > 0).astype(int)
    binary_array = (binary_representation >> np.arange(num_projections - 1, -1, -1)) & 1
    return int(np.sum(sketch_bits != binary_array))
def _generate_fde_internal(
    point_cloud: np.ndarray, config: FixedDimensionalEncodingConfig
) -> np.ndarray:
    if point_cloud.ndim != 2 or point_cloud.shape[1] != config.dimension:
        raise ValueError(
            f"Input data shape {point_cloud.shape} is inconsistent with config dimension {config.dimension}."
        )
    if not (0 <= config.num_simhash_projections < 32):
        raise ValueError(
            f"num_simhash_projections must be in [0, 31]: {config.num_simhash_projections}"
        )
    num_points, original_dim = point_cloud.shape
    num_partitions = 2**config.num_simhash_projections
    use_identity_proj = config.projection_type == ProjectionType.DEFAULT_IDENTITY
    projection_dim = original_dim if use_identity_proj else config.projection_dimension
    if not use_identity_proj and (not projection_dim or projection_dim <= 0):
        raise ValueError(
            "A positive projection_dimension is required for non-identity projections."
        )
    final_fde_dim = config.num_repetitions * num_partitions * projection_dim
    out_fde = np.zeros(final_fde_dim, dtype=np.float32)
    for rep_num in range(config.num_repetitions):
        current_seed = config.seed + rep_num
        sketches = point_cloud @ _simhash_matrix_from_seed(
            original_dim, config.num_simhash_projections, current_seed
        )
        if use_identity_proj:
            projected_matrix = point_cloud
        elif config.projection_type == ProjectionType.AMS_SKETCH:
            ams_matrix = _ams_projection_matrix_from_seed(
                original_dim, projection_dim, current_seed
            )
            projected_matrix = point_cloud @ ams_matrix
        rep_fde_sum = np.zeros(num_partitions * projection_dim, dtype=np.float32)
        partition_counts = np.zeros(num_partitions, dtype=np.int32)
        partition_indices = np.array(
            [_simhash_partition_index_gray(sketches[i]) for i in range(num_points)]
        )
        for i in range(num_points):
            start_idx = partition_indices[i] * projection_dim
            rep_fde_sum[start_idx : start_idx + projection_dim] += projected_matrix[i]
            partition_counts[partition_indices[i]] += 1
        if config.encoding_type == EncodingType.AVERAGE:
            for i in range(num_partitions):
                start_idx = i * projection_dim
                if partition_counts[i] > 0:
                    rep_fde_sum[start_idx : start_idx + projection_dim] /= (
                        partition_counts[i]
                    )
                elif config.fill_empty_partitions and num_points > 0:
                    distances = [
                        _distance_to_simhash_partition(sketches[j], i)
                        for j in range(num_points)
                    ]
                    nearest_point_idx = np.argmin(distances)
                    rep_fde_sum[start_idx : start_idx + projection_dim] = (
                        projected_matrix[nearest_point_idx]
                    )
        rep_start_index = rep_num * num_partitions * projection_dim
        out_fde[rep_start_index : rep_start_index + rep_fde_sum.size] = rep_fde_sum
    if config.final_projection_dimension and config.final_projection_dimension > 0:
        return _apply_count_sketch_to_vector(
            out_fde, config.final_projection_dimension, config.seed
        )
    return out_fde
def generate_query_fde(
    point_cloud: np.ndarray, config: FixedDimensionalEncodingConfig
) -> np.ndarray:
    """Generates a Fixed Dimensional Encoding for a query point cloud (using SUM)."""
    if config.fill_empty_partitions:
        raise ValueError(
            "Query FDE generation does not support 'fill_empty_partitions'."
        )
    query_config = replace(config, encoding_type=EncodingType.DEFAULT_SUM)
    return _generate_fde_internal(point_cloud, query_config)


def generate_document_fde(
    point_cloud: np.ndarray, config: FixedDimensionalEncodingConfig
) -> np.ndarray:
    """Generates a Fixed Dimensional Encoding for a document point cloud (using AVERAGE)."""
    doc_config = replace(config, encoding_type=EncodingType.AVERAGE)
    return _generate_fde_internal(point_cloud, doc_config)


def generate_fde(
    point_cloud: np.ndarray, config: FixedDimensionalEncodingConfig
) -> np.ndarray:
    if config.encoding_type == EncodingType.DEFAULT_SUM:
        return generate_query_fde(point_cloud, config)
    elif config.encoding_type == EncodingType.AVERAGE:
        return generate_document_fde(point_cloud, config)
    else:
        raise ValueError(f"Unsupported encoding type in config: {config.encoding_type}")
def generate_document_fde_batch(
    doc_embeddings_list: List[np.ndarray], config: FixedDimensionalEncodingConfig
) -> np.ndarray:
    """
    Generates FDEs for a batch of documents using highly optimized NumPy vectorization.
    Fully compliant with C++ implementation including all projection types.
    """
    batch_start_time = time.perf_counter()
    num_docs = len(doc_embeddings_list)
    if num_docs == 0:
        logging.warning("[FDE Batch] Empty document list provided")
        return np.array([])
    logging.info(f"[FDE Batch] Starting batch FDE generation for {num_docs} documents")
    # Input validation
    valid_docs = []
    for i, doc in enumerate(doc_embeddings_list):
        if doc.ndim != 2:
            logging.warning(
                f"[FDE Batch] Document {i} has invalid shape (ndim={doc.ndim}), skipping"
            )
            continue
        if doc.shape[1] != config.dimension:
            raise ValueError(
                f"Document {i} has incorrect dimension: expected {config.dimension}, got {doc.shape[1]}"
            )
        if doc.shape[0] == 0:
            logging.warning(f"[FDE Batch] Document {i} has no vectors, skipping")
            continue
        valid_docs.append(doc)
    if len(valid_docs) == 0:
        logging.warning("[FDE Batch] No valid documents after filtering")
        return np.array([])
    num_docs = len(valid_docs)
    doc_embeddings_list = valid_docs
    # Determine projection dimension (matching C++ logic)
    use_identity_proj = config.projection_type == ProjectionType.DEFAULT_IDENTITY
    if use_identity_proj:
        projection_dim = config.dimension
        logging.info(f"[FDE Batch] Using identity projection (dim={projection_dim})")
    else:
        if not config.projection_dimension or config.projection_dimension <= 0:
            raise ValueError(
                "A positive projection_dimension must be specified for non-identity projections"
            )
        projection_dim = config.projection_dimension
        logging.info(
            f"[FDE Batch] Using {config.projection_type.name} projection: "
            f"{config.dimension} -> {projection_dim}"
        )
    # Configuration summary
    num_partitions = 2**config.num_simhash_projections
    logging.info(
        f"[FDE Batch] Configuration: {config.num_repetitions} repetitions, "
        f"{num_partitions} partitions, projection_dim={projection_dim}"
    )
    # Document tracking
    doc_lengths = np.array([len(doc) for doc in doc_embeddings_list], dtype=np.int32)
    total_vectors = np.sum(doc_lengths)
    doc_boundaries = np.insert(np.cumsum(doc_lengths), 0, 0)
    doc_indices = np.repeat(np.arange(num_docs), doc_lengths)
    logging.info(
        f"[FDE Batch] Total vectors: {total_vectors}, avg per doc: {total_vectors / num_docs:.1f}"
    )
    # Concatenate all embeddings
    concat_start = time.perf_counter()
    all_points = np.vstack(doc_embeddings_list).astype(np.float32)
    concat_time = time.perf_counter() - concat_start
    logging.info(f"[FDE Batch] Concatenation completed in {concat_time:.3f}s")
    # Pre-allocate output
    final_fde_dim = config.num_repetitions * num_partitions * projection_dim
    out_fdes = np.zeros((num_docs, final_fde_dim), dtype=np.float32)
    logging.info(f"[FDE Batch] Output FDE dimension: {final_fde_dim}")
    # Process each repetition
    for rep_num in range(config.num_repetitions):
        current_seed = config.seed + rep_num
        if rep_num % 5 == 0:  # Log every 5 repetitions
            logging.info(
                f"[FDE Batch] Processing repetition {rep_num + 1}/{config.num_repetitions}"
            )
        # Step 1: SimHash projection
        simhash_start = time.perf_counter()
        simhash_matrix = _simhash_matrix_from_seed(
            config.dimension, config.num_simhash_projections, current_seed
        )
        all_sketches = all_points @ simhash_matrix
        simhash_time = time.perf_counter() - simhash_start
        # Step 2: Apply dimensionality reduction if configured
        proj_start = time.perf_counter()
        if use_identity_proj:
            projected_points = all_points
        elif config.projection_type == ProjectionType.AMS_SKETCH:
            ams_matrix = _ams_projection_matrix_from_seed(
                config.dimension, projection_dim, current_seed
            )
            projected_points = all_points @ ams_matrix
        else:
            raise ValueError(f"Unsupported projection type: {config.projection_type}")
        proj_time = time.perf_counter() - proj_start
        # Step 3: Vectorized partition index calculation
        partition_start = time.perf_counter()
        bits = (all_sketches > 0).astype(np.uint32)
        partition_indices = np.zeros(total_vectors, dtype=np.uint32)
        # Vectorized Gray Code computation
        for bit_idx in range(config.num_simhash_projections):
            partition_indices = (partition_indices << 1) + (
                bits[:, bit_idx] ^ (partition_indices & 1)
            )
        partition_time = time.perf_counter() - partition_start
        # Step 4: Vectorized aggregation
        agg_start = time.perf_counter()
        # Initialize storage for this repetition
        rep_fde_sum = np.zeros(
            (num_docs * num_partitions * projection_dim,), dtype=np.float32
        )
        partition_counts = np.zeros((num_docs, num_partitions), dtype=np.int32)
        # Count vectors per partition per document
        np.add.at(partition_counts, (doc_indices, partition_indices), 1)
        # Aggregate vectors using flattened indexing for efficiency
        doc_part_indices = doc_indices * num_partitions + partition_indices
        base_indices = doc_part_indices * projection_dim
        for d in range(projection_dim):
            flat_indices = base_indices + d
            np.add.at(rep_fde_sum, flat_indices, projected_points[:, d])
        # Reshape for easier manipulation
        rep_fde_sum = rep_fde_sum.reshape(num_docs, num_partitions, projection_dim)
        agg_time = time.perf_counter() - agg_start
        # Step 5: Convert sums to averages (for document FDE)
        avg_start = time.perf_counter()
        # Vectorized division where counts > 0
        non_zero_mask = partition_counts > 0
        counts_3d = partition_counts[:, :, np.newaxis]  # Broadcasting for division
        # Safe division (avoid divide by zero)
        np.divide(rep_fde_sum, counts_3d, out=rep_fde_sum, where=counts_3d > 0)
        # Fill empty partitions if configured
        empty_filled = 0
        if config.fill_empty_partitions:
            empty_mask = ~non_zero_mask
            empty_docs, empty_parts = np.where(empty_mask)
            for doc_idx, part_idx in zip(empty_docs, empty_parts):
                if doc_lengths[doc_idx] == 0:
                    continue
                # Get sketches for this document
                doc_start = doc_boundaries[doc_idx]
                doc_end = doc_boundaries[doc_idx + 1]
                doc_sketches = all_sketches[doc_start:doc_end]
                # Vectorized distance calculation
                binary_rep = _gray_code_to_binary(part_idx)
                target_bits = (
                    binary_rep >> np.arange(config.num_simhash_projections - 1, -1, -1)
                ) & 1
                distances = np.sum(
                    (doc_sketches > 0).astype(int) != target_bits, axis=1
                )
                nearest_local_idx = np.argmin(distances)
                nearest_global_idx = doc_start + nearest_local_idx
                rep_fde_sum[doc_idx, part_idx, :] = projected_points[nearest_global_idx]
                empty_filled += 1
        avg_time = time.perf_counter() - avg_start
        # Step 6: Copy results to output array
        rep_output_start = rep_num * num_partitions * projection_dim
        out_fdes[
            :, rep_output_start : rep_output_start + num_partitions * projection_dim
        ] = rep_fde_sum.reshape(num_docs, -1)
        # Log timing for first repetition
        if rep_num == 0:
            logging.info("[FDE Batch] Repetition timing breakdown:")
            logging.info(f"  - SimHash: {simhash_time:.3f}s")
            logging.info(f"  - Projection: {proj_time:.3f}s")
            logging.info(f"  - Partition indices: {partition_time:.3f}s")
            logging.info(f"  - Aggregation: {agg_time:.3f}s")
            logging.info(f"  - Averaging: {avg_time:.3f}s")
            if config.fill_empty_partitions:
                logging.info(f"  - Filled {empty_filled} empty partitions")
    # Step 7: Apply final projection if configured
    if config.final_projection_dimension and config.final_projection_dimension > 0:
        logging.info(
            f"[FDE Batch] Applying final projection: {final_fde_dim} -> "
            f"{config.final_projection_dimension}"
        )
        final_proj_start = time.perf_counter()
        # Process in chunks to avoid memory issues
        chunk_size = min(100, num_docs)
        final_fdes = []
        for i in range(0, num_docs, chunk_size):
            chunk_end = min(i + chunk_size, num_docs)
            chunk_fdes = np.array(
                [
                    _apply_count_sketch_to_vector(
                        out_fdes[j], config.final_projection_dimension, config.seed
                    )
                    for j in range(i, chunk_end)
                ]
            )
            final_fdes.append(chunk_fdes)
        out_fdes = np.vstack(final_fdes)
        final_proj_time = time.perf_counter() - final_proj_start
        logging.info(
            f"[FDE Batch] Final projection completed in {final_proj_time:.3f}s"
        )
    # Final statistics and validation
    total_time = time.perf_counter() - batch_start_time
    logging.info(f"[FDE Batch] Batch generation completed in {total_time:.3f}s")
    logging.info(
        f"[FDE Batch] Average time per document: {total_time / num_docs * 1000:.2f}ms"
    )
    logging.info(f"[FDE Batch] Throughput: {num_docs / total_time:.1f} docs/sec")
    logging.info(f"[FDE Batch] Output shape: {out_fdes.shape}")
    # Validate output dimensions
    expected_dim = (
        final_fde_dim
        if not config.final_projection_dimension
        else config.final_projection_dimension
    )
    assert out_fdes.shape == (num_docs, expected_dim), (
        f"Output shape mismatch: {out_fdes.shape} != ({num_docs}, {expected_dim})"
    )
    return out_fdes
if __name__ == "__main__":
    print(f"\n{'=' * 20} SCENARIO 1: Basic FDE Generation {'=' * 20}")
    base_config = FixedDimensionalEncodingConfig(
        dimension=128, num_repetitions=2, num_simhash_projections=4, seed=42
    )
    query_data = np.random.randn(32, base_config.dimension).astype(np.float32)
    doc_data = np.random.randn(80, base_config.dimension).astype(np.float32)
    query_fde = generate_query_fde(query_data, base_config)
    doc_fde = generate_document_fde(
        doc_data, replace(base_config, fill_empty_partitions=True)
    )
    expected_dim = (
        base_config.num_repetitions
        * (2**base_config.num_simhash_projections)
        * base_config.dimension
    )
    print(f"Query FDE Shape: {query_fde.shape} (Expected: {expected_dim})")
    print(f"Document FDE Shape: {doc_fde.shape} (Expected: {expected_dim})")
    print(f"Similarity Score: {np.dot(query_fde, doc_fde):.4f}")
    assert query_fde.shape[0] == expected_dim
    print(f"\n{'=' * 20} SCENARIO 2: Inner Projection (AMS Sketch) {'=' * 20}")
    ams_config = replace(
        base_config, projection_type=ProjectionType.AMS_SKETCH, projection_dimension=16
    )
    query_fde_ams = generate_query_fde(query_data, ams_config)
    expected_dim_ams = (
        ams_config.num_repetitions
        * (2**ams_config.num_simhash_projections)
        * ams_config.projection_dimension
    )
    print(f"AMS Sketch FDE Shape: {query_fde_ams.shape} (Expected: {expected_dim_ams})")
    assert query_fde_ams.shape[0] == expected_dim_ams
    print(f"\n{'=' * 20} SCENARIO 3: Final Projection (Count Sketch) {'=' * 20}")
    final_proj_config = replace(base_config, final_projection_dimension=1024)
    query_fde_final = generate_query_fde(query_data, final_proj_config)
    print(
        f"Final Projection FDE Shape: {query_fde_final.shape} (Expected: {final_proj_config.final_projection_dimension})"
    )
    assert query_fde_final.shape[0] == final_proj_config.final_projection_dimension
    print(f"\n{'=' * 20} SCENARIO 4: Top-level `generate_fde` wrapper {'=' * 20}")
    query_fde_2 = generate_fde(
        query_data, replace(base_config, encoding_type=EncodingType.DEFAULT_SUM)
    )
    doc_fde_2 = generate_fde(
        doc_data, replace(base_config, encoding_type=EncodingType.AVERAGE)
    )
    print(
        f"Wrapper-generated Query FDE is identical: {np.allclose(query_fde, query_fde_2)}"
    )
    print(
        f"Wrapper-generated Document FDE is identical: {np.allclose(doc_fde, doc_fde_2)}"
    )
    print("\nAll test scenarios completed successfully.")

Conclusion
As multi-vector models like ColBERT and ColPali continue to mature, and optimization algorithms like MUVERA keep evolving, the efficiency bottlenecks of multi-vector retrieval are steadily being overcome. Multi-vector techniques may well become standard in recommendation systems, search engines, and document retrieval. And the approach MUVERA demonstrates, reducing a complex problem to a classic one, offers a valuable template for algorithmic optimization in other domains.