精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

圖解 Kafka 源碼之 Sender 線程架構設計

開發(fā) 架構
今天主要聊聊「發(fā)送網(wǎng)絡 I/O 的 Sender 線程的架構設計」,深度剖析下消息是如何被發(fā)送出去的。

大家好,我是 華仔, 又跟大家見面了。

原文完整版在星球里面,如果感興趣可以掃文末二維碼加入。

上篇主要帶大家深度剖析了「號稱承載 Kafka 客戶端消息快遞倉庫 RecordAccmulator 的架構設計」,消息被暫存到累加器中,今天主要聊聊「發(fā)送網(wǎng)絡 I/O 的 Sender 線程的架構設計」,深度剖析下消息是如何被發(fā)送出去的。

這篇文章干貨很多,希望你可以耐心讀完。

一、總的概述

通過「場景驅動」的方式,來看看消息是如何從客戶端發(fā)送出去的。

在上篇中,我們知道了消息被暫存到 Deque<ProducerBatch> 的 batches 中,等「批次已滿」或者「有新批次被創(chuàng)建」后,喚醒 Sender 子線程將消息批量發(fā)送給 Kafka Broker 端。

接下來我們就來看看,「Sender 線程的架構實現(xiàn)以及發(fā)送處理流程」,為了方便大家理解,所有的源碼只保留骨干。

二、Sender 線程架構設計

在《圖解Kafka生產(chǎn)者初始化核心流程》這篇中我們知道 KafkaProducer 會啟動一個后臺守護進程,其線程名稱:kafka-producer-network-thread + "|" + clientId。

在 KafkaProducer.java 類有常量定義:NETWORK_THREAD_PREFIX,并啟動 守護線程 KafkaThread 即 ioThread,如果不主動關閉 Sender 線程會一直執(zhí)行下去。

github 源碼地址如下:

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java?

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java?

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java?

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java?

public class KafkaProducer<K, V> implements Producer<K, V> {
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
// visible for testing
@SuppressWarnings("unchecked")
KafkaProducer(Map<String, Object> configs,Serializer<K> keySerializer,
Serializer<V> valueSerializer,ProducerMetadata metadata,
KafkaClient kafkaClient,ProducerInterceptors<K, V> interceptors,Time time) {
try {
...
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
...
log.debug("Kafka producer started");
} catch (Throwable t) {
...
}
}
}

從上面得出 Sender 類是一個線程類, 我們來看看 Sender 線程的重要字段和方法,并講解其是如何發(fā)送消息和處理消息響應的。

1、關鍵字段

/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
*/
public class Sender implements Runnable {
/* the state of each nodes connection */
private final KafkaClient client; // 為 Sender 線程提供管理網(wǎng)絡連接進行網(wǎng)絡讀寫
/* the record accumulator that batches records */
private final RecordAccumulator accumulator; // 消息倉庫累加器
/* the metadata for the client */
private final ProducerMetadata metadata; // 生產(chǎn)者元數(shù)據(jù)
/* the flag indicating whether the producer should guarantee the message order on the broker or not. */
private final boolean guaranteeMessageOrder; // 是否保證消息在 broker 端的順序性
/* the maximum request size to attempt to send to the server */
private final int maxRequestSize; //發(fā)送消息最大字節(jié)數(shù)。
/* the number of acknowledgements to request from the server */
private final short acks; // 生產(chǎn)者的消息發(fā)送確認機制
/* the number of times to retry a failed request before giving up */
private final int retries; // 發(fā)送失敗后的重試次數(shù),默認為0次

/* true while the sender thread is still running */
private volatile boolean running; // Sender 線程是否還在運行中
/* true when the caller wants to ignore all unsent/inflight messages and force close. */
private volatile boolean forceClose; // 是否強制關閉,此時會忽略正在發(fā)送中的消息。
/* the max time to wait for the server to respond to the request*/
private final int requestTimeoutMs; // 等待服務端響應的最大時間,默認30s
/* The max time to wait before retrying a request which has failed */
private final long retryBackoffMs; // 失敗重試退避時間
/* current request API versions supported by the known brokers */
private final ApiVersions apiVersions; // 所有 node 支持的 api 版本
/* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
private final TransactionManager transactionManager; // 事務管理,這里忽略 后續(xù)會有專門一篇講解事務相關的
// A per-partition queue of batches ordered by creation time for tracking the in-flight batches
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches; // 正在執(zhí)行發(fā)送相關的消息批次集合, key為分區(qū),value是 list<ProducerBatch> 。

從該類屬性字段來看比較多,這里說幾個關鍵字段:

  1. client:KafkaClient 類型,是一個接口類,Sender 線程主要用它來實現(xiàn)真正的網(wǎng)絡I/O,即 NetworkClient。該字段主要為 Sender 線程提供了網(wǎng)絡連接管理和網(wǎng)絡讀寫操作能力。
  2. accumulator:RecordAccumulator類型,上一篇的內容  圖解 Kafka 源碼之快遞倉庫 RecordAccumulator 架構設計,Sender 線程用它獲取待發(fā)送的 node 節(jié)點及批次消息等能力。
  3. metadata:ProducerMetadata類型,生產(chǎn)者元數(shù)據(jù)。因為發(fā)送消息時要知道分區(qū) Leader 在哪些節(jié)點,以及節(jié)點的地址、主題分區(qū)的情況等。
  4. guaranteeMessageOrder:是否保證消息在 broker 端的順序性,參數(shù):max.in.flight.requests.per.connection。
  5. maxRequestSize:單個請求發(fā)送消息最大字節(jié)數(shù),默認為1M,它限制了生產(chǎn)者在單個請求發(fā)送的記錄數(shù),以避免發(fā)送大量請求。
  6. acks:生產(chǎn)者的消息發(fā)送確認機制。有3個可選值:0,1,-1/all。
  7. retries:生產(chǎn)者發(fā)送失敗后的重試次數(shù)。默認是0次。
  8. running:Sender線程是否還在運行中。
  9. forceClose:是否強制關閉,此時會忽略正在發(fā)送中的消息。
  10. requestTimeoutMs:生產(chǎn)者發(fā)送請求后等待服務端響應的最大時間。如果超時了且配置了重試次數(shù),會再次發(fā)送請求,待重試次數(shù)用完后在這個時間范圍內返回響應則認為請求最終失敗,默認 30 秒。
  11. retryBackoffMs:生產(chǎn)者在發(fā)送請求失敗后可能會重新發(fā)送失敗的請求,其目的就是防止重發(fā)過快造成服務端壓力過大。默認100 ms。
  12. apiVersions:ApicVersions類對象,保存了每個node所支持的api版本。
  13. inFlightBatches:正在執(zhí)行發(fā)送相關的消息批次集合, key為分區(qū),value是 list<|ProducerBatch>。

2、關鍵方法

Sender 類的方法也不少,這里針對關鍵方法逐一講解下。

(1)run()

Sender 線程實現(xiàn)了 Runnable 接口,會不斷的調用 runOnce(),這是一個典型的循環(huán)事件機制。

/**
* The main run loop for the sender thread
*/
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// 這里使用 volatile boolean 類型的變量 running,判斷 Sender 線程是否在運行狀態(tài)中。
// 1. 如果 Sender 線程在運行狀態(tài)即 running=true,則一直循環(huán)調用 runOnce() 方法。
while (running) {
try {
// 將緩沖區(qū)的消息發(fā)送到 broker。
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// 2. 如果(沒有強制關閉 && ((消息累加器中還有剩余消息待發(fā)送 || 還有等待未響應的消息 ) || 還有事務請求未完成)),則繼續(xù)發(fā)送剩下的消息。
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
// 繼續(xù)執(zhí)行將剩余的消息發(fā)送完畢
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// 3. 對進行中的事務進行中斷,則繼續(xù)發(fā)送剩下的消息。
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// 4. 如果強制關閉,則關閉事務管理器、終止消息的追加并清空未完成的批次
if (forceClose) {
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
// 關閉事務管理器
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
// 終止消息的追加并清空未完成的批次
this.accumulator.abortIncompleteBatches();
}
// 5. 關閉 NetworkClient
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

當 Sender 線程啟動后會直接運行 run() 方法,該方法在 4種情況下會一直循環(huán)調用去發(fā)送消息到 Broker。

(2)runOnce()

我們來看看這個執(zhí)行業(yè)務處理的方法,關于事務的部分后續(xù)專門文章講解。

/**
* Run a single iteration of sending
*/
void runOnce() {
if (transactionManager != null) {
... //事務處理方法 后續(xù)文章專門講解
}
// 1. 獲取當前時間的時間戳。
long currentTimeMs = time.milliseconds();
// 2. 調用 sendProducerData 發(fā)送消息,但并非真正的發(fā)送,而是把消息緩存在 把消息緩存在 KafkaChannel 的 Send 字段里。下一篇會講解 NetworkClient。
long pollTimeout = sendProducerData(currentTimeMs);
// 3. 讀取消息實現(xiàn)真正的網(wǎng)絡發(fā)送
client.poll(pollTimeout, currentTimeMs);
}

該方法比較簡單,主要做了3件事情:

  1. 獲取當前時間的時間戳。
  2. 調用 sendProducerData 發(fā)送消息,但并非真正的發(fā)送,而是把消息緩存在 NetworkClient 的 Send 字段里。下一篇會講解 NetworkClient。
  3. 讀取 NetworkClient 的 send 字段消息實現(xiàn)真正的網(wǎng)絡發(fā)送。

(3)sendProducerData()

該方法主要是獲取已經(jīng)準備好的節(jié)點上的批次數(shù)據(jù)并過濾過期的批次集合,最后暫存消息。

private long sendProducerData(long now) {
// 1. 獲取元數(shù)據(jù)
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
// 2. 獲取已經(jīng)準備好的節(jié)點以及找不到 Leader 分區(qū)對應的節(jié)點的主題
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
// 3. 如果主題 Leader 分區(qū)對應的節(jié)點不存在,則強制更新元數(shù)據(jù)
if (!result.unknownLeaderTopics.isEmpty()) {
// 添加 topic 到?jīng)]有拉取到元數(shù)據(jù)的 topic 集合中,并標識需要更新元數(shù)據(jù)
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);
...
// 針對這個 topic 集合進行元數(shù)據(jù)更新
this.metadata.requestUpdate();
}
// 4. 循環(huán) readyNodes 并檢查客戶端與要發(fā)送節(jié)點的網(wǎng)絡是否已經(jīng)建立好了
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// 檢查客戶端與要發(fā)送節(jié)點的網(wǎng)絡是否已經(jīng)建立好了
if (!this.client.ready(node, now)) {
// 移除對應節(jié)點
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// create produce requests
// 5. 獲取上面返回的已經(jīng)準備好的節(jié)點上要發(fā)送的 ProducerBatch 集合
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
// 6. 將從消息累加器中讀取的數(shù)據(jù)集,放入正在執(zhí)行發(fā)送相關的消息批次集合中
addToInflightBatches(batches);
// 7. 要保證消息的順序性,即參數(shù) max.in.flight.requests.per.cnotallow=1
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
// 對 tp 進行 mute,保證只有一個 batch 正在發(fā)送
this.accumulator.mutePartition(batch.topicPartition);
}
}
// 重置下一批次的過期時間
accumulator.resetNextBatchExpiryTime();
// 8. 從正在執(zhí)行發(fā)送數(shù)據(jù)集合 inflightBatches 中獲取過期集合
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
// 9. 從 batches 中獲取過期集合
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
// 10. 從 inflightBatches 與 batches 中查找已過期的消息批次(ProducerBatch),判斷批次是否過 期是根據(jù)系統(tǒng)當前時間與 ProducerBatch 創(chuàng)建時間之差是否超過120s,過期時間可以通過參數(shù) delivery.timeout.ms 設置。
expiredBatches.addAll(expiredInflightBatches);

// 如果過期批次不為空 則輸出對應日志
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
// 11. 處理已過期的消息批次,通知該批消息發(fā)送失敗并返回給客戶端
for (ProducerBatch expiredBatch : expiredBatches) {
// 處理當前過期ProducerBatch的回調結果 ProduceRequestResult,并且設置超時異常 new TimeoutException(errorMessage)
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
// 通知該批消息發(fā)送失敗并返回給客戶端
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
// ... 事務管理器的處理忽略
}
// 收集統(tǒng)計指標,后續(xù)會專門對 Kafka 的 Metrics 進行分析
sensors.updateProduceRequestMetrics(batches);

// 設置下一次的發(fā)送延時
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
// 12. 發(fā)送消息暫存到 NetworkClient send 字段里。
sendProduceRequests(batches, now);
return pollTimeout;
}

該方法主要做了12件事情,逐一說明下:

  1. 首先獲取元數(shù)據(jù),這里主要是根據(jù)元數(shù)據(jù)的更新機制來保證數(shù)據(jù)的準確性。
  2. 獲取已經(jīng)準備好的節(jié)點。accumulator#reay() 方法會通過發(fā)送記錄對應的節(jié)點和元數(shù)據(jù)進行比較,返回結果中包括兩個重要的集合:「準備好發(fā)送的節(jié)點集合 readyNodes」、「找不到 Leader 分區(qū)對應節(jié)點的主題 unKnownLeaderTopic」。
  3. 如果主題 Leader 分區(qū)對應的節(jié)點不存在,則強制更新元數(shù)據(jù)。
  4. 循環(huán) readyNodes 并檢查客戶端與要發(fā)送節(jié)點的網(wǎng)絡是否已經(jīng)建立好了。在 NetworkClient 中維護了客戶端與所有節(jié)點的連接,這樣就可以通過連接的狀態(tài)判斷是否連接正常。
  5. 獲取上面返回的已經(jīng)準備好的節(jié)點上要發(fā)送的 ProducerBatch 集合。accumulator#drain() 方法就是將 「TopicPartition」-> 「ProducerBatch 集合」的映射關系轉換成 「Node 節(jié)點」->「ProducerBatch 集合」的映射關系,如下圖所示,這樣的話按照節(jié)點方式只需要2次就完成,大大減少網(wǎng)絡的開銷。

圖片

  1. 將從消息累加器中讀取的數(shù)據(jù)集,放入正在執(zhí)行發(fā)送相關的消息批次集合中。
  2. 要保證消息的順序性,即參數(shù) max.in.flight.requests.per.cnotallow=1,會添加到 muted 集合,保證只有一個 batch 在發(fā)送。
  3. 從正在執(zhí)行發(fā)送數(shù)據(jù)集合 inflightBatches 中獲取過期集合。
  4. 從 accumulator 累加器的 batches 中獲取過期集合。
  5. 從 inflightBatches 與 batches 中查找已過期的消息批次(ProducerBatch),判斷批次是否過期是根據(jù)系統(tǒng)當前時間與 ProducerBatch 創(chuàng)建時間之差是否超過120s,過期時間可以通過參數(shù) delivery.timeout.ms 設置。
  6. 處理已過期的消息批次,通知該批消息發(fā)送失敗并返回給客戶端。
  7. 發(fā)送消息暫存到 NetworkClient send 字段里。

從上面源碼可以看出,SendProducerData 方法中調用到了 Sender 線程類中多個方法,這里就不一一講解了。

三、Sender 發(fā)送流程分析

通過前兩部分的源碼解讀和剖析,我們可以得出 Sender 線程的處理流程可以分為兩大部分:「發(fā)送請求」、「接收響應結果」。

1、發(fā)送請求

從 runOnce 方法可以得出發(fā)送請求也分兩步:「消息預發(fā)送」、「真正的網(wǎng)絡發(fā)送」。

void runOnce() {
// 1. 把消息緩存在 KafkaChannel 的 Send 字段里。
long pollTimeout = sendProducerData(currentTimeMs);
// 2. 讀取消息實現(xiàn)真正的網(wǎng)絡發(fā)送
client.poll(pollTimeout, currentTimeMs);
}

2、接收響應結果

等 Sender 線程收到 Broker 端的響應結果后,會根據(jù)響應結果分情況進行處理。

3、時序圖

原文完整版在星球里面,如果感興趣可以掃文末二維碼加入

四、總結

這里,我們一起來總結一下這篇文章的重點。

1、開篇總述消息被暫存到 Deque<ProducerBatch> 的 batches 中,等「批次已滿」或者「有新批次被創(chuàng)建」后,喚醒 Sender 子線程將消息批量發(fā)送給 Kafka Broker 端,從而引出「Sender 線程」。

2、帶你深度剖析了「Sender 線程」 的發(fā)送消息以及響應處理的相關方法。

3、最后帶你串聯(lián)了整個消息發(fā)送的流程,讓你有個更好的整體認知。

責任編輯:姜華 來源: 華仔聊技術
相關推薦

2023-03-15 08:17:27

Kafka網(wǎng)絡通信組件

2023-12-26 08:16:56

Kafka緩存架構客戶端

2022-03-29 15:10:22

架構設計模型

2022-09-23 08:02:42

Kafka消息緩存

2019-09-20 08:54:38

KafkaBroker消息

2023-08-14 08:17:13

Kafka服務端

2021-11-01 17:17:13

Kafka 高并發(fā)場景

2024-03-14 08:33:13

kafka三高架構Zookeeper

2015-06-02 04:17:44

架構設計審架構設計說明書

2024-08-23 16:04:45

2021-06-10 07:49:27

Kafka 架構設計

2015-06-02 04:34:05

架構設計

2012-05-30 09:43:45

業(yè)務邏輯層

2009-06-22 14:48:21

DRY架構設計

2014-05-19 10:08:36

IM系統(tǒng)架構設計

2023-04-13 08:23:28

軟件架構設計

2024-09-18 09:04:33

架構模式查詢

2024-10-30 10:06:51

2021-12-07 07:32:09

kafka架構原理

2023-08-16 12:34:16

同步備份異步備份
點贊
收藏

51CTO技術棧公眾號

欧美日韩一二三| 国产欧美一区二区三区在线看蜜臀| 美女久久久久久久| 精品国产免费久久久久久婷婷| 麻豆传媒在线完整视频| 成人h动漫精品一区二区| 日韩av电影中文字幕| 波多野结衣在线网址| 欧美激情网址| 欧美区视频在线观看| 日韩av高清在线看片| 在线日本视频| 97久久久精品综合88久久| 国产精品亚洲欧美导航| 日本一区二区三区四区五区| 大片网站久久| 亚洲精品第一国产综合精品| 国产欧美激情视频| 超碰aⅴ人人做人人爽欧美| 亚洲乱码国产乱码精品精可以看 | 在线成人免费| 精品久久久久久电影| 超碰成人在线免费观看| 国产天堂在线| 久久这里只有精品6| ts人妖另类在线| 在线免费av片| 久久这里有精品15一区二区三区| 久久69精品久久久久久久电影好| 日韩一级片在线免费观看| 欧美深夜视频| 亚洲电影第1页| 午夜久久福利视频| 欧亚一区二区| 色素色在线综合| 日日碰狠狠添天天爽超碰97| 肉体视频在线| 亚洲免费三区一区二区| 正在播放91九色| www.在线播放| 欧美韩日一区二区三区| 青青草原成人| 日韩二区三区| 久久久99久久精品欧美| 欧美久久电影| 日本福利片在线| 91在线免费视频观看| 国产精品日韩一区二区| 亚洲伦理在线观看| 粉嫩13p一区二区三区| yy111111少妇影院日韩夜片| 国产草草影院ccyycom| 国内不卡的二区三区中文字幕| 国产精品亚洲自拍| 91美女精品网站| 久久9热精品视频| 91精品久久久久久久久久久| 艳妇乳肉豪妇荡乳av| 精品中文字幕一区二区| 91久久精品视频| 国产欧美日韩综合精品一区二区三区 | 91小视频在线观看| 精品中文字幕人| 男人av在线| 日本一区二区三区久久久久久久久不| 日本在线成人一区二区| 国产69久久| 国产精品国产三级国产有无不卡 | 精品人人视频| 欧美性生交xxxxxdddd| 欧美成人免费高清视频| 粉嫩一区二区三区| 欧美欧美欧美欧美| av影片在线播放| 国产精品黄网站| 亚洲女同精品视频| 亚洲色图27p| 亚洲男女av一区二区| 欧美大片免费观看在线观看网站推荐| 久久精品国产av一区二区三区| 亚洲国产免费看| 日本欧美精品在线| 国产精品一区二区人人爽| 国产99精品视频| 欧美日韩日本网| 欧美尤物美女在线| 亚洲福中文字幕伊人影院| 337p粉嫩大胆噜噜噜鲁| 国模私拍国内精品国内av| 91精品啪在线观看国产60岁| 午夜视频在线观看国产| 欧州一区二区| 久热在线中文字幕色999舞| 国产做受高潮漫动| 久99久精品视频免费观看| 成人精品一二区| 久草福利在线视频| 一区二区在线观看免费| 茄子视频成人免费观看| 精品国产一区二区三区性色av| 亚洲第一男人av| a级黄色免费视频| 99re国产精品| 成人免费视频97| 你懂的好爽在线观看| 亚洲精品高清在线观看| 欧美激情精品久久久久久小说| 中文成人在线| 国产亚洲人成网站在线观看| 久久久全国免费视频| 蜜臀av性久久久久av蜜臀妖精| 成人做爰66片免费看网站| 九色在线播放| 午夜精品免费在线观看| 在线免费看污网站| 国内黄色精品| 97av在线视频| 黄色一级a毛片| 自拍视频在线观看一区二区| 成人性视频欧美一区二区三区| av不卡一区二区| 麻豆乱码国产一区二区三区 | 搜索黄色一级片| 日韩精品成人一区二区在线| 国产日韩欧美精品| 在线午夜影院| 91精品国产欧美日韩| 蜜桃av乱码一区二区三区| 99精品国产99久久久久久福利| 91香蕉国产在线观看| 午夜免费播放观看在线视频| 91高清视频免费看| 久久精品老司机| 国产精品一区毛片| 国产一区二区在线网站| 午夜dj在线观看高清视频完整版 | 一区二区三区高清| 激情在线观看视频| 欧美肥老太太性生活| 国产精品免费在线免费| 触手亚洲一区二区三区| 91久久精品一区二区二区| 久久精品成人av| 欧美中文日韩| 欧美一区二区视频17c| 亚洲精品88| 亚洲欧美激情精品一区二区| 日韩人妻精品中文字幕| 久久久久久久网| 任你操这里只有精品| 九色成人国产蝌蚪91| 日韩av免费看| av在线之家电影网站| 欧美性大战久久久| 少妇视频一区二区| 激情综合色播激情啊| 国产又粗又大又爽的视频| 日本精品视频| 国模精品一区二区三区色天香| 日本久久一级片| 黑人精品xxx一区| 妺妺窝人体色WWW精品| 久久精品官网| 亚洲欧美日韩精品在线| 中文字幕成人| 午夜精品99久久免费| 少妇性bbb搡bbb爽爽爽欧美| 欧美性猛片xxxx免费看久爱| 久久精品日韩无码| 国产.精品.日韩.另类.中文.在线.播放| 欧美黄色免费网址| 加勒比色综合久久久久久久久| 777精品视频| av在线免费一区| 日韩欧美精品在线视频| 日本一区二区三区四区五区| 久久久综合网站| 日本美女视频一区| 精品9999| 亚洲电影一二三区| a看欧美黄色女同性恋| 欧美诱惑福利视频| 欧美私人网站| 亚洲韩国欧洲国产日产av| 国产日韩久久久| 亚洲在线成人精品| 黄色av免费播放| 国产成人综合亚洲网站| 99精品视频播放| 欧美午夜a级限制福利片| 欧美日韩国产精品一卡| 亚洲欧美久久精品| 欧洲s码亚洲m码精品一区| 免费在线看黄色| 精品亚洲国产成av人片传媒 | 亚洲国产小视频| 伊人成人在线观看| 天天操天天综合网| 日本一级片免费| 国产欧美一区二区三区在线看蜜臀 | 欧美美女日韩| 欧美剧在线观看| 国产高清免费av在线| 欧美精品一区二区三区四区| 艳妇乳肉豪妇荡乳av| 偷拍亚洲欧洲综合| 无码人妻精品一区二区三区夜夜嗨| 91看片淫黄大片一级在线观看| 一区二区三区国产好的精华液| 香蕉亚洲视频| 久久久久久免费看| 亚洲综合激情在线| 亚洲激情一区二区三区| 色老板在线视频一区二区| 91九色偷拍| 小说区图片区亚洲| 热门国产精品亚洲第一区在线| 波多野结衣久久| 久久久成人的性感天堂| eeuss影院在线播放| 日韩精品福利网站| 国产 欧美 精品| 日韩免费在线观看| 国产精品久久久久毛片| 欧美日韩久久久一区| 高潮毛片又色又爽免费 | 欧美激情精品久久久久久黑人| 在线观看av黄网站永久| 国产一级揄自揄精品视频| 午夜性色福利视频| 亚洲国产毛片完整版| www日本高清| 91精品国产91久久久久久一区二区 | 亚洲黄网站黄| 草草视频在线免费观看| 很黄很黄激情成人| 欧美乱大交xxxxx潮喷l头像| 欧美精品一卡| av在线com| 激情久久久久| 欧洲精品一区二区三区久久| 影音先锋久久| 欧美二区在线视频| 亚洲欧美卡通另类91av| 久久久一本二本三本| 国产农村妇女精品一区二区| 777精品久无码人妻蜜桃| 99国产成+人+综合+亚洲欧美| 国产 日韩 欧美在线| 在线国产日韩| 六月丁香婷婷激情| 久久亚洲视频| 国产野外作爱视频播放| 精品影视av免费| 天天做天天干天天操| 国产精品夜夜嗨| 日批在线观看视频| 久久免费精品国产久精品久久久久| 一级做a爰片毛片| 国产亚洲精久久久久久| 亚洲一二三四五六区| 亚洲色图欧美激情| 久久精品性爱视频| 色哟哟精品一区| 亚洲天堂中文字幕在线| 欧美一级xxx| 午夜福利理论片在线观看| 亚洲欧洲日本专区| 久久综合网导航| 97精品国产91久久久久久| 国产精品伦理| 91久久精品美女| 日韩成人av在线资源| 日韩一区不卡| 中文字幕亚洲精品乱码| 每日在线观看av| 免费高清视频精品| 杨幂一区二区国产精品| 91麻豆国产精品久久| 战狼4完整免费观看在线播放版| 一区二区三区精品在线观看| 成人午夜淫片100集| 欧美疯狂性受xxxxx喷水图片| 国产综合无码一区二区色蜜蜜| 亚洲男人天堂2019| 成人福利片网站| 91av在线播放| 成人激情久久| 精品无人区一区二区三区| 日韩电影免费在线观看| 免费视频爱爱太爽了| 天堂va蜜桃一区二区三区漫画版| 国产91在线免费观看| 国产欧美一区二区精品忘忧草| 欧美人妻精品一区二区免费看| 狠狠做深爱婷婷久久综合一区| 91中文字幕在线播放| 日韩av在线天堂网| av在线官网| 国产精品日韩久久久久| 国产色噜噜噜91在线精品| 一本一道久久a久久综合精品 | 蜜桃视频欧美| 日韩人妻一区二区三区蜜桃视频| 久久蜜桃精品| av漫画在线观看| 亚洲欧美在线高清| 人人妻人人爽人人澡人人精品 | 色猫猫国产区一区二在线视频| 精品久久人妻av中文字幕| 国产一区二区动漫| 亚洲私拍视频| 国产视频精品网| 永久91嫩草亚洲精品人人| 88av.com| 久久久久久久一区| 影音先锋亚洲天堂| 亚洲精品一区二区三区香蕉| 麻豆传媒视频在线观看| 国产精品高清网站| 国产不卡一区| 国产午夜大地久久| 成人免费视频视频在线观看免费 | 日韩欧美在线网址 | 视频一区日韩精品| 欧美aaa在线观看| 免费不卡在线视频| 成人无码av片在线观看| 欧美在线免费观看亚洲| 免费一级毛片在线观看| 91chinesevideo永久地址| 国产suv精品一区二区四区视频| 国产精品视频一二三四区| 国产精品一品视频| 久久久精品一区二区涩爱| 日韩欧美三级在线| 日韩123区| 国产 高清 精品 在线 a| 国产精品99一区二区| 永久免费看片在线观看| 亚洲综合成人在线视频| 亚洲精品.www| 国内成人精品视频| 日韩一级电影| av网址在线观看免费| 国产日韩欧美a| 中文字幕乱码一区二区| 最近的2019中文字幕免费一页| h1515四虎成人| 在线视频不卡国产| 国产精品一区二区在线观看不卡 | 伊人狠狠色j香婷婷综合| 在线免费看黄色片| 午夜精品在线看| 欧美日韩影视| 国产精品天天狠天天看| 亚洲v在线看| 俄罗斯黄色录像| 欧美午夜影院在线视频| av福利精品| 91精品天堂| 久久大逼视频| 中文字幕观看av| 亚洲精品在线免费观看视频| 在线精品亚洲欧美日韩国产| 日韩欧美手机在线| 国产一区二区三区久久久| 国产一级二级三级| 日韩精品丝袜在线| 欧美视频精品| 人人妻人人澡人人爽欧美一区| 93久久精品日日躁夜夜躁欧美| 久久久久久无码精品大片| 久久精品成人欧美大片| 久久久久观看| 亚洲欧美国产日韩综合| 亚洲已满18点击进入久久| 免费动漫网站在线观看| 成人看片人aa| 一区二区三区精品视频在线观看 | 91亚洲一区| 亚洲啪av永久无码精品放毛片| 色成人在线视频| 色噜噜狠狠狠综合欧洲色8| 欧美一区视久久| 国产成人av电影在线播放| 中文字幕在线播| 久操成人在线视频| 精品产国自在拍| 色欲欲www成人网站| 日本久久一区二区三区| 搞黄网站在线看| 一区二区三区欧美在线| 99国产精品久| www.久久色| 国产欧美精品日韩| 国产精品视区| 久久久综合久久久| 日韩在线观看免费全| 美女亚洲一区|