精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

LSM-TREE從入門到入魔:從零開始實現一個高性能鍵值存儲

存儲 存儲架構
我們使用Zig語言實現了一個LSM-Tree的核心功能,包括MemTable、SSTable、寫流程、各類Iterator與數據壓縮能力。通過這個項目,我收獲了很多心得體會。

目錄

一、引言

二、LSM-Treee 核心功能概述

三、核心功能實現

    1.MemTable 實現

    2.SSTable

    3.Write

    4.Iterators

    5.Read/Scan

    6.壓縮

四、總結

一.引 言

LSM-Tree(Log-Structured Merge Tree)是一種高效的鍵值存儲數據結構,廣泛應用于NoSQL數據庫和大數據處理系統中。其核心思想是通過分層、有序地利用磁盤順序寫入的性能優勢,優化寫入操作,同時犧牲部分讀取性能以換取更高的寫入吞吐量。

圖片圖片

圖片圖片

在互聯網的各個基礎設施中,不論是數據庫還是緩存亦或是大數據框架,LSM-Tree這個數據結構都是很常見的身影。

我每天都在使用這個存儲引擎,但是對它的了解還流于表面,所以我想要自己實現一次LSM-Tree加深理解。

本次實現我們采用了Zig語言,簡要的實現LSM-Tree的核心功能(讀寫、數據壓縮、持久化,不包含MVCC的內容)。

Zig是一種新興的系統編程語言,其設計目標是提供現代特性的同時保持低復雜性。

本項目極大的受到了Mini-Lsm這個項目的啟發,強烈推薦大家學習這個項目!

二.LSM-Treee 核心功能概述

在開始自己編寫之前,我先簡單介紹一下LSM-Tree(Log-Structured Merge Tree)的架構以及讀寫流程。

LSM-Tree它結合了日志和索引的特點,優化了寫入和讀取性能。每次寫入都是采用append-only的方式,所以寫入性能很高。

而作為代價,追加寫入會造成存儲放大,LSM-Tree時采用了多層SSTable的方式將數據堆疊在硬盤上。所以需要一個合并壓縮的過程來回收過多的空間。

圖片圖片

寫流程


讀流程


  • 預寫日志(WAL) :寫操作首先寫入預寫日志(WAL),用于記錄未提交的數據,確保數據的持久性和一致性。
  • MemTable:隨后將數據寫入內存中的MemTable,MemTable是一個平衡樹(如skiplist),支持快速插入和刪除操作。
  • 觸發Compaction:當MemTable達到一定閾值時,會觸發后臺線程將MemTable中的數據刷入磁盤,生成SSTable文件。
  • SSTable:生成的SSTable文件是不可變的,存儲在磁盤上,用于后續讀取操作。
  • 合并操作(Merge) :當多個SSTable文件達到一定數量時,會觸發合并操作,將它們合并為一個更大的SSTable文件,以減少文件數量。
  • MemTable優先:讀取操作首先從MemTable中查找數據,因為MemTable是按升序排列的,查找效率較高。
  • Block Cache:如果MemTable中未找到數據,則從Block Cache中查找。Block Cache存儲了預先加載到內存中的SSTable塊,以提高讀取性能。
  • SSTable查找:如果Block Cache中也未找到數據,則從磁盤上的SSTable文件中查找。Lsm-tree會從最低層(L0)開始查找,逐層向上查找,直到找到目標數據。
  • 多版本并發控制(MVCC) :Lsm-tree支持多版本并發控制,允許同時訪問不同版本的數據,從而提高并發性能。

三、核心功能實現

MemTable 實現

首先,我們先實現 LSM 存儲引擎的內存結構—Memtable。我們選擇跳表實現作為 Memtable 的數據結構,因為它支持無鎖的并發讀寫。我們不會深入介紹跳表的工作原理(Redis的同學應該不陌生這個東西),簡單來說,它是一個易于實現的有序鍵值映射。

圖片圖片

Skiplist的實現非常簡單,這里我利用Zig編譯時的能力實現了一個泛型版本的跳表src/skiplist.zig,有興趣的小伙伴可以直接去倉庫中參觀代碼。

基于SkipList的能力,我們即可包裝出Memtable的基本功能。

我們這個LSM支持WAL功能的,即寫入內存表之前要先寫入磁盤日志,方便在意外宕機重啟后可以恢復數據。

WAL的能力我就不想自己再實現了,于是從網上扒了一個C的實現(Zig集成C語言非常便捷,可以參考與 C 交互)。

map: Map,
lock: RwLock,
wal: ?Wal,
id: usize,
allocator: std.mem.Allocator,
arena: std.heap.ArenaAllocator,
approximate_size: atomic.Value(usize) = atomic.Value(usize).init(0),


fn putToList(self: *Self, key: []const u8, value: []const u8) !void {
    {
        self.lock.lock();
        defer self.lock.unlock();
        try self.map.insert(kk, vv);
    }


    _ = self.approximate_size.fetchAdd(@intCast(key.len + value.len), .monotonic);
}


fn putToWal(self: *Self, key: []const u8, value: []const u8) !void {
    // [key-size: 4bytes][key][value-size: 4bytes][value]


    if (self.wal) |w| {
        var buf = std.ArrayList(u8).init(self.arena.allocator());


        var bw = buf.writer();
        try bw.writeInt(u32, @intCast(key.len), .big);
        _ = try bw.write(key);
        try bw.writeInt(u32, @intCast(value.len), .big);
        _ = try bw.write(value);
        try w.append(buf.items);
    }
}


// 寫入Memtable,先寫WAL,再寫skiplist table
pub fn put(self: *Self, key: []const u8, value: []const u8) !void {
    try self.putToWal(key, value);
    try self.putToList(key, value);
}


pub fn get(self: *Self, key: []const u8, val: *[]const u8) !bool {
    self.lock.lockShared();
    defer self.lock.unlockShared();
    var vv: []const u8 = undefined;
    if (try self.map.get(key, &vv)) {
        val.* = vv;
        return true;
    }
    return false;
}

注意到這里我們沒有實現刪除的功能,這里我仿照了RocksDB中的墓碑機制,用空值代表刪除,所以刪除被put(key, "")取代。

SSTable

接下來,我們就著手開始實現LSM中另外一個重要元素 --- SSTable。

SSTable(Sorted String Table)是一種不可變的(Immutable)磁盤文件,內部按Key有序排列,存儲鍵值對數據。每個SSTable文件生成后不再修改,更新和刪除操作通過追加新記錄或標記刪除,最終通過合并(Compaction)清理冗余數據。

每當LSM-Tree中的MemTable體積超出閾值,就會將內存中的數據寫入SsTable。

圖片圖片

每個SSTable由多個Block組成,每個Block是一組KV的package。

Block的編碼格式如下:

圖片圖片

為了構建一個Block,我們實現了一個BlockBuilder的模塊,這部分代碼見src/block.zig:

pub const Block = struct {
    data_v: std.ArrayList(u8),
    offset_v: std.ArrayList(u16),
}


pub const BlockBuilder = struct {
    allocator: std.mem.Allocator,
    offset_v: std.ArrayList(u16),
    data_v: std.ArrayList(u8),
    block_size: usize,
    first_key: []u8,
    
    pub fn add(self: *Self, key: []const u8, value: ?[]const u8) !bool {
        std.debug.assert(key.len > 0); // key must not be empty


        const vSize = if (value) |v| v.len else 0;
        
        if ((self.estimated_size() + key.len + vSize + 3 * @sizeOf(u16) > self.block_size) and !self.is_empty()) {
            return false;
        }
        try self.doAdd(key, value);


        if (self.first_key.len == 0) {
            self.first_key = try self.allocator.dupe(u8, key);
        }
        return true;
    }


    fn doAdd(self: *Self, key: []const u8, value: ?[]const u8) !void {
        // add the offset of the data into the offset array
        try self.offset_v.append(@intCast(self.data_v.items.len));
        const overlap = calculate_overlap(self.first_key, key);


        var dw = self.data_v.writer();
        // encode key overlap
        try dw.writeInt(u16, @intCast(overlap), .big);
        // encode key length
        try dw.writeInt(u16, @intCast(key.len - overlap), .big);


        // encode key content
        _ = try dw.write(key[overlap..]);
        // encode value length
        if (value) |v| {
            try dw.writeInt(u16, @intCast(v.len), .big);
            // encode value content
            _ = try dw.write(v);
        } else {
            try dw.writeInt(u16, 0, .big);
        }
    }


    pub fn build(self: *Self) !Block {
        if (self.isEmpty()) {
            @panic("block is empty");
        }
        return Block.init(
            try self.data_v.clone(),
            try self.offset_v.clone(),
        );
    }
}

可能有同學注意到,我們寫key的時候沒有直接將key值寫入,而且只寫了key與當前block的第一個key不重疊的suffix部分。由于block中的key都是有序的,所以一個block中的key有很大概率是前綴類似的,所以這里是一個空間優化的小技巧,例如:

Key: foo, foo1, foo2, foo3 ....

我們寫入block時,只需要寫:

foo|1|2|3|....

很多有序表的實現中都會用到這個小技巧。

有了block的實現,我們可以進一步來定義SSTable的格式。一個SSTable由多個Block、block元數據以及布隆過濾器構成。

圖片圖片

布隆過濾器是一種概率性數據結構,用于維護一組鍵。您可以向布隆過濾器中添加鍵,并且可以知道在添加到布隆過濾器中的鍵集中可能存在或必須不存在的鍵。

在SSTable中添加布隆過濾器可以有效提升查詢key的效率。

元數據包含了block的第一個與最后一個key以及block在sst中的offset信息,記錄元數據主要為了在后續的檢索中可以快速定位某個key落在哪個block中。

同樣的套路,為了構建SSTable,我們先實現一個SSTableBuilder,部分代碼見src/ss_table.zig

pub const SsTableBuilder = struct {
    allocator: std.mem.Allocator,
    builder: BlockBuilder, // 剛才實現的block構建裝置
    first_key: ?[]const u8,
    last_key: ?[]const u8,
    meta: std.ArrayList(BlockMeta),
    block_size: usize,
    data: std.ArrayList(u8),
    bloom: BloomFilterPtr, // 布隆過濾器
    
    pub fn add(self: *Self, key: []const u8, value: []const u8) !void {
        try self.setFirstKey(key);
        try self.bloom.get().insert(key); // 寫入布隆過濾器


        if (try self.builder.add(key, value)) {
            try self.setLastKey(key);
            return;
        }
        // block is full
        try self.finishBlock();
        std.debug.assert(try self.builder.add(key, value));
        try self.resetFirstKey(key);
        try self.setLastKey(key);
    }
    
    // 寫入一個block的數據
    fn finishBlock(self: *Self) !void {
        if (self.builder.isEmpty()) {
            return;
        }
        var bo = self.builder;
        // reset block
        defer bo.reset();


        self.builder = BlockBuilder.init(self.allocator, self.block_size);
        var blk = try bo.build();
        defer blk.deinit();
        const encoded_block = try blk.encode(self.allocator); // block序列化
        defer self.allocator.free(encoded_block);
        
        // 記錄block的元數據
        try self.meta.append(.{
            .allocator = self.allocator,
            .offset = self.data.items.len,
            .first_key = try self.allocator.dupe(u8, self.first_key.?),
            .last_key = try self.allocator.dupe(u8, self.last_key.?),
        });
        const cksm = hash.Crc32.hash(encoded_block); // 寫入4b的校驗值
        try self.data.appendSlice(encoded_block);
        try self.data.writer().writeInt(u32, cksm, .big);
    }
    
    // 構建為一個SSTable
    pub fn build(
        self: *Self,
        id: usize,
        block_cache: ?BlockCachePtr, // 讀取block數據的緩存,減少block的反序列化成本
        path: []const u8,
    ) !SsTable {
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();
        const allocator = arena.allocator();


        try self.finishBlock();
        const w = self.data.writer();
        
        // 寫入元數據及其offset
        const meta_offset = self.data.items.len;
        const meta_b = try BlockMeta.batchEncode(self.meta.items, allocator);
        _ = try w.write(meta_b);
        try w.writeInt(u32, @intCast(meta_offset), .big);


        // 寫入布隆過濾器及其offset
        const bloom_offset = self.data.items.len;
        const encoded_bloom = try self.bloom.get().encode(allocator);
        _ = try w.write(encoded_bloom);
        try w.writeInt(u32, @intCast(bloom_offset), .big);
        
        
        const file = try FileObject.init(path, self.data.items);
        errdefer file.deinit();


        const fk = self.meta.items[0].first_key;
        const lk = self.meta.getLast().last_key;


        return .{
            .allocator = self.allocator,
            .file = file,
            .block_metas = try self.meta.toOwnedSlice(),
            .meta_offset = meta_offset,
            .block_cache = block_cache,
            .bloom = self.bloom.clone(),
            .id = id,
            .first_key = try self.allocator.dupe(u8, fk),
            .last_key = try self.allocator.dupe(u8, lk),
            .max_ts = 0,
        };
    }
}

Write

有了SSTable和MemTable,我們就有了LSM-Tree需要的兩個最重要的材料,后續的讀寫不過是對這兩類材料的組合拼裝。

在實現寫操作之前,我們先假想一下LSM-Tree的數據結構:

  • 首先我們需要一個數據結構存儲當前MemTable、冷MemTables和多層的SST,如下圖所示。

圖片圖片

  • 其次我們需要一個鎖用于同步上述數據結構的讀寫行為。
  • 我們還需要一個SSTable的自增id。
  • 最后還需要一些必要的配置,例如存儲路徑、線程管理器等。

最終,我們實現的LSM數據結構如下:

pub const StorageState = struct {
    allocator: std.mem.Allocator,
    mem_table: MemTablePtr, // 當前正在寫的MemTable
    imm_mem_tables: std.ArrayList(MemTablePtr), // 冷MemTable數組
    l0_sstables: std.ArrayList(usize), // 第一層的SSTable數組
    levels: std.ArrayList(std.ArrayList(usize)), // 后續多層的SSTable數組
    sstables: std.AutoHashMap(usize, SsTablePtr), // sst_id => SSTable
}


pub const StorageInner = struct {
    const Self = @This();


    allocator: std.mem.Allocator,
    state: StorageState,
    state_lock: std.Thread.RwLock = .{},
    next_sst_id: atomic.Value(usize),
    path: []const u8,
    options: StorageOptions,
    compaction_controller: CompactionController,
    block_cache: BlockCachePtr,
    terminate: std.Thread.ResetEvent = .{},
    wg: std.Thread.WaitGroup = .{},
}

先不考慮逐層壓縮的邏輯,只考慮一層SSTable的簡單情況,寫邏輯可以簡化為如下流程:

圖片圖片

  • 寫入State中的MemTable
pub fn writeBatch(self: *Self, records: []const WriteBatchRecord) !void {
    for (records) |record| {
        switch (record) {
            .put => |pp| {
                try self.state.getMemTable().put(pp.key, pp.value);
            },
            .delete => |dd| {
                // we use "" as the tombstone value
                try self.state.getMemTable().put(dd, "");
            },
        }
        // 嘗試把當前MemTable壓入冷數據
        try self.tryFreeze(self.state.getMemTable().getApproximateSize());
    }
}
  • 當MemTable體積超出閾值,壓入冷MemTable數組,重置當前MemTable
fn forceFreezeMemtable(self: *Self) !void {
    const next_sst_id = self.getNextSstId();
    
    // 生成一個新的MemTable
    var new_mm: MemTable = undefined;
    {
        if (self.options.enable_wal) {
            const mm_path = try pathOfWal(self.allocator, self.path, next_sst_id);
            defer self.allocator.free(mm_path);
            new_mm = MemTable.init(next_sst_id, self.allocator, mm_path);
        } else {
            new_mm = MemTable.init(next_sst_id, self.allocator, null);
        }
    }
    errdefer new_mm.deinit();


    var old_mm: *MemTable = undefined;
    {
        self.state_lock.lock();
        defer self.state_lock.unlock();
        var old_mm_ptr = self.state.mem_table;
        old_mm = old_mm_ptr.get();
        defer old_mm_ptr.release();
        self.state.mem_table = try MemTablePtr.create(self.allocator, new_mm);
        
        // 將寫滿的MemTable壓入冷數據
        try self.state.imm_mem_tables.append(old_mm_ptr.clone()); // newer memtable is inserted at the end
    }
    // TIPS:把磁盤同步放在鎖的范圍外面,降低鎖的覆蓋
    try old_mm.syncWal();
}
  • 當冷MemTable數組大小超出配置閾值,觸發SSTable落盤,彈出最冷的MemTable,寫入磁盤SSTable,并記錄在L0的SSTable數組中。這一過程是在一個線程中定時觸發
pub fn flushNextMemtable(self: *Self) !void {
    std.debug.assert(self.state.imm_mem_tables.items.len > 0);
    var to_flush_table: *MemTable = undefined;
    {
        self.state_lock.lockShared();
        defer self.state_lock.unlockShared();
        // oldest memtable is at the index 0
        to_flush_table = self.state.imm_mem_tables.items[0].load();
    }


    // 將最冷的MemTable構建為SSTable
    var builder = try SsTableBuilder.init(self.allocator, self.options.block_size);
    defer builder.deinit();


    const sst_id = to_flush_table.id;
    try to_flush_table.flush(&builder);


    const sst_path = try self.pathOfSst(sst_id);
    defer self.allocator.free(sst_path);
    var sst = try builder.build(sst_id, self.block_cache.clone(), sst_path);
    errdefer sst.deinit();


    // add the flushed table to l0_sstables
    {
        self.state_lock.lock();
        defer self.state_lock.unlock();


        var m = self.state.imm_mem_tables.orderedRemove(0);
        defer m.deinit();
        std.debug.assert(m.load().id == sst_id);


        // newest sstable is at the end
        try self.state.l0_sstables.append(sst_id);
        try self.state.sstables.put(sst.id, try SsTablePtr.create(self.allocator, sst));
    }
}

當然,這里只實現了一半的寫邏輯,數據停留在L0的SST中,后續的多層SST還沒有使用。

剩下一半的寫邏輯會在數據壓縮的章節中介紹。

Iterators

寫入的過程比較好理解,但是讀就略微復雜了,以上面我們實現的寫結果為例子,最終我們的數據沉淀在一個3層的數據結構中,要如何高效的從其中檢索數據呢?

圖片圖片

如同寫過程一般,讀過程也是對各個基礎單元(MemTable、SSTable、Block)讀過程的組合,為了方便組合邏輯,我們要先統一各個模塊的讀行為。

在LSM-Tree中,所有的讀行為都定義為了如下的Interface(Zig中沒trait或者Interface,所以這里實例代碼我用Rust描述):

pub trait StorageIterator {
    /// Get the current value.
    fn value(&self) -> &[u8];


    /// Get the current key.
    fn key(&self) -> &[u8];


    /// Check if the current iterator is empty.
    fn is_empty(&self) -> bool;


    /// Move to the next position.
    fn next(&mut self) -> anyhow::Result<()>;


    /// Number of underlying active iterators for this iterator.
    fn num_active_iterators(&self) -> usize {
        1
    }
}

我們首先對MemTable、SSTable、Block這些模塊實現讀接口,代碼可見:src/MemTable.zig,src/block.zig,src/ss_table.zig,這里單獨簡單介紹下SSTable的讀接口實現思路,其他的模塊實現思路類似,感興趣的直接閱讀源碼即可。

pub const SsTableIterator = struct {
    allocator: std.mem.Allocator,
    table: SsTablePtr,
    blk: BlockPtr,
    blk_iterator: BlockIteratorPtr,
    blk_idx: usize,


    const Self = @This();




    pub fn initAndSeekToFirst(allocator: std.mem.Allocator, table: SsTablePtr) !Self {
        const s = try seekToFirstInner(allocator, table);
        return .{
            .allocator = allocator,
            .table = table,
            .blk_iterator = s.blk_iter,
            .blk = s.blk,
            .blk_idx = 0,
        };
    }


    pub fn initAndSeekToKey(allocator: std.mem.Allocator, table: SsTablePtr, k: []const u8) !Self {
        const b = try seekToKeyInner(allocator, table, k);
        return .{
            .allocator = allocator,
            .table = table,
            .blk_iterator = b.blk_iter,
            .blk_idx = b.blk_idx,
            .blk = b.blk,
        };
    }


    fn seekToFirstInner(allocator: std.mem.Allocator, table: SsTablePtr) !struct {
        blk: BlockPtr,
        blk_iter: BlockIteratorPtr,
    } {
        var blk = try table.get().readBlockCached(0, allocator); // 讀取第一個block
        errdefer blk.release();
        var blk_iter = try BlockIterator.createAndSeekToFirst(allocator, blk.clone());
        errdefer blk_iter.deinit();


        return .{
            .blk = blk,
            .blk_iter = try BlockIteratorPtr.create(allocator, blk_iter), // 從SSTable的讀接口轉換為Block的讀接口
        };
    }


    fn seekToKeyInner(allocator: std.mem.Allocator, table: SsTablePtr, k: []const u8) !struct {
        blk_idx: usize,
        blk: BlockPtr,
        blk_iter: BlockIteratorPtr,
    } {
        const table_ptr = table.get();
        var blk_idx = try table_ptr.findBlockIndex(k);
        var blk = try table_ptr.readBlockCached(blk_idx, allocator);
        errdefer blk.deinit();
        var blk_iter = try BlockIterator.createAndSeekToKey(allocator, blk.clone(), k);
        errdefer blk_iter.deinit();
        var blk_iter_ptr = try BlockIteratorPtr.create(allocator, blk_iter);
        errdefer blk_iter_ptr.release();


        // 如果當前block讀完了,跳到下一個block,并生成block的讀接口
        if (blk_iter.isEmpty()) {
            blk_idx += 1;
            if (blk_idx < table_ptr.numBlocks()) {
                {
                    blk.deinit();
                    blk_iter.deinit();
                }
                var blk2 = try table_ptr.readBlockCached(blk_idx, allocator);
                errdefer blk2.deinit();
                var blk_iter2 = try BlockIterator.createAndSeekToFirst(allocator, blk2.clone());
                errdefer blk_iter2.deinit();


                return .{
                    .blk_idx = blk_idx,
                    .blk_iter = try BlockIteratorPtr.create(allocator, blk_iter2),
                    .blk = blk2,
                };
            }
        }
        return .{
            .blk_idx = blk_idx,
            .blk_iter = blk_iter_ptr,
            .blk = blk,
        };
    }


    pub fn key(self: Self) []const u8 {
        return self.blk_iterator.get().key();
    }


    pub fn value(self: Self) []const u8 {
        return self.blk_iterator.get().value();
    }


    pub fn isEmpty(self: Self) bool {
        return self.blk_iterator.get().isEmpty();
    }


    pub fn next(self: *Self) !void {
        try self.blk_iterator.get().next();
        // 若當前的Block讀完了,就跳到下一個block,并生成Block讀接口。
        if (self.blk_iterator.get().isEmpty()) {
            self.blk_idx += 1;
            if (self.blk_idx < self.table.get().numBlocks()) {
                self.reset();
                const blk = try self.table.get().readBlockCached(self.blk_idx, self.allocator);
                const blk_iter = try BlockIterator.createAndSeekToFirst(self.allocator, blk.clone());
                self.blk = blk;
                self.blk_iterator = try BlockIteratorPtr.create(self.allocator, blk_iter);
            }
        }
    }
};

有了幾個基本元素的讀接口之后,我們便遇到第一個問題:我們如何對多個MemTable做讀檢索?

圖片圖片

這個時候,我們需要一個新的數據結構來實現多個讀實例的合并檢索---- MergeIterator

MergeIterator在內部維護一個二叉堆。堆中數據的優先級如下:

當各個迭代器key不同時,具有最小key的迭代器最優。當多個迭代器有相同的當前key時,最新的迭代器一個最優。

假設我們有如下MemTable(iter1最新,iter3最舊):

  • iter1: b->del, c->4, d->5
  • iter2: a->1, b->2, c->3
  • iter3: e->4

經過合并后迭代器結果應該為:

  • a最小,iter2優先迭代
  • iter2迭代一次后,iter1與iter2 key相同,iter1優先迭代,b->2跳過
  • c最小,iter1優先迭代,iter2中c->3跳過
  • d最小,iter1優先迭代
  • 只剩iter3,迭代iter3

最終結果:a->1, b->del, c->4, d->5, e->4

實現代碼如下:

// 標準庫中有二叉堆實現
const IteratorHeap = std.PriorityQueue(*HeapWrapper, Comparer.Context, Comparer.cmp);


allocator: std.mem.Allocator,
q: IteratorHeap,
current: ?*HeapWrapper,


pub fn init(allocator: std.mem.Allocator, iters: std.ArrayList(StorageIteratorPtr)) !Self {
    var q = IteratorHeap.init(allocator, .{});
    if (iters.items.len == 0) {
        return Self{
            .allocator = allocator,
            .q = q,
            .current = null,
        };
    }


    // PS: the last iter has the highest priority
    // 按順序寫入二叉堆
    for (iters.items, 0..) |sp, i| {
        if (!sp.load().isEmpty()) {
            const hw = try allocator.create(HeapWrapper);
            errdefer allocator.destroy(hw);
            hw.* = HeapWrapper.init(i, sp.clone());
            try q.add(hw);
        }
    }


    const cc = q.removeOrNull();
    return Self{
        .allocator = allocator,
        .q = q,
        .current = cc,
    };
}


pub fn key(self: Self) []const u8 {
    return self.current.?.key();
}


pub fn value(self: Self) []const u8 {
    return self.current.?.value();
}


pub fn isEmpty(self: Self) bool {
    if (self.current) |cc| {
        return cc.isEmpty();
    }
    return true;
}


pub fn next(self: *Self) !void {
    const cc = self.current.?;
    while (true) {
        if (self.q.peek()) |ii| {
            std.debug.assert(!ii.isEmpty());
            // 如果優先堆頭部迭代器A和當前正在生效的迭代器B的key相同,讓迭代器A跳過重復key
            if (std.mem.eql(u8, cc.key(), ii.key())) {
                try ii.next();
                if (ii.isEmpty()) {
                    _ = self.q.remove();
                    ii.deinit();
                    self.allocator.destroy(ii);
                }
            } else {
                break;
            }
        }
        break;
    }


    try cc.next(); // 迭代當前迭代器


    // 如果當前優先迭代器迭代完了,就從堆中彈出最優迭代器
    if (cc.isEmpty()) {
        defer {
            cc.deinit();
            self.allocator.destroy(cc);
        }
        if (self.q.removeOrNull()) |h| {
            self.current = h;
        } else {
            self.current = null;
        }
        return;
    }


    // 將當前迭代器寫回二叉堆,重新計算最優迭代器
    try self.q.add(cc); 
    self.current = self.q.removeOrNull();
}

有了MergeIterator這個工具,我們具備了在多個MemTable和多個SSTable中迭代檢索的能力,但是還有個問題,我們當前有兩個MergeIterator,應該如何在兩個迭代器中執行迭代任務?

圖片圖片

此時,我們再引入一個新的數據結構:TwoMergeIterator,這個是MergeIterator在元素只有兩個的情況下的簡化版。

TwoMergeIterator由兩個迭代器構成,一個高優一個低優,每次迭代優先迭代高優,當key相同時,優先迭代高優。實現如下:

pub const TwoMergeIterator = struct {
    a: StorageIteratorPtr,
    b: StorageIteratorPtr,
    choose_a: bool,


    // 選擇兩個迭代器中key更小的迭代器
    fn chooseA(a: *StorageIterator, b: *StorageIterator) bool {
        if (a.isEmpty()) {
            return false;
        }
        if (b.isEmpty()) {
            return true;
        }
        return std.mem.lessThan(u8, a.key(), b.key());
    }


    // key相同時,跳過低優中的key
    fn skipB(self: *TwoMergeIterator) !void {
        const ap = self.a.load();
        const bp = self.b.load();
        if (!ap.isEmpty() and !bp.isEmpty() and std.mem.eql(u8, ap.key(), bp.key())) try bp.next();
    }


    pub fn init(a: StorageIteratorPtr, b: StorageIteratorPtr) !TwoMergeIterator {
        var iter = TwoMergeIterator{
            .a = a,
            .b = b,
            .choose_a = false,
        };
        try iter.skipB();
        iter.choose_a = chooseA(iter.a.load(), iter.b.load());
        return iter;
    }


    pub fn deinit(self: *TwoMergeIterator) void {
        self.a.release();
        self.b.release();
    }


    pub fn key(self: TwoMergeIterator) []const u8 {
        if (self.choose_a) {
            std.debug.assert(!self.a.load().isEmpty());
            return self.a.load().key();
        }
        std.debug.assert(!self.b.load().isEmpty());
        return self.b.load().key();
    }


    pub fn value(self: TwoMergeIterator) []const u8 {
        if (self.choose_a) {
            std.debug.assert(!self.a.load().isEmpty());
            return self.a.load().value();
        }
        std.debug.assert(!self.b.load().isEmpty());
        return self.b.load().value();
    }


    pub fn isEmpty(self: TwoMergeIterator) bool {
        if (self.choose_a) {
            return self.a.load().isEmpty();
        }
        return self.b.load().isEmpty();
    }


    pub fn next(self: *TwoMergeIterator) !void {
        if (self.choose_a) {
            try self.a.load().next();
        } else {
            try self.b.load().next();
        }
        try self.skipB();
        self.choose_a = chooseA(self.a.load(), self.b.load());
    }
};

至此,我們讀行為所需要的武器就完備了!

Read/Scan

讓我們再來看看LSM的架構圖:

圖片

我們將每個數據層中的數據標上優先級,由于LSM-Tree是append-only的,所以優先級越高的數據層中數據越新。

所以我們的讀策略也很明顯:按照上圖中P0至P2依次檢索,這部分代碼實現見src/storage.zig。

  • 讀MemTable
// search in memtable
if (try self.state.getMemTable().get(key, value)) {
    if (value.*.len == 0) {
        // tomestone
        return false;
    }
    return true;
}
  • 讀Immutable MemTable
// search in imm_memtables


self.state_lock.lockShared();
defer self.state_lock.unlockShared();
for (self.state.imm_mem_tables.items) |imm_table| {
    if (try imm_table.load().get(key, value)) {
        if (value.*.len == 0) {
            // tomestone
            return false;
        }
        return true;
    }
}
  • 讀LV0~LVmax SSTables
// 收集L0中的迭代器
var l0_iters = std.ArrayList(StorageIteratorPtr).init(self.allocator);
defer {
    for (l0_iters.items) |iter| {
        var ii = iter;
        ii.release();
    }
    l0_iters.deinit();
}
{
    self.state_lock.lockShared();
    defer self.state_lock.unlockShared();
    for (self.state.l0_sstables.items) |sst_id| {
        const sst = self.state.sstables.get(sst_id).?;
        if (try sst.load().mayContain(key)) {
            var ss_iter = try SsTableIterator.initAndSeekToKey(self.allocator, sst.clone(), key);
            errdefer ss_iter.deinit();
            try l0_iters.append(try StorageIteratorPtr.create(self.allocator, .{ .ss_table_iter = ss_iter }));
        }
    }
}


// 收集Levels中的迭代器
var level_iters: std.ArrayList(StorageIteratorPtr) = undefined;
{
    self.state_lock.lockShared();
    defer self.state_lock.unlockShared();
    level_iters = try std.ArrayList(StorageIteratorPtr).initCapacity(
        self.allocator,
        self.state.levels.items.len,
    );
    for (self.state.levels.items) |level| {
        var level_ssts = try std.ArrayList(SsTablePtr).initCapacity(self.allocator, level.items.len);
        errdefer level_ssts.deinit();
        for (level.items) |sst_id| {
            const sst = self.state.sstables.get(sst_id).?;
            if (try mayWithinTable(key, sst)) {
                try level_ssts.append(sst.clone());
            }
        }
        if (level_ssts.items.len > 0) {
            var level_iter = try SstConcatIterator.initAndSeekToKey(
                self.allocator,
                level_ssts,
                key,
            );
            errdefer level_iter.deinit();
            try level_iters.append(try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = level_iter }));
        }
    }
}


// 將多個迭代器合并為一個TwoMergeIterator
var l0_merge_iter = try MergeIterators.init(self.allocator, l0_iters);
errdefer l0_merge_iter.deinit();


var levels_merge_iter = try MergeIterators.init(self.allocator, level_iters);
errdefer levels_merge_iter.deinit();


var iter = try TwoMergeIterator.init(
    try StorageIteratorPtr.create(self.allocator, .{ .merge_iterators = l0_merge_iter }),
    try StorageIteratorPtr.create(self.allocator, .{ .merge_iterators = levels_merge_iter }),
);
defer iter.deinit();


if (iter.isEmpty()) {
    return false;
}


if (std.mem.eql(u8, iter.key(), key) and iter.value().len > 0) {
    value.* = iter.value();
    return true;
}

壓縮

在上一節的寫過程中,我們實現了從內存表到Level0的SSTable堆疊。

隨著寫入的持續,Lv0的SSTable會越來越多,這個時候就需要我們將Lv0中的數據合并寫入至Lv2,并依次類推重復這個過程,直到堆疊到最深的層數,這個逐層合并數據的過程就是數據壓縮。

圖片圖片

LSM-Tree中數據壓縮的過程大致如下:

圖片圖片

具體的實現代碼可見src/compact.zig,src/storage.zig。

簡單分層壓縮與原始 LSM 論文中的壓縮策略相似。它為 LSM 樹維護多個層級。當一個層級太大時,它會將此層級的所有 SST 與下一層合并。壓縮策略由 3 個參數控制:

  • size_ratio_percent:【文件低級數量/文件高級數量】,當實際計算的值低于此閾值時觸發壓縮。假設這里我們設置為60%,當L0中SST數量為2,L1中SST數量為1,此時ratio為1/2 = 50% < 60%,此時我們應該將L0壓縮合并至L1。
  • level0_file_num_compaction_trigger: 第一層SSTable達到多少后觸發壓縮。因為這是最高層,沒法與更高層比較,只能固定觸發壓縮。
  • max_levels: 顧名思義,最大的層數限制。

做好這些準備工作,我們可以逐步實現壓縮邏輯:

  • 生成壓縮任務:
pub const SimpleLeveledCompactionController = struct {
    options: SimpleLeveledCompactionOptions,


    pub fn generateCompactionTask(self: SimpleLeveledCompactionController, state: *storage.StorageState) !?SimpleLeveledCompactionTask {
        if (self.options.max_levels == 1) {
            return null;
        }


        var level_sizes = std.ArrayList(usize).init(state.allocator);
        defer level_sizes.deinit();


        try level_sizes.append(state.l0_sstables.items.len);
        for (state.levels.items) |level| {
            try level_sizes.append(level.items.len);
        }


        // 如果Lv0中SST數量超出閾值,觸發L0級別壓縮
        if (state.l0_sstables.items.len >= self.options.level0_file_num_compaction_trigger) {
            std.debug.print("compaction of L0 to L1 because L0 has 7tbtlpv SSTS >= ln55nbd\n", .{ state.l0_sstables.items.len, self.options.level0_file_num_compaction_trigger });
            return .{
                .upper_level = null,
                .upper_level_sst_ids = try state.l0_sstables.clone(),
                .lower_level = 1,
                .lower_level_sst_ids = try state.levels.items[0].clone(),
                .is_lower_level_bottom = false,
            };
        }


        // 計算Lv[n+1]/lv[n],如果比例小于閾值,觸發Lv[n]級別壓縮
        for (1..self.options.max_levels) |level| {
            const lower_level = level + 1;
            if (level_sizes.items[level] == 0) {
                continue;
            }
            const size_ration = level_sizes.items[lower_level] * 100 / level_sizes.items[level];
            if (size_ration < self.options.size_ration_percent) {
                std.debug.print("compaction of Lnjd5l35 to Ltfl7drt because Lfnhxzlz size ratio l5h5rvv < drhn5tt\n", .{ level, lower_level, level, size_ration, self.options.size_ration_percent });
                return .{
                    .upper_level = level,
                    .upper_level_sst_ids = try state.levels.items[level - 1].clone(),
                    .lower_level = lower_level,
                    .lower_level_sst_ids = try state.levels.items[lower_level - 1].clone(),
                    .is_lower_level_bottom = lower_level == self.options.max_levels,
                };
            }
        }


        return null;
    }
}
  • 執行壓縮任務:

有了上一小節中讀過程的介紹,多層數據的壓縮過程就很好理解了。

例如我們想將L1與L2的SSTable合并壓縮至L2,我們只需要把L1和L2的數據放在一起創造一個迭代器,再持續從該迭代器中讀出數據寫入新的SSTable中,這個過程保證了新的SSTable中數據不重復且有序。

fn compactSimple(self: *Self, task: SimpleLeveledCompactionTask) !std.ArrayList(SsTablePtr) {
    if (task.upper_level) |_| {
        var upper_ssts = try std.ArrayList(SsTablePtr).initCapacity(
            self.allocator,
            task.upper_level_sst_ids.items.len,
        );
        var lower_ssts = try std.ArrayList(SsTablePtr).initCapacity(
            self.allocator,
            task.lower_level_sst_ids.items.len,
        );


        self.state_lock.lockShared();
        for (task.upper_level_sst_ids.items) |sst_id| {
            const sst = self.state.sstables.get(sst_id).?;
            try upper_ssts.append(sst.clone());
        }
        for (task.lower_level_sst_ids.items) |sst_id| {
            const sst = self.state.sstables.get(sst_id).?;
            try lower_ssts.append(sst.clone());
        }
        self.state_lock.unlockShared();


        var upper_iter = try SstConcatIterator.initAndSeekToFirst(self.allocator, upper_ssts);
        errdefer upper_iter.deinit();


        var lower_iter = try SstConcatIterator.initAndSeekToFirst(self.allocator, lower_ssts);
        errdefer lower_iter.deinit();


        var iter = try TwoMergeIterator.init(
            try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = upper_iter }),
            try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = lower_iter }),
        );
        defer iter.deinit();
        return self.compactGenerateSstFromIter(&iter, task.is_lower_level_bottom);
    } else {
        // compact l0_sstables to l1_sstables
        // ..... 代碼邏輯大致與上面LvN層壓縮一致,只是Lv0層的SSTable是無序的需要特殊考慮
        return self.compactGenerateSstFromIter(&iter, task.is_lower_level_bottom);
    }
}




fn compactGenerateSstFromIter(self: *Self, iter: *TwoMergeIterator, compact_to_bottom_level: bool) !std.ArrayList(SsTablePtr) {
    var builder: SsTableBuilder = try SsTableBuilder.init(self.allocator, self.options.block_size);
    defer builder.deinit();
    var new_ssts = std.ArrayList(SsTablePtr).init(self.allocator);
    
    // 持續迭代此迭代器
    while (!iter.isEmpty()) {
        // 如果壓縮至最后一層,可以不保留墓碑值key了
        if (compact_to_bottom_level) {
            if (iter.value().len > 0) {
                try builder.add(iter.key(), iter.value());
            }
        } else {
            try builder.add(iter.key(), iter.value());
        }
        // 當寫滿一個SSTable后,就清空builder,把寫滿的SSTable入列
        if (builder.estimatedSize() >= self.options.target_sst_size) {
            // reset builder
            defer builder.reset() catch unreachable;
            const sst_id = self.getNextSstId();
            const path = try self.pathOfSst(sst_id);
            defer self.allocator.free(path);
            var sst = try builder.build(sst_id, self.block_cache.clone(), path);
            errdefer sst.deinit();


            var sst_ptr = try SsTablePtr.create(self.allocator, sst);
            errdefer sst_ptr.deinit();


            try new_ssts.append(sst_ptr);
        }
        try iter.next();
    }
    // 剩余的數據單獨一個SSTable
    if (builder.estimatedSize() > 0) {
        const sst_id = self.getNextSstId();
        const path = try self.pathOfSst(sst_id);
        defer self.allocator.free(path);
        var sst = try builder.build(sst_id, self.block_cache.clone(), path);
        errdefer sst.deinit();
        var sst_ptr = try SsTablePtr.create(self.allocator, sst);
        errdefer sst_ptr.deinit();
        try new_ssts.append(sst_ptr);
    }
    return new_ssts;
}
  • 替換壓縮后的SST

這部分邏輯并不復雜,即刪除此次壓縮任務中的原有兩層數據,用新合并的SSTable替換至較低層數據。

這里有個需要注意的點,即壓縮過程是在一個線程中單獨執行的,壓縮過程中LSM-Tree的原數據可能發生了改變,所以這里執行SSTable刪除時要注意過濾掉新數據,不能覆蓋了有效數據。

并發問題是軟件中的Bug集散地!

pub fn applyCompactionResult(
    _: SimpleLeveledCompactionController,
    state: *storage.StorageState,
    task: SimpleLeveledCompactionTask,
    output: []usize,
) !std.ArrayList(usize) {
    var files_to_remove = std.ArrayList(usize).init(state.allocator);
    errdefer files_to_remove.deinit();


    if (task.upper_level) |upper_level| {
        // 刪除高層SSTable數據,這層數據不會在壓縮過程中變更,放心刪
        std.debug.assert(sliceEquals(
            task.upper_level_sst_ids.items,
            state.levels.items[upper_level - 1].items,
        ));
        try files_to_remove.appendSlice(task.upper_level_sst_ids.items);
        state.levels.items[upper_level - 1].clearAndFree();
    } else {
        // 刪除L0數據,需要小心
        try files_to_remove.appendSlice(task.upper_level_sst_ids.items);
        var new_l0_sstables = std.ArrayList(usize).init(state.allocator);
        errdefer new_l0_sstables.deinit();


        {
            var l0_sst_compacted = std.AutoHashMap(usize, struct {}).init(state.allocator);
            defer l0_sst_compacted.deinit();
            for (task.upper_level_sst_ids.items) |sst_id| {
                try l0_sst_compacted.put(sst_id, .{});
            }


            for (state.l0_sstables.items) |sst_id| {
                if (!l0_sst_compacted.remove(sst_id)) { // 不在壓縮任務中的SST不能刪除
                    try new_l0_sstables.append(sst_id);
                }
            }
            std.debug.assert(l0_sst_compacted.count() == 0);
        }
        state.l0_sstables.deinit();
        state.l0_sstables = new_l0_sstables;
    }
    // 低層SSTable數據,直接刪除
    try files_to_remove.appendSlice(task.lower_level_sst_ids.items);
    state.levels.items[task.lower_level - 1].clearAndFree();
    try state.levels.items[task.lower_level - 1].appendSlice(output);


    return files_to_remove;
}




// sst to remove
var ssts_to_remove = std.ArrayList(SsTablePtr).init(self.allocator);


{
    var new_sst_ids = std.ArrayList(usize).init(self.allocator);
    defer new_sst_ids.deinit();


    self.state_lock.lock();
    defer self.state_lock.unlock();


    for (sstables.items) |sst| {
        const id: usize = @intCast(sst.get().sstId());
        try new_sst_ids.append(id);
        try self.state.sstables.put(id, sst.clone());
    }


    var file_to_remove = try self.compaction_controller.applyCompactionResult(
        &self.state,
        task,
        output.items,
    );
    defer file_to_remove.deinit();


    for (file_to_remove.items) |id| {
        if (self.state.sstables.fetchRemove(id)) |kv| {
            try ssts_to_remove.append(kv.value);
        }
    }
    try self.syncDir();
}


for (ssts_to_remove.items) |sst| {
    const path = try self.pathOfSst(sst.get().sstId());
    defer self.allocator.free(path);
    try std.fs.cwd().deleteFile(path);
}
try self.syncDir();

四、總結

我們使用Zig語言實現了一個LSM-Tree的核心功能,包括MemTable、SSTable、寫流程、各類Iterator與數據壓縮能力。通過這個項目,我收獲了很多心得體會。

了解了LSM-Tree的核心流程

以往對LSM這個數據結構的多層SST設計與寫過程早有耳聞,但是讀流程的實現不太理解。這個項目解答了我疑惑很久的讀流程的實現,特別是MergeIterator的算法設計非常巧妙。

摸索了個zig語言的智能指針

Zig語言沒有內存安全的保證,為了不想指針亂飛到處泄露,在Deepseek的幫助下實現了一個簡單的智能指針,極大降低了內存管理的心智負擔。

工程經驗

  • 盡可能多的做assertion的工作,可以提前暴露很多bug。
  • 大型多模塊的項目,一定要寫單元測試,不然出了bug無法分塊定位問題。
  • 千萬不要把IO過程放在鎖的范圍里,極大的影響性能!
責任編輯:武曉燕 來源: 得物技術
相關推薦

2019-11-26 15:12:08

數據存儲B+樹

2022-11-08 15:14:17

MyBatis插件

2022-10-29 08:44:39

分布式數據庫存儲

2025-09-26 08:06:23

2022-06-02 09:09:27

前端React低代碼編輯器

2020-09-24 11:46:03

Promise

2021-06-30 07:19:36

網絡安全

2023-10-24 16:44:24

RubyDNS

2023-07-25 14:24:33

元素JSX解析器

2023-03-21 07:35:43

2022-09-01 10:46:02

前端組件庫

2025-02-05 12:09:12

2015-11-17 16:11:07

Code Review

2019-01-18 12:39:45

云計算PaaS公有云

2018-04-18 07:01:59

Docker容器虛擬機

2021-03-10 09:52:38

開發技能架構

2024-12-06 17:02:26

2020-07-02 15:32:23

Kubernetes容器架構

2011-04-29 10:46:32

iPhone開發入門iPhoneiOS

2025-06-27 10:41:04

Redis數據庫集群
點贊
收藏

51CTO技術棧公眾號

风间由美久久久| av成人资源| 精品久久网站| 亚洲欧美自拍偷拍色图| 91超碰中文字幕久久精品| 久久久久久久片| 欧美三级在线免费观看| 美女搞黄视频在线观看| 免费人成网站在线观看欧美高清| 精品人在线二区三区| 一区二区欧美日韩| 五月天综合激情网| 一区二区三区视频免费视频观看网站 | 亚洲永久免费| 宅男在线国产精品| 久久99精品久久久久久水蜜桃| 国产男女猛烈无遮挡在线喷水| 亚洲国产欧美日本视频| 成人美女视频在线观看| 日韩中文视频免费在线观看| 逼特逼视频在线| 丁香六月天婷婷| 国产精品国产一区| 91黄视频在线观看| 国产视频一区二区三区四区| 99久久99久久精品国产| 日日夜夜一区| 国产精品视频一二| 欧美成人一二三| 97人人爽人人| 日本www在线| 精品亚洲自拍| 亚洲综合色在线| 成人在线视频网| 一区二区三区免费在线观看视频| 美女网站视频在线| 成人自拍视频在线| 国产精品青青在线观看爽香蕉| 五月婷婷综合在线观看| 欧美日韩国产观看视频| 中文字幕一区二区三区在线不卡 | 91精品欧美福利在线观看| 黄色大片在线免费看| 蜜臀av免费在线观看| 亚洲国产日本| 亚洲精品福利在线| 国产在线精品91| av免费在线观| 国产成人av福利| 国内精久久久久久久久久人| 欧美xxxx×黑人性爽| h片在线观看下载| 国产风韵犹存在线视精品| 欧美精品videossex88| 337p日本欧洲亚洲大胆张筱雨| 色网在线观看| 91在线你懂得| 国产成人久久久精品一区| 国产黄色录像视频| 国产精品视频一区二区三区综合| 亚洲久草在线视频| 国产在线精品二区| 亚洲精品久久久久avwww潮水| 国产精品日韩精品欧美精品| 国产一区二区激情| 性久久久久久久久久久久久久| 久久不射影院| 亚洲卡通欧美制服中文| 成人在线观看www| 天堂中文在线观看视频| 免费在线视频一区| 国产成人午夜视频网址| 狠狠狠狠狠狠狠| 91视频久久| 精品电影一区二区| 高清一区二区视频| 欧美aaaxxxx做受视频| 亚洲精品国产视频| 日本一区二区不卡高清更新| 97免费观看视频| 国产综合欧美| 日韩av影视在线| av在线网址导航| 理论片午夜视频在线观看| 香港成人在线视频| 青少年xxxxx性开放hg| 日本不卡免费播放| 国产成人h网站| 国外成人在线视频网站| 日本中文字幕久久| 欧美日韩国产高清| 久久精品国产一区二区三区| 国产美女喷水视频| 成人高清电影网站| 国产视频在线观看一区二区| 日本成人在线免费观看| 91九色综合| 精品国产91久久久久久老师| 久久久久亚洲av无码专区喷水| 在线播放免费av| 午夜天堂影视香蕉久久| 蜜臀久久99精品久久久酒店新书 | 精品视频在线观看网站| 色欧美88888久久久久久影院| 无需播放器的av| 在线观看欧美日韩电影| 亚洲超碰精品一区二区| 成人免费xxxxx在线视频| 国产精品亚洲四区在线观看| 亚洲国产成人精品电影| 911av视频| 国产精品久av福利在线观看| 欧美一二三在线| 福利片一区二区三区| 国产ts一区| 日韩小视频在线| 国产在视频线精品视频| 超碰成人久久| 国内精品在线一区| 亚洲天堂一二三| 裸体一区二区三区| 国产精品无码专区在线观看| 午夜精品一区二| 国产精品一区二区久久不卡 | 国产成人亚洲综合a∨婷婷 | 男人天堂网在线视频| 欧美激情一区二区三区全黄| 久久久水蜜桃| 国产cdts系列另类在线观看| 中文字幕中文字幕在线一区| 国内精品视频一区二区三区| 免费毛片在线看片免费丝瓜视频| 欧美亚洲动漫精品| 亚洲这里只有精品| 粉嫩av国产一区二区三区| 日韩精品中文字幕视频在线| 黄色片在线观看网站| 99久久婷婷国产综合精品电影√| 91精品国产91久久久久| 国产精品国产三级国产普通话对白| 91视视频在线观看入口直接观看www | 亚洲同性同志一二三专区| 亚洲高清精品中出| 免费观看在线黄色网| 精品久久久久久电影| avtt中文字幕| 亚洲最新色图| 久久久久久高潮国产精品视| 欧美激情一区二区三区免费观看 | 风间由美一区二区三区在线观看| 图片区小说区区亚洲五月| 日本在线视频观看| 亚洲色图清纯唯美| 国产免费又粗又猛又爽| 欧美人与牛zoz0性行为| 色噜噜久久综合伊人一本| 香蕉污视频在线观看| 久久精品欧美一区二区三区不卡 | 国产又粗又猛又黄| 国产成人综合在线| 91社在线播放| 在线观看欧美| 亚洲国产精品久久久| 国产精品50页| 日韩在线一区二区三区| 成人女保姆的销魂服务| 香蕉视频在线播放| 欧美日韩国产123区| 欧美肉大捧一进一出免费视频| 黑丝一区二区三区| 国产一区二区高清不卡| 日本乱码一区二区三区不卡| 日韩精品视频免费| 黄色片视频网站| 国内精品免费在线观看| 免费99视频| a视频在线播放| 欧美一级欧美三级在线观看| 久草网站在线观看| 不卡电影一区二区三区| 在线码字幕一区| 成人短视频app| 亚洲午夜精品久久久久久性色 | 精品久久久久久国产| 国产黄片一区二区三区| 激情欧美日韩| 91久久久在线| 丝袜美腿av在线| 亚洲国产欧美久久| 无码人妻丰满熟妇精品| 中文字幕日韩欧美一区二区三区| 成人三级做爰av| 99这里有精品| 91色p视频在线| 国产深夜视频在线观看| 欧美精品一二三| 欧美一级片黄色| 美女91精品| 极品校花啪啪激情久久| 亚洲www啪成人一区二区| 亚洲福利视频在线| 狠狠狠狠狠狠狠| 亚洲欧美日韩久久精品| 狠狠人妻久久久久久综合蜜桃| 日韩黄色小视频| 日本精品福利视频| 国产精品黄色片| 欧美激情视频网站| www.中文字幕| 一区二区三区在线不卡| 国产女主播喷水高潮网红在线| 国产资源在线一区| 免费观看黄色大片| 欧美三级午夜理伦三级在线观看 | 免费一级做a爰片久久毛片潮| 亚洲国产午夜| 在线播放豆国产99亚洲| 日韩动漫一区| 国内揄拍国内精品| 日本黄色片在线观看| 日韩成人网免费视频| 国产精品一品二区三区的使用体验| 精品福利在线视频| 强行糟蹋人妻hd中文| 欧美国产欧美综合| 成人网站免费观看| 岛国一区二区三区| 亚洲一二三av| 日韩成人免费看| 国产玉足脚交久久欧美| 老汉色老汉首页av亚洲| 91久久极品少妇xxxxⅹ软件 | 澳门精品久久国产| 国产乱人伦真实精品视频| 在线观看福利电影| 97国产精品人人爽人人做| 丝袜视频国产在线播放| 色综合天天综合| 亚洲一二三精品| www国产精品av| 亚洲精品无码一区二区| 午夜一区在线| 国产美女在线一区| 国产精品s色| 欧美少妇在线观看| 精品人人人人| 国产a一区二区| 日本成人精品| 91国语精品自产拍在线观看性色| 亚洲国产精品精华素| 久久精品免费播放| 免费黄色网址在线观看| 久久精品国产96久久久香蕉| 3d成人动漫在线| 欧美va在线播放| 国产黄色av网站| 色香蕉成人二区免费| 日韩人妻无码一区二区三区99| 亚洲国产精品一区二区尤物区| 亚洲av综合一区二区| 91蜜桃免费观看视频| 亚洲欧美色图视频| 91在线视频官网| 中文字幕丰满孑伦无码专区| 97久久久精品综合88久久| 91精品国产自产| 国产午夜精品久久久久久久| 纪美影视在线观看电视版使用方法| 久久精品欧美日韩精品 | 国产精品三级视频| 日本精品在线免费观看| 亚洲精品高清在线| 国产精品18p| 色哦色哦哦色天天综合| 中文字幕在线观看免费| 午夜电影一区二区三区| 国产成人在线播放视频| 日韩欧美aaa| 久草中文在线视频| 国产精品第一页第二页第三页| 国产午夜手机精彩视频| 亚洲综合在线免费观看| 二区视频在线观看| 亚洲综合色视频| 黄色片免费观看视频| 在线观看91视频| 日韩欧美亚洲一区二区三区| 一本久久综合亚洲鲁鲁五月天| 国产成人av免费| 91精品国产麻豆| 国产精品无码粉嫩小泬| 91精品欧美综合在线观看最新| 日本韩国在线观看| 夜夜躁日日躁狠狠久久88av| 天堂在线视频免费| 中文字幕av一区二区三区谷原希美| 婷婷av一区二区三区| 亚洲天堂男人天堂| 国产福利在线播放麻豆| 国产91精品久久久| 亚洲淫性视频| 久久久人成影片一区二区三区| 日韩欧美一区二区三区在线观看| 98精品在线视频| av成人在线播放| 99精彩视频在线观看免费| 91精品亚洲一区在线观看| 国产一区高清视频| 欧美韩日高清| 国产午夜福利100集发布| 美腿丝袜亚洲色图| 国产精品一级黄片| av午夜精品一区二区三区| 久久久久久久穴| 国产蜜臀av在线一区二区三区| 男人的天堂久久久| 欧美探花视频资源| 头脑特工队2在线播放| 久久伊人精品一区二区三区| 92国产在线视频| 97精品国产aⅴ7777| 日韩在线你懂得| 欧美二区在线| 精品不卡一区| 妞干网在线视频观看| 韩国毛片一区二区三区| 久久精品视频18| 性久久久久久久久| www黄色网址| 精品国产一区久久久| 日本精品在线一区| 蜜桃91精品入口| 亚洲国内欧美| 丰满人妻一区二区三区53视频| 日本一区二区不卡视频| 中文字幕视频网| 亚洲精品电影在线| hd国产人妖ts另类视频| 91嫩草免费看| 影音先锋成人在线电影| 国产探花在线看| 中文字幕av资源一区| 无码人妻熟妇av又粗又大| 日韩大片免费观看视频播放| 欧美xxxx性xxxxx高清| 97超级碰碰| 欧美视频在线观看| 久久久久亚洲av片无码v| 成人免费一区二区三区在线观看| 国产精品无码一区| 夜夜躁日日躁狠狠久久88av| 欧美日韩亚洲国产| 水蜜桃亚洲精品| 免费日本视频一区| 日本免费网站视频| 亚洲va天堂va国产va久| 好吊视频一二三区| 97人人做人人爱| 美女福利一区| 69堂免费视频| 久久99精品国产麻豆婷婷| 欧美丰满熟妇bbb久久久| 一区二区三区成人| 国产小视频一区| 久久久视频精品| 精品人人人人| 50路60路老熟妇啪啪| 中文字幕高清不卡| 亚洲一区二区三区网站| 久久精品人人做人人爽| 国产精品毛片无码| 国产一线二线三线女| 99久久久免费精品国产一区二区| 日韩欧美中文字幕一区二区| 亚洲人成网站777色婷婷| 国产激情在线视频| 国产精品 日韩| 99xxxx成人网| 69精品无码成人久久久久久| 亚洲福利视频三区| 日韩美女一级视频| 国产精品视频成人| 亚洲精品二区三区| 一区二区成人网| 亚洲欧美另类综合偷拍| 亚洲va欧美va| 欧美一区在线直播| 国产二区精品| av免费观看不卡| 亚洲男人都懂的| 日本人妻熟妇久久久久久| 日本精品性网站在线观看| 丁香综合av| 日本成人中文字幕在线| 亚洲视频在线一区二区| 熟妇人妻一区二区三区四区| 国产精品激情自拍| 国产乱码精品一区二区亚洲 | 99成人精品| 91视频免费看片|