精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

圖解 Kafka 源碼實現機制之客戶端緩存架構設計

云計算 Kafka
通過場景驅動的方式,當被發送消息通過網絡請求封裝、NIO多路復用器監聽網絡讀寫事件并進行消息網絡收發后,回頭來看看消息是如何在客戶端緩存的?

大家好,我是 華仔, 又跟大家見面了。

上篇主要帶大家深度剖析了「Kafka 網絡層收發總流程」,今天主要聊聊 「Kafka 客戶端消息緩存架構設計」,深度剖析下消息是如何進行緩存的。

認真讀完這篇文章,我相信你會對 Kafka 客戶端緩存架構的源碼有更加深刻的理解。

這篇文章干貨很多,希望你可以耐心讀完。

一、總體概述

通過場景驅動的方式,當被發送消息通過網絡請求封裝、NIO多路復用器監聽網絡讀寫事件并進行消息網絡收發后,回頭來看看消息是如何在客戶端緩存的?

大家都知道 Kafka 是一款超高吞吐量的消息系統,主要體現在「異步發送」、「批量發送」、「消息壓縮」。

跟本篇相關的是「批量發送」即生產者會將消息緩存起來,等滿足一定條件后,Sender 子線程再把消息批量發送給 Kafka Broker。

這樣好處就是「盡量減少網絡請求次數,提升網絡吞吐量」。

為了方便大家理解,所有的源碼只保留骨干。

二、消息如何在客戶端緩存的

既然是批量發送,那么消息肯定要進行緩存的,那消息被緩存在哪里呢?又是如何管理的?

通過下面簡化流程圖可以看出,待發送消息主要被緩存在 RecordAccumulator 里。

我以一個真實生活場景類比解說一下會更好理解。

既然說 RecordAccumulator 像一個累積消息的倉庫,就拿快遞倉庫類比。

上圖是一個快遞倉庫,堆滿了貨物。可以看到分揀員把不同目的地的包裹放入對應目的地的貨箱,每裝滿一箱就放置在對應的區域。

那么分揀員就是指 RecordAccumulator,而貨箱以及各自所屬的堆放區域,就是 RecordAccumulator 中緩存消息的地方。所有封箱的都會等待 sender 來取貨發送出去。

如果你看懂了上圖,就大概理解了 RecordAccumulator 的架構設計和運行邏輯。

總結下倉庫里有什么:

  • 分揀員
  • 貨物
  • 目的地
  • 貨箱
  • 堆放區域

記住這些概念,都會體現在源碼里,流程如下圖所示:

從上面圖中可以看出:

  • 至少有一個業務主線程和一個 sender 線程同時操作 RecordAccumulator,所以它必須是線程安全的。
  • 在它里面有一個 ConcurrentMap 集合「Kafka 自定義的 CopyOnWriteMap」。key:TopicPartiton, value:Deque<ProducerBatch>,即以主題分區為單元,把消息以 ProducerBatch 為單位累積緩存,多個 ProducerBatch 保存在 Deque 隊列中。當 Deque 中最新的 batch 不能容納消息時,就會創建新的 batch 來繼續緩存,并將其加入 Deque。
  • 通過 ProducerBatch 進行緩存數據,為了減少頻繁申請銷毀內存造成 Full GC 問題,Kafka 設計了經典的「緩存池 BufferPool 機制」。

綜上可以得出 RecordAccumulator 類中有三個重要的組件:「消息批次 ProducerBatch」、「自定義 CopyOnWriteMap」、「緩存池 BufferPool 機制」。

由于篇幅原因,RecordAccumulator 類放到下篇來講解。

先來看看 ProducerBatch,它是消息緩存及發送消息的最小單位。

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java。

圖片

通過調用關系可以看出,ProducerBatch 依賴 MemoryRecordsBuilder,而 MemoryRecordsBuilder 依賴 MemoryRecords 構建,所以 「MemoryRecords 才是真正用來保存消息的地方」。

1、MemoryRecords

import java.nio.ByteBuffer;
public class MemoryRecords extends AbstractRecords {
  public static MemoryRecordsBuilder builder(..){
        // 重載builder 
        return builder(...);
  }
    
  public static MemoryRecordsBuilder builder(
    ByteBuffer buffer,
    // 消息版本
    byte magic,
    // 消息壓縮類型
    CompressionType compressionType,
    // 時間戳
    TimestampType timestampType,
    // 基本位移
    long baseOffset,
    // 日志追加時間
    long logAppendTime,
    // 生產者id
    long producerId,
    // 生產者版本
    short producerEpoch,
    // 批次序列號
    int baseSequence,
    boolean isTransactional,
    // 是否是控制類的批次
    boolean isControlBatch,
    // 分區leader的版本
    int partitionLeaderEpoch) {
        // 初始化MemoryRecordsBuilder類
        return new MemoryRecordsBuilder(...);
  }
}

該類比較簡單,通過 builder 方法可以看出依賴 ByteBuffer 來存儲消息。MemoryRecordsBuilder 類的構建是通過 MemoryRecords.builder() 來初始化的。

來看看 MemoryRecordsBuilder 類的實現。

2、MemoryRecordBuilder

public class MemoryRecordsBuilder implements AutoCloseable {
    // 寫操作關閉的輸出流
    private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
        // 當向某個ByteBuffer關閉輸出流寫數據時拋異常
        public void write(int b) {
            throw new ...;
        }
    });
    // 日志時間
    private final TimestampType timestampType;
    // 消息壓縮類型
    private final CompressionType compressionType;
    // kafka對OutputStream接口的實現類,對ByteBuffer實現了自動擴容功能
    private final ByteBufferOutputStream bufferStream;
    // 消息的版本
    private final byte magic;
    // ByteBuffer的最初始位置
    private final int initialPosition;
    // 基本位移
    private final long baseOffset;
    // 消息追加的時間
    private final long logAppendTime;
    // 是否是控制類的批次
    private final boolean isControlBatch;
    // 分區leader的版本
    private final int partitionLeaderEpoch;
    // 寫入上限
    private final int writeLimit;
    // batch頭大小字節數
    private final int batchHeaderSizeInBytes;
    // 評估壓縮率
    private float estimatedCompressionRatio = 1.0F;
    // 對bufferStream添加壓縮功能
    private DataOutputStream appendStream;
    // 是否是事務批次
    private boolean isTransactional;
    // 生產者id
    private long producerId;
    // 生產者版本
    private short producerEpoch;
    // 批次序列號
    private int baseSequence;
    // 壓縮前要寫入的消息體大小字節數
    private int uncompressedRecordsSizeInBytes = 0; 
    // 壓縮前寫入的記錄數(不包括頭)
    private int numRecords = 0;
    // 實際壓縮率
    private float actualCompressionRatio = 1;
    // 最大時間戳
    private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
    // 最大時間戳偏移量
    private long offsetOfMaxTimestamp = -1;
    // 最后的偏移量
    private Long lastOffset = null;
    // 第一次追加消息的時間戳
    private Long firstTimestamp = null;
    // 真正保存消息的地方
    private MemoryRecords builtRecords;

從該類屬性字段來看比較多,這里只講2個關于字節流的字段。

  • CLOSED_STREAM:當關閉某個 ByteBuffer 也會把它對應的寫操作輸出流設置為 CLOSED_STREAM,目的就是防止再向該 ByteBuffer 寫數據,否則就拋異常。
  • bufferStream:首先 MemoryRecordsBuilder 依賴 ByteBuffer 來完成消息存儲。它會將 ByteBuffer 封裝成 ByteBufferOutputStream 并實現了 Java NIO 的 OutputStream,這樣就可以按照流的方式寫數據了。同時 ByteBufferOutputStream 提供了自動擴容 ByteBuffer 能力

來看看它的初始化構造方法。

public MemoryRecordsBuilder(ByteBuffer buffer,...) {
    // 將MemoryRecordsBuilder關聯的ByteBuffer封裝成ByteBufferOutputStream流
    this(new ByteBufferOutputStream(buffer), ...);
}

// 構造方法
public MemoryRecordsBuilder(
    ByteBufferOutputStream bufferStream,
    ...
    int writeLimit) {
        ....
        // 初始位置
        this.initialPosition = bufferStream.position();
        // 1. 根據不同消息版本計算批次Batch頭的長度
        this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);
        // 2. 調整對應的position
        bufferStream.position(initialPosition + batchHeaderSizeInBytes);
        this.bufferStream = bufferStream;
        // 3. 在bufferStream流外層套一層壓縮流,再套一層DataOutputStream流
        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
    }
}

從構造函數可以看出,除了基本字段的賦值之外,會做以下3件事情:

  • 根據消息版本、壓縮類型來計算批次 Batch 頭的大小長度。
  • 通過調整 bufferStream 的 position,使其跳過 Batch 頭部位置,就可以直接寫入消息了。
  • 對 bufferStream 增加壓縮功能。

看到這里,挺有意思的,不知讀者是否意識到這里涉及到 「ByteBuffer」、「bufferStream」 、「appendStream」。

三者的關系是通過「裝飾器模式」實現的,即 bufferStream 對 ByteBuffer 裝飾實現擴容功能,而 appendStream 又對 bufferStream 裝飾實現壓縮功能。

來看看它的核心方法。

(1)appendWithOffset()

// 追加新記錄
public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
   return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
}

// 計算下一個連續偏移量
private long nextSequentialOffset() {
  // lastOffset用來記錄當前寫入Record的offset,每次當有新Record寫入時,都會遞增它。
  return lastOffset == null ? baseOffset : lastOffset + 1;
}

// 根據偏移量追加消息
private Long appendWithOffset(
  long offset,
  boolean isControlRecord, 
  long timestamp, 
  ByteBuffer key,
  ByteBuffer value, 
  Header[] headers) {
    try {
        // 檢查isControl標志是否一致
        if (isControlRecord != isControlBatch)
            throw new ...;
        // 保證offset是遞增的
        if (lastOffset != null && offset <= lastOffset)
            throw new ...;
        // 檢查時間戳      
        if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
           throw new ...;
        // 只有V2版本才有header
        if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
           throw new ...;
        // 更新firstTimestamp
        if (firstTimestamp == null)
           firstTimestamp = timestamp;
        // V2版本消息寫入
        if (magic > RecordBatch.MAGIC_VALUE_V1)         {
            appendDefaultRecord(offset, timestamp, key, value, headers);
            return null;
        } else {
            //V0、V1 版本消息寫入(此處不進行剖析)
            return appendLegacyRecord(offset, timestamp, key, value, magic);
        }
    } catch (IOException e) {
        // 拋異常
    }
}

該方法主要用來根據偏移量追加寫消息,會根據消息版本來寫對應消息,但需要明確的是 ProducerBatch 對標 V2 版本。

來看看 V2 版本消息寫入邏輯。

private void appendDefaultRecord(
  long offset, 
  long timestamp, 
  ByteBuffer key, 
  ByteBuffer value,
  Header[] headers) throws IOException {
    // 1. 檢查appendStream狀態是否可以寫
    ensureOpenForRecordAppend();
    // 2. 計算寫入多少偏移量
    int offsetDelta = (int) (offset - baseOffset);
    // 3.計算本次寫與第一次寫之間時間差
    long timestampDelta = timestamp - firstTimestamp;
    // 4.使用DefaultRecord.writeTo()方法會按照V2 版本格式寫入appendStream流中,并返回壓縮前的消息大小
    int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
    // 5. 消息寫入成功后更新RecordBatch的元信息
    recordWritten(offset, timestamp, sizeInBytes);
}

// 判斷appendStream狀態是否為CLOSED_STREAM 
private void ensureOpenForRecordAppend() {
    if (appendStream == CLOSED_STREAM)
        throw new ...;
}

// 消息寫入成功后更新RecordBatch的元信息
private void recordWritten(long offset, long timestamp, int size) {
  ....
  // 壓縮前寫入的記錄數 + 1
  numRecords += 1;
  // 壓縮前要寫入的消息體大小字節數 + size
  uncompressedRecordsSizeInBytes += size;
  // 最后的偏移量 + offset
  lastOffset = offset;
  if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) {
      // 賦值最大時間戳
      maxTimestamp = timestamp;
      // 賦值最大時間戳偏移量
      offsetOfMaxTimestamp = offset;
  }
}

該方法主要用來寫入 V2 版本消息的,主要做以下5件事情:

  • 檢查是否可寫:判斷 appendStream 狀態是否為 CLOSED_STREAM,如果不是就可寫,否則拋異常。
  • 計算本次要寫入多少偏移量。
  • 計算本次寫入和第一次寫的時間差。
  • 按照 V2 版本格式寫入 appendStream 流中,并返回壓縮前的消息大小。
  • 成功后更新 RecordBatch 的元信息。

(2)hasRoomFor()

public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
    // 檢查兩個狀態
    // (1)appendStream流狀態
    // (2)當前已經寫入的預估字節數是否超過了writeLimit寫入上限
    if (isFull())
        return false;
    // 每個RecordBatch至少可以寫入一個Record,此時如果一個Record都沒有,則可以繼續寫入
    if (numRecords == 0)
        return true;
    final int recordSize;
    if (magic < RecordBatch.MAGIC_VALUE_V2) {
        // 預估V0、V1舊版本的Record大小
        recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
    } else {
        // 預估V2版本寫入的Record大小
        int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
        ...
        recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
    }

    // 已寫入字節數 + 本次寫入Record的預估字節數不能超過writeLimit寫入上限
    return this.writeLimit >= estimatedBytesWritten() + recordSize;
}

public boolean isFull() {
      return appendStream == CLOSED_STREAM || 
      (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
}

該方法主要用來估計當前 MemoryRecordsBuilder 是否還有空間來容納要寫入的 Record,會在下面 ProducerBatch.tryAppend() 里面調用。

最后來看看小節開始提到的自動擴容功能。

(3)expandBuffer()

public class ByteBufferOutputStream extends OutputStream {
   // 擴容因子1.1倍
   private static final float REALLOCATION_FACTOR = 1.1f;
   // 初始容量
   private final int initialCapacity;
   // 初始位置
   private final int initialPosition;
   // 計算是否需要擴容
   public void ensureRemaining(int remainingBytesRequired) {
     // 當寫入字節數大于buffer當前剩余字節數就開啟擴容
     if (remainingBytesRequired > buffer.remaining())
     expandBuffer(remainingBytesRequired);
  }
  
  // 擴容
  private void expandBuffer(int remainingRequired) {
    // 1. 評估需要多少空間
    int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired);
    // 2. 申請新的ByteBuffer
    ByteBuffer temp = ByteBuffer.allocate(expandSize);
    // 3. 獲取寫入上限
    int limit = limit();
    // 4. 寫狀態轉換為讀狀態
    buffer.flip();
    // 5. 將buffer讀到新申請的temp里
    temp.put(buffer);
    // 6. 修改寫模式的limit上限
    buffer.limit(limit);
    // 7. 更新原來的buffer的position,防止被重復消費
    buffer.position(initialPosition);
    // 8. 將引用指向新申請的ByteBuffer
    buffer = temp;
  }
}

該方法主要用來判斷是否需要擴容 ByteBuffer 的,即當寫入字節數大于 buffer 當前剩余字節數就開啟擴容,擴容需要做以下3件事情:

  • 評估需要多少空間: 在「擴容空間」、「真正需要多少字節」之間取最大值,此處通過「擴容因子」來計算主要是因為擴容是需要消耗系統資源的,如果每次都按實際數據大小來進行分配空間,會浪費不必要的系統資源。
  • 申請新的空間:根據擴容多少申請新的 ByteBuffer,然后將原來的 ByteBuffer 數據拷貝進去,對應源碼步驟:「3 - 7」。
  • 最后將引用指向新申請的 ByteBuffer。

接下來看看 ProducerBatch 的實現。

3、ProducerBatch

public final class ProducerBatch {
    // 批次最終狀態
    private enum FinalState { ABORTED, FAILED, SUCCEEDED }
    // 批次創建時間  
    final long createdMs;
    // 批次對應的主題分區
    final TopicPartition topicPartition;
    // 請求結果的future
    final ProduceRequestResult produceFuture;
    // 用來存儲消息的callback和響應數據
    private final List<Thunk> thunks = new ArrayList<>();
    // 封裝MemoryRecords對象,用來存儲消息的ByteBuffer
    private final MemoryRecordsBuilder recordsBuilder;
    // batch的失敗重試次數
    private final AtomicInteger attempts = new AtomicInteger(0);
    // 是否是被分裂的批次
    private final boolean isSplitBatch;
    // ProducerBatch的最終狀態
    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
    // Record個數
    int recordCount;
    // 最大Record字節數
    int maxRecordSize;
    // 最后一次失敗重試發送的時間戳
    private long lastAttemptMs;
    // 最后一次向該ProducerBatch追加Record的時間戳
    private long lastAppendTime;
    // Sender子線程拉取批次的時間
    private long drainedMs;
    // 是否正在重試過,如果ProducerBatch中的數據發送失敗,則會重新嘗試發送
    private boolean retry;
}

// 構造函數
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
    ...
    // 請求結果的future
    this.produceFuture = new ProduceRequestResult(topicPartition);
    ...
}

一個 ProducerBatch 會存放一條或多條消息,通常把它稱為「批次消息」。

先來看看幾個重要字段:

  • topicPartition:批次對應的主題分區,當前 ProducerBatch 中緩存的 Record 都會發送給該 TopicPartition。
  • produceFuture:請求結果的 Future,通過 ProduceRequestResult 類實現。
  • thunks:Thunk 對象集合,用來存儲消息的 callback 和每個 Record 關聯的 Feture 響應數據。
  • recordsBuilder:封裝 MemoryRecords 對象,用來存儲消息的 ByteBuffer。
  • attemps:batch 的失敗重試次數,通過 AtomicInteger 提供原子操作來進行 Integer 的使用,適合高并發情況下的使用
  • isSplitBatch:是否是被分裂的批次,因單個消息過大導致一個 ProducerBatch 存不下,被分裂成多個 ProducerBatch 來存儲的情況。
  • drainedMs:Sender 子線程拉取批次的時間。
  • retry:如果 ProducerBatch 中的數據發送失敗,則會重新嘗試發送。

在構造函數中,有個重要的依賴組件就是 「ProduceRequestResult」,而它是「異步獲取消息生產結果的類」,簡單剖析下。

(1)ProduceRequestResult 類

public class ProduceRequestResult {
    // 通過一個count為1的CountDownLatch對象間接地實現了Future的功能。
    private final CountDownLatch latch = new CountDownLatch(1);
    private final TopicPartition topicPartition;
    // 用來記錄broker端關聯ProducerBatch中第一條Record分配的offset值
    // 這樣每個Record的真實offset就可以根據自身在ProducerBatch的位置計算出來了(baseOffset + relativeOffset)
    private volatile Long baseOffset = null;
    
    // 構造函數
    public ProduceRequestResult(TopicPartition topicPartition) {
        this.topicPartition = topicPartition;
    }
    // 當等到響應會會調該函數喚醒阻塞的主線程
    public void done() {
        if (baseOffset == null)
            throw new ...;
        this.latch.countDown();
    }
    // 調用await()方法的線程會被掛起,它會等待直到count值為0才繼續執行
    public void await() throws InterruptedException {
        latch.await();
    }
    // 和await()類似,只不過等待一定的時間后count值還沒變為0的話就會繼續執行
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}

該類通過 CountDownLatch(1) 間接地實現了 Future 功能,并讓其他所有線程都在這個鎖上等待,此時只需要調用一次 countDown() 方法就可以讓其他所有等待的線程同時恢復執行。

當 Producer 發送消息時會間接調用「ProduceRequestResult.await」,此時線程就會等待服務端的響應。當服務端響應時調用「ProduceRequestResult.done」,該方法調用了「CountDownLatch.countDown」喚醒了阻塞在「CountDownLatch.await」上的主線程。這些線程后續可以通過 ProduceRequestResult 的 error 字段來判斷本次請求成功還是失敗。

接下來看看 ProducerBatch 類的重要方法。

(2)tryAppend()

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // 1.檢查MemoryRecordsBuilder是否還有空間寫入
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        // 2.調用append()方法寫入Record
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        // 3. 更新最大Record字節數
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),recordsBuilder.compressionType(), key, value, headers));
        ...
        // 4.構建FutureRecordMetadata對象
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,timestamp, checksum,key == null ? -1 : key.length,value == null ? -1 : value.length, Time.SYSTEM);
        // 5. 將Callback和FutureRecordMetadata記錄到thunks集合中
        thunks.add(new Thunk(callback, future));
        // 6. 更新Record記錄數
        this.recordCount++;
        // 7. 返回FutureRecordMetadata
        return future;
    }
}

該方法主要用來嘗試追加寫消息的,主要做以下6件事情:

  • 通過 MemoryRecordsBuilder 的 hasRoomFor() 檢查當前 ProducerBatch 是否還有足夠的空間來存儲此次寫入的 Record。
  • 調用 MemoryRecordsBuilder.append() 方法將 Record 追加到 ByteBuffer 中。
  • 創建 FutureRecordMetadata 對象,底層繼承了 Future 接口,對應此次 Record 的發送。
  • 將 Future 和消息的 callback 回調封裝成 Thunk 對象,放入 thunks 集合中。
  • 更新 Record 記錄數。
  • 返回 FutureRecordMetadata。

可以看出該方法只是讓 Producer 主線程完成了消息的緩存,并沒有實現真正的網絡發送。

接下來簡單看看 FutureRecordMetadata,它實現了 JDK 中 concurrent 的 Future 接口。除了維護 ProduceRequestResult 對象外還維護了 relativeOffset 等字段,其中 relativeOffset 用來記錄對應 Record 在 ProducerBatch 中的偏移量。

該類有2個值得注意的方法,get() 和 value()。

public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    ...
    // 依賴ProduceRequestResult的CountDown來實現阻塞等待
    boolean occurred = this.result.await(timeout, unit);
    ...
    // 調用value()方法返回RecordMetadata對象
    return valueOrError();
}
    
RecordMetadata valueOrError() throws ExecutionException {
    ...
    return value();
}

該方法主要依賴 ProduceRequestResult 的 CountDown 來實現阻塞等待,最后調用 value() 返回 RecordMetadata 對象。

RecordMetadata value() {
    ...
     // 將 partition、baseOffset、relativeOffset、時間戳(LogAppendTime | CreateTimeStamp)等信息封裝成 RecordMetadata 對象返回
    return new RecordMetadata(
      result.topicPartition(), 
      ...);
}

private long timestamp() {
    return result.hasLogAppendTime() ? result.logAppendTime() : createTimestamp;
}

該方法主要通過各種參數封裝成 RecordMetadata 對象返回。

了解了 ProducerBatch 是如何寫入數據的,我們再來看看 done() 方法。當 Producer 收到 Broker 端「正常」|「超時」|「異常」|「關閉生產者」等響應都會調用 ProducerBatch 的 done()方法。

(3)done()

public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
    // 1.根據exception決定本次ProducerBatch發送的最終狀態
    final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
    ....
    // 2.通過CAS操作更新finalState狀態,只有第一次更新的時候,才會觸發completeFutureAndFireCallbacks()方法
    if (this.finalState.compareAndSet(null, tryFinalState)) {
        // 3.執行回調
        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }
    ....
    return false;
}

該方法主要用來是否可以執行回調操作,即當收到該批次響應后,判斷批次 Batch 最終狀態是否可以執行回調操作。

(4)completeFutureAndFireCallbacks()

private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
  // 1.更新ProduceRequestResult中的相關字段
  produceFuture.set(baseOffset, logAppendTime, exception);

  // 2.遍歷thunks集合,觸發每個Record的Callback回調
  for (Thunk thunk : thunks) {
      try {
          if (exception == null) {
           // 3.獲取消息元數據
           RecordMetadata metadata = thunk.future.value();
           if (thunk.callback != null)
             //4.調用回調方法
             thunk.callback.onCompletion(metadata, null);
          } else {
              if (thunk.callback != null)
                  // 4.調用回調方法
                  thunk.callback.onCompletion(null, exception);
          }
      } 
      ....
  }
  // 4.調用底層 CountDownLatch.countDown()方法,阻塞在其上的主線程。
  produceFuture.done();
}

該方法主要用來調用回調方法和完成 future,主要做以下3件事情:

  • 更新 ProduceRequestResult 中的相關字段,包括基本位移、消息追加的時間、異常。
  • 遍歷 thunks 集合,觸發每個 Record 的 Callback 回調。
  • 調用底層 CountDownLatch.countDown()方法,阻塞在其上的主線程。

至此我們已經講解了 ProducerBatch 「如何緩存消息」、「如何處理響應」、「如何處理回調」三個最重要方法。

通過一張圖來描述下緩存消息的存儲結構:

接下來看看 Kafka 生產端最經典的 「緩沖池架構」。

三、客戶端緩存池架構設計

為什么客戶端需要緩存池這個經典架構設計呢?

主要原因就是頻繁的創建和釋放 ProducerBatch 會導致 Full GC 問題,所以 Kafka 針對這個問題實現了一個非常優秀的機制,就是「緩存池 BufferPool 機制」。即每個 Batch 底層都對應一塊內存空間,這個內存空間就是專門用來存放消息,用完歸還就行。

接下來看看緩存池的源碼設計。

1、BufferPool

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java。

public class BufferPool {
  // 整個BufferPool總內存大小 默認32M
  private final long totalMemory;
  // 當前BufferPool管理的單個ByteBuffer大小,16k 
  private final int poolableSize;
  // 因為有多線程并發分配和回收ByteBuffer,用鎖控制并發,保證線程安全。
  private final ReentrantLock lock;
  // 對應一個ArrayDeque<ByteBuffer> 隊列,其中緩存了固定大小的 ByteBuffer 對象
  private final Deque<ByteBuffer> free;
  // 此隊列記錄因申請不到足夠空間而阻塞的線程對應的Condition 對象
  private final Deque<Condition> waiters;
  // 非池化可用的內存即totalMemory減去free列表中的全部ByteBuffer的大小
  private long nonPooledAvailableMemory;
  // 構造函數
  public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
    ...
    // 總的內存
    this.totalMemory = memory;
    // 默認的池外內存,就是總的內存
    this.nonPooledAvailableMemory = memory;
  }
}

先來看看上面幾個重要字段:

  • totalMemory:整個 BufferPool 內存大小「buffer.memory」,默認是32M。
  • poolableSize:池化緩存池一塊內存塊的大小「batch.size」,默認是16k。
  • lock:當有多線程并發分配和回收 ByteBuffer 時,為了保證線程的安全,使用鎖來控制并發。
  • free:池化的 free 隊列,其中緩存了指定大小的 ByteBuffer 對象。
  • waiters:阻塞線程對應的 Condition 隊列,當有申請不到足夠內存的線程時,為了等待其他線程釋放內存而阻塞等待,對應的 Condition 對象會進入該隊列。
  • nonPooledAvailableMemory:非池化可用內存。

可以看出它只會針對固定大小「poolableSize 16k」的 ByteBuffer 進行管理,ArrayDeque 的初始化大小是16,此時 BufferPool 的狀態如下圖:

接下來看看 BufferPool 的重要方法。

(1)allocate()

// 分配指定空間的緩存,如果緩沖區中沒有足夠的空閑空間,那么會阻塞線程,直到超時或得到足夠空間
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
  // 1.判斷申請的內存是否大于總內存
  if (size > this.totalMemory)
      throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of "+ this.totalMemory + " on memory allocations.");
  // 初始化buffer
  ByteBuffer buffer = null;
  // 2.加鎖,保證線程安全。
  this.lock.lock();
  // 如果當前BufferPool處于關閉狀態,則直接拋出異常
  if (this.closed) {
      this.lock.unlock();
      throw new KafkaException("Producer closed while allocating memory");
  }
  ....
  try {
      // 3.申請內存大小恰好為16k 且free緩存池不為空
      if (size == poolableSize && !this.free.isEmpty())
      // 從free隊列取出一個ByteBuffer
      return this.free.pollFirst();
      
      // 對于申請內存大小非16k情況
      // 先計算free緩存池總空間大小,判斷是否足夠
      int freeListSize = freeSize() * this.poolableSize;
      // 4.當前BufferPool能夠釋放出申請內存大小的空間
      if (this.nonPooledAvailableMemory + freeListSize >= size) {
          // 5.如果size大于非池化可用內存大小,就循環從free緩存池里釋放出來空閑Bytebuffer補充到nonPooledAvailableMemory中,直到滿足size大小為止。
          freeUp(size);
          // 釋放非池化可用內存大小
          this.nonPooledAvailableMemory -= size;
      } else {
          // 如果當前BufferPool不夠提供申請內存大小,則需要阻塞當前線程
          // 累計已經釋放的內存
          int accumulated = 0;
          // 創建對應的Condition,阻塞自己等待別的線程釋放內存
          Condition moreMemory = this.lock.newCondition();
          try {
              // 計算當前線程最大阻塞時長
              long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
              // 把自己添加到等待隊列中末尾,保持公平性,先來的先獲取內存,防止饑餓
              this.waiters.addLast(moreMemory);
              // 循環等待直到分配成功或超時
              while (accumulated < size) {
                  ....
                  try {
                    // 當前線程阻塞等待,返回結果為false則表示阻塞超時
                   waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                  } finally {
                      ....
                  }
                  ....   
                  // 申請內存大小是16k,且free緩存池有了空閑的ByteBuffer
                  if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    // 從free隊列取出一個ByteBuffer
                    buffer = this.free.pollFirst();
                    // 計算累加器
                    accumulated = size;
                  } else {
                      // 釋放空間給非池化可用內存,并繼續等待空閑空間,如果分配多了只取夠size的空間
                      freeUp(size - accumulated);
                      int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                      // 釋放非池化可用內存大小
                      this.nonPooledAvailableMemory -= got;
                      // 累計分配了多少空間
                      accumulated += got;
                  }
              }
              accumulated = 0;
          } finally {
              // 如果循環有異常,將已釋放的空間歸還給非池化可用內存
              this.nonPooledAvailableMemory += accumulated;
              //把自己從等待隊列中移除并結束
              this.waiters.remove(moreMemory);
          }
      }
  } finally {
     // 當非池化可用內存有內存或free緩存池有空閑ByteBufer且等待隊列里有線程正在等待
      try {
          if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
           // 喚醒隊列里正在等待的線程
           this.waiters.peekFirst().signal();
      } finally {
          // 解鎖
          lock.unlock();
      }
  }
  // 說明空間足夠,并且有足夠空閑的了。可以執行真正的分配空間了。
  if (buffer == null)
      // 沒有正好的buffer,從緩沖區外(JVM Heap)中直接分配內存
      return safeAllocateByteBuffer(size);
  else
      // 直接復用free緩存池的ByteBuffer
      return buffer;
}

private ByteBuffer safeAllocateByteBuffer(int size) {
  boolean error = true;
  try {
      //分配空間
      ByteBuffer buffer = allocateByteBuffer(size);
      error = false;
      //返回buffer
      return buffer;
  } finally {
    if (error) {
        //分配失敗了, 加鎖,操作內存pool
        this.lock.lock();
        try {
            //歸還空間給非池化可用內存
            this.nonPooledAvailableMemory += size;
            if (!this.waiters.isEmpty())
                //有其他在等待的線程的話,喚醒其他線程
                this.waiters.peekFirst().signal();
        } finally {
            // 加鎖不忘解鎖
            this.lock.unlock();
        }
    }
  }
}

protected ByteBuffer allocateByteBuffer(int size) {
    // 從JVM Heap中分配空間
    return ByteBuffer.allocate(size);
}

// 不斷從free隊列中釋放空閑的ByteBuffer來補充非池化可用內存
private void freeUp(int size) {
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
        this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

該方法主要用來嘗試分配 ByteBuffer,這里分4種情況說明下:

情況1:申請16k且free緩存池有可用內存

此時會直接從 free 緩存池中獲取隊首的 ByteBuffer 分配使用,用完后直接將 ByteBuffer 放到 free 緩存池的隊尾中,并調用 clear() 清空數據,以便下次重復使用。

圖片

情況2:申請16k且free緩存池無可用內存

此時 free 緩存池無可用內存,只能從非池化可用內存中獲取16k內存來分配,用完后直接將 ByteBuffer 放到 free 緩存池的隊尾中,并調用 clear() 清空數據,以便下次重復使用。

圖片

情況3:申請非16k且free緩存池無可用內存

此時 free 緩存池無可用內存,且申請的是非16k,只能從非池化可用內存(空間夠分配)中獲取一部分內存來分配,用完后直接將申請到的內存空間釋放到非池化可用內存中,后續會被 GC 掉。

圖片

情況4:申請非16k且free緩存池有可用內存,但非池化可用內存不夠

此時 free 緩存池有可用內存,但申請的是非16k,先嘗試從 free 緩存池中將 ByteBuffer 釋放到非池化可用內存中,直到滿足申請內存大小(size),然后從非池化可用內存獲取對應內存大小來分配,用完后直接將申請到的內存空間釋放到到非池化可用內存中,后續會被 GC 掉。

圖片

(2)deallocate()

public void deallocate(ByteBuffer buffer, int size) {
    // 1.加鎖,保證線程安全。
    lock.lock();
    try {
    // 2.如果待釋放的size大小為16k,則直接放入free隊列中
        if (size == this.poolableSize && size == buffer.capacity()) {
            // 清空buffer
            buffer.clear();
            // 釋放buffer到free隊列里
            this.free.add(buffer);
        } else {
            //如果非16k,則由JVM GC來回收ByteBuffer并增加非池化可用內存
            this.nonPooledAvailableMemory += size;
        }
        // 3.喚醒waiters中的第一個阻塞線程
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

該方法主要用來嘗試釋放 ByteBuffer 空間,主要做以下幾件事情:

  • 先加鎖,保證線程安全。
  • 如果待釋放的 size 大小為16k,則直接放入 free 隊列中。
  • 否則由 JVM GC 來回收 ByteBuffer 并增加 nonPooledAvailableMemory。
  • 當有 ByteBuffer 回收了,喚醒 waiters 中的第一個阻塞線程。

最后來看看 kafka 自定義的支持「讀寫分離場景」CopyOnWriteMap 的實現。

2、CopyOnWriteMap

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java。

通過 RecordAccumulator 類的屬性字段中可以看到,CopyOnWriteMap 中 key 為主題分區,value 為向這個分區發送的 Deque<ProducerBatch> 隊列集合。

我們知道生產消息時,要發送的分區是很少變動的,所以寫操作會很少。大部分情況都是先獲取分區對應的隊列,然后將 ProducerBatch 放入隊尾,所以讀操作是很頻繁的,這就是個典型的「讀多寫少」的場景。

所謂 「CopyOnWrite」 就是當寫的時候會拷貝一份來進行寫操作,寫完了再替換原來的集合。

來看看它的源碼實現。

public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
    // volatile Map
    private volatile Map<K, V> map;
    // 構造函數
    public CopyOnWriteMap() {
        this.map = Collections.emptyMap();
    }

該類只有一個重要的字段 Map,是通過「volatile」來修飾的,目的就是在多線程的場景下,當 Map 發生變化的時候其他的線程都是可見的。

接下來看幾個重要方法,都比較簡單,但是實現非常經典。

03.2.1 get()

// 獲取集合中隊列
public V get(Object k) {
    return map.get(k);
}

該方法主要用來讀取集合中的隊列,可以看到讀操作并沒有加鎖,多線程并發讀取的場景并不會阻塞,可以實現高并發讀取。如果隊列已經存在了就直接返回即可。

(2)putIfAbsent()

public synchronized V putIfAbsent(K k, V v) {
    if (!containsKey(k))
        return put(k, v);
    else
        return get(k);
}

// 判斷隊列是否存在
public boolean containsKey(Object k) {
    return map.containsKey(k);
}

該方法主要用來獲取或者設置隊列,會被多個線程并發執行,通過「synchronized」來修飾可以保證線程安全的,除非隊列不存在才會去設置。

(3)put()

public synchronized V put(K k, V v) {
    Map<K, V> copy = new HashMap<K, V>(this.map);
    V prev = copy.put(k, v);
    this.map = Collections.unmodifiableMap(copy);
    return prev;
}

該方法主要用來設置隊列的, put 時也是通過「synchronized」來修飾的,可以保證同一時間只有一個線程會來更新這個值。

那為什么說寫操作不會阻塞讀操作呢?

  • 首先重新創建一個 HashMap 集合副本。
  • 通過「volatile」寫的方式賦值給對應集合里。
  • 把新的集合設置成「不可修改的 map」,并賦值給字段 map。

這就實現了讀寫分離。對于 Producer 最最核心,會出現多線程并發訪問的就是緩存池。因此這塊的高并發設計相當重要。

四、總結

這里,我們一起來總結一下這篇文章的重點。

  • 帶你先整體的梳理了 Kafka 客戶端消息批量發送的好處。
  • 通過一個真實生活場景類比來帶你理解 RecordAccumulator 內部構造,并且深度剖析了消息是如何在客戶端緩存的,以及內部各組件實現原理。
  • 帶你深度剖析了 Kafka 客戶端非常重要的 BufferPool 、CopyOnWriteMap 的實現原理。
責任編輯:姜華 來源: 華仔聊技術
相關推薦

2022-09-23 08:02:42

Kafka消息緩存

2023-02-22 08:12:30

KafkaSender 線程

2023-03-15 08:17:27

Kafka網絡通信組件

2023-03-31 13:31:45

2017-03-21 17:04:05

Android客戶端架構設計

2019-09-20 08:54:38

KafkaBroker消息

2023-08-14 08:17:13

Kafka服務端

2022-03-29 15:10:22

架構設計模型

2015-03-30 14:24:06

網易布局

2022-08-22 08:45:57

Kafka網絡層源碼實現

2014-08-11 16:35:35

KafkaJava客戶端

2022-06-20 08:03:17

KafkaJava NIO

2022-04-01 08:31:11

RabbitMQ客戶端Channel

2022-03-29 08:31:18

RabbitMQMQ客戶端

2009-08-21 17:53:25

C#網絡編程客戶端程序

2022-04-07 08:30:57

AMQP協議RabbitMQ客戶端源碼

2021-07-16 06:56:50

Nacos注冊源碼

2021-05-07 15:28:03

Kafka客戶端Sarama

2011-04-22 10:34:09

SimpleFrame

2022-07-11 08:02:15

KafkaSelector
點贊
收藏

51CTO技術棧公眾號

精品国自产拍在线观看| 中文字幕乱妇无码av在线| 看电影就来5566av视频在线播放| 亚久久调教视频| 亚洲午夜未满十八勿入免费观看全集| 色悠悠久久综合网| 羞羞视频在线免费国产| av在线免费不卡| 国产精品白嫩美女在线观看| 日本激情视频一区二区三区| 91成人精品在线| 色香蕉成人二区免费| 亚洲小说欧美另类激情| 天堂网2014av| 蓝色福利精品导航| 午夜精品在线视频| 一起操在线播放| 免费成人av| 精品久久久久久久人人人人传媒| www国产黄色| 国产精品实拍| 国产日韩欧美亚洲| 狠狠色狠狠色综合人人| 中文文字幕一区二区三三| 亚洲欧美亚洲| 最近2019中文免费高清视频观看www99 | 国精产品一品二品国精品69xx| 国产模特精品视频久久久久| 久久不射热爱视频精品| 中文字幕被公侵犯的漂亮人妻| 欧美成人精品午夜一区二区| 在线视频欧美精品| 久久精品国产sm调教网站演员| 在线观看二区| 久久网这里都是精品| 国产chinese精品一区二区| 中文字幕xxxx| 99精品视频免费观看| 欧美美女15p| 免费成人深夜蜜桃视频| 欧美精美视频| 亚洲精品网址在线观看| 波多野结衣一二三区| 欧美成人精品一级| 51精品国自产在线| 国产wwwxx| 免费观看成人性生生活片| 亚洲国产sm捆绑调教视频 | a天堂视频在线观看| 精品999日本久久久影院| 欧美三片在线视频观看| 欧美 日韩 国产一区| 色在线免费观看| 亚洲成人高清在线| 激情六月天婷婷| 成人video亚洲精品| 亚洲天堂av一区| 一区在线电影| 老司机午夜在线| 1区2区3区精品视频| 亚洲一卡二卡| 日本天堂在线观看| 国产精品久久一级| 中文字幕在线观看一区二区三区| 午夜小视频在线| 国产精品色噜噜| 夜夜爽99久久国产综合精品女不卡| 成年女人的天堂在线| 国产欧美日韩综合| 亚洲午夜精品国产| 18在线观看的| 亚洲成在人线在线播放| 欧美网站免费观看| 欧美无毛视频| 欧美日韩中文国产| 日韩欧美中文视频| 成人免费直播在线| 国产视频丨精品|在线观看| 国产黄色网址在线观看| 精品高清久久| www.亚洲男人天堂| 久久久久99精品成人片试看| 欧美日韩亚洲一区| 国内精品久久久久久久久| 国产精品成人69xxx免费视频| 888久久久| 久久久之久亚州精品露出| 亚洲日本韩国在线| 免费观看成人鲁鲁鲁鲁鲁视频| 国产一区在线播放| 高清乱码毛片入口| 久久精品欧美一区二区三区麻豆| 夜夜爽99久久国产综合精品女不卡 | 成人午夜看片网址| 久久久久久99| 精品国产99久久久久久| 亚洲成人自拍偷拍| 艹b视频在线观看| 亚洲性视频在线| 日韩电影大全免费观看2023年上 | www.蜜臀av| 91香蕉视频污在线| 一区二区三区四区国产| 精品日韩av| 欧美怡红院视频| 国产a√精品区二区三区四区| 伊人久久大香线蕉| 欧美激情乱人伦| 成人一级免费视频| 粉嫩高潮美女一区二区三区 | 国产在线日本| 一区二区三区丝袜| 日韩av片网站| 韩国精品福利一区二区三区| 日韩视频在线免费| 日本中文在线播放| 国产福利一区在线| 色综合666| 色是在线视频| 欧美刺激脚交jootjob| 亚洲一二三四视频| 国产视频一区在线观看一区免费| 成人在线视频网| 深夜福利免费在线观看| 亚洲永久免费av| 99re精彩视频| 国内精品久久久久久久久电影网| 久久99国产精品自在自在app| 亚洲精品中文字幕乱码三区91| 国产一区在线精品| 亚洲精品日韩在线观看| 巨茎人妖videos另类| 精品久久久久久久人人人人传媒 | 成人在线免费观看91| 91av在线免费观看| 免费观看毛片网站| 亚洲狠狠丁香婷婷综合久久久| 污污动漫在线观看| 精品一区二区三区中文字幕老牛| 啪一啪鲁一鲁2019在线视频| 欧美熟妇另类久久久久久不卡| 亚洲精品乱码久久久久久日本蜜臀| 成人亚洲精品777777大片| 在线一级成人| 热门国产精品亚洲第一区在线| 欧美一级免费片| 亚洲国产综合色| 久久久久久无码精品人妻一区二区| 欧美xxxx中国| 成人国产精品一区| 免费a级毛片在线播放| 欧美日韩在线三区| 久久一级免费视频| 国内精品国产成人| 欧洲美女和动交zoz0z| 成人污污视频| 久久av.com| 亚洲AV午夜精品| 亚洲综合激情小说| 污污免费在线观看| 日韩一级欧洲| 蜜桃成人在线| 国产电影一区二区三区爱妃记| 中文字幕国产精品| 在线观看亚洲一区二区| 综合久久给合久久狠狠狠97色| 亚洲精品在线视频播放| 午夜精品国产| 国产综合 伊人色| 成人黄色免费短视频| 伊人久久综合97精品| 亚洲午夜激情视频| 一区二区三区在线视频播放| 深夜视频在线观看| 性色av一区二区怡红| 色女人综合av| 精品视频一二| 国产91精品久久久久久久| 户外极限露出调教在线视频| 欧美日韩高清在线| 免费人成在线观看| 久久久久国产免费免费| 午夜剧场高清版免费观看| 国产精品jizz在线观看美国| 精品午夜一区二区三区| 黄色日韩网站| 久久久久久久久久国产精品| 美丽的姑娘在线观看免费动漫| 欧美日韩视频在线观看一区二区三区| 亚洲天堂黄色片| 91香蕉视频污| 一级做a爱视频| 亚洲欧美久久久| 日韩视频在线免费播放| 免费看久久久| 成人高h视频在线| yellow在线观看网址| 日韩在线一区二区三区免费视频| www.五月婷婷| 欧美中文字幕一区| 久久久久久久久久久网| 中文字幕成人在线观看| 国产精品无码专区| 久久电影网电视剧免费观看| 国产老熟妇精品观看| 日韩欧美视频在线播放| 久久狠狠久久综合桃花| 欧美经典影片视频网站| 国产91色在线免费| 免费男女羞羞的视频网站在线观看| 亚洲小视频在线| 亚洲免费一级片| 欧美人与禽zozo性伦| 日本免费在线观看视频| 一区二区三区在线视频播放| 欧美另类69xxxx| 久久综合色综合88| 中文字幕久久久久久久| 免费在线看成人av| 久久久999视频| 欧美人与禽猛交乱配视频| 四虎影院一区二区三区| 亚洲精品亚洲人成在线| 成人18视频| 久久国产精品美女| 国产精品一区二区三| 一区一区三区| 午夜精品视频在线| 蜜桃成人365av| 久久伊人91精品综合网站| 久久视频www| 日韩精品免费在线观看| 亚洲精品一区二区口爆| 91精品国产日韩91久久久久久| 国产偷人爽久久久久久老妇app| 午夜电影久久久| 欧美亚洲天堂网| 亚洲一区免费观看| 欧美精品99久久久| 亚洲欧洲中文日韩久久av乱码| 婷婷丁香综合网| 欧美激情在线一区二区| 亚洲の无码国产の无码步美| 成人国产亚洲欧美成人综合网 | 国产精品久久久久久久久久久久久久久| 欧美中文在线免费| 韩国美女久久| 日本高清久久天堂| 亚洲成人看片| 国产国语刺激对白av不卡| 欧美在线va视频| 国产精品电影久久久久电影网| 日本韩国欧美| 国产精品第10页| 国产成+人+综合+亚洲欧美| 国产精品日日摸夜夜添夜夜av| 高清成人在线| 国产精品视频免费观看www| 电影亚洲一区| 91久久精品国产91久久性色| 精品国产三区在线| 高清国语自产拍免费一区二区三区| 亚洲一区二区三区在线免费 | 日韩高清影视在线观看| 久久精品一区二区三区不卡免费视频| 欧美一级三级| 日韩高清国产精品| 爽成人777777婷婷| 天天想你在线观看完整版电影免费| 欧美a级在线| 免费超爽大片黄| 久久一区国产| 日本美女视频一区| 丁香激情综合五月| 免费成人深夜夜行p站| 日本一区二区三区四区 | 亚洲图片有声小说| 青青草免费观看视频| 欧美日韩精品一区二区三区| 亚洲产国偷v产偷v自拍涩爱| 日韩av在线免费| shkd中文字幕久久在线观看| 久久精品电影一区二区| av资源一区| 国产精品99导航| 日韩黄色av| 欧美日韩一区二区视频在线观看| 999国产精品| 青青草视频在线免费播放| 日本亚洲视频在线| 精品国产aⅴ一区二区三区东京热 久久久久99人妻一区二区三区 | 97免费视频在线| 成人四虎影院| av日韩免费电影| 国产一区日韩| 91大学生片黄在线观看| 久久精品九九| 原创真实夫妻啪啪av| 久久久久久久精| 久草资源在线视频| 欧美探花视频资源| 欧美一区二区三区成人片在线| 宅男66日本亚洲欧美视频| 男插女视频久久久| 91精品在线一区| 亚洲美女15p| 国产资源第一页| 奇米888四色在线精品| 国产女人18毛片水真多18| 中文一区二区完整视频在线观看| 国产一级视频在线播放| 欧美日韩国产小视频| 香蕉久久国产av一区二区| 久久九九全国免费精品观看| 三上悠亚亚洲一区| 国产精品久久久久久久免费大片| 日本久久综合| 免费日韩视频在线观看| 成人综合婷婷国产精品久久免费| 999福利视频| 色综合中文字幕| 男人天堂一区二区| 久久中国妇女中文字幕| 国产第一精品| 日韩av电影免费在线| 国产情侣一区| 亚洲精品激情视频| 一区二区三区资源| 国产精品自产拍| 精品国产拍在线观看| 欧美日韩国产网站| 日本一区网站| 丝袜亚洲另类欧美综合| 91精彩刺激对白露脸偷拍| 午夜精品福利久久久| 高潮毛片7777777毛片| 欧美二区在线播放| 欧一区二区三区| 久久av喷吹av高潮av| 久久99久久久欧美国产| 亚洲午夜精品久久久久久高潮| 日韩欧美在线视频日韩欧美在线视频| 天堂av手机版| 欧美一级高清免费播放| 青青草久久爱| 日本毛片在线免费观看| 2023国产精品| 欧美特级黄色片| 亚洲最大在线视频| 国产精品伦一区二区| 亚洲视频小说| 精品一区二区日韩| 欧洲猛交xxxx乱大交3| 日韩欧美在线一区二区三区| wwwav在线| 成人久久18免费网站漫画| 欧美日韩一区二区国产| 91亚洲一线产区二线产区| 亚洲大片免费看| 日本福利在线观看| 国产成人精品在线| 久久美女精品| 午夜激情视频网| 亚洲r级在线视频| 日本一区视频| 国产精品久久久久久久久久新婚 | 亚洲老头老太hd| 欧美大胆性生话| 亚洲欧美影院| 国产成人精品在线看| 欧美亚韩一区二区三区| 亚洲一二三在线| 电影91久久久| 久久久性生活视频| 久久久久久一二三区| 亚洲专区在线播放| 欧美精品激情在线观看| 亚洲激情播播| 日韩成人av免费| 亚洲成a人v欧美综合天堂下载| 亚洲区小说区图片区| 国产精品自拍偷拍| 韩国av一区| 亚洲精品视频网址| 欧美va亚洲va国产综合| 在线观看爽视频| 最近免费观看高清韩国日本大全| eeuss影院一区二区三区| 成人黄色三级视频| 久久99精品久久久久久青青91| 美女一区2区| 亚洲久久中文字幕| 香蕉乱码成人久久天堂爱免费| 国产一区精品| 国产精品免费一区二区三区在线观看| 久久精品动漫| 久久这里只有精品国产| 国产性色av一区二区| 成人av综合网| 污污的视频免费|