精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Kafka消費與心跳機制

大數據 Kafka
最近有同學咨詢Kafka的消費和心跳機制,今天筆者將通過這篇博客來逐一介紹這些內容。

1.概述

最近有同學咨詢Kafka的消費和心跳機制,今天筆者將通過這篇博客來逐一介紹這些內容。

2.內容

2.1 Kafka消費

首先,我們來看看消費。Kafka提供了非常簡單的消費API,使用者只需初始化Kafka的Broker Server地址,然后實例化KafkaConsumer類即可拿到Topic中的數據。一個簡單的Kafka消費實例代碼如下所示:

  1. public class JConsumerSubscribe extends Thread { 
  2.     public static void main(String[] args) {        JConsumerSubscribe jconsumer = new JConsumerSubscribe();        jconsumer.start();    }    /** 初始化Kafka集群信息. */    private Properties configure() {        Properties props = new Properties();        props.put("bootstrap.servers""dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址 
  3.         props.put("group.id""ke");// 指定消費者組 
  4.         props.put("enable.auto.commit""true");// 開啟自動提交 
  5.         props.put("auto.commit.interval.ms""1000");// 自動提交的時間間隔 
  6.         // 反序列化消息主鍵        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer"); 
  7.         // 反序列化消費記錄        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer"); 
  8.         return props; 
  9.     }    /** 實現一個單線程消費者. */    @Override    public void run() {        // 創建一個消費者實例對象        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure());        // 訂閱消費主題集合        consumer.subscribe(Arrays.asList("test_kafka_topic")); 
  10.         // 實時消費標識        boolean flag = true
  11.         while (flag) { 
  12.             // 獲取主題消息數據            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); 
  13.             for (ConsumerRecord<String, String> record : records) 
  14.                 // 循環打印消息記錄                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
  15.         }        // 出現異常關閉消費者對象        consumer.close(); 
  16.     }} 

上述代碼我們就可以非常便捷的拿到Topic中的數據。但是,當我們調用poll方法拉取數據的時候,Kafka Broker Server做了那些事情。接下來,我們可以去看看源代碼的實現細節。核心代碼如下:

org.apache.kafka.clients.consumer.KafkaConsumer

  1. private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) { 
  2.         acquireAndEnsureOpen();        try { 
  3.             if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative"); 
  4.             if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { 
  5.                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); 
  6.             }            // poll for new data until the timeout expires 
  7.             long elapsedTime = 0L; 
  8.             do { 
  9.                 client.maybeTriggerWakeup();                final long metadataEnd;                if (includeMetadataInTimeout) { 
  10.                     final long metadataStart = time.milliseconds();                    if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) { 
  11.                         return ConsumerRecords.empty(); 
  12.                     }                    metadataEnd = time.milliseconds();                    elapsedTime += metadataEnd - metadataStart;                } else { 
  13.                     while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) { 
  14.                         log.warn("Still waiting for metadata"); 
  15.                     }                    metadataEnd = time.milliseconds();                }                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));                if (!records.isEmpty()) { 
  16.                     // before returning the fetched records, we can send off the next round of fetches 
  17.                     // and avoid block waiting for their responses to enable pipelining while the user 
  18.                     // is handling the fetched records. 
  19.                     // 
  20.                     // NOTE: since the consumed position has already been updated, we must not allow 
  21.                     // wakeups or any other errors to be triggered prior to returning the fetched records. 
  22.                     if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { 
  23.                         client.pollNoWakeup();                    }                    return this.interceptors.onConsume(new ConsumerRecords<>(records)); 
  24.                 }                final long fetchEnd = time.milliseconds();                elapsedTime += fetchEnd - metadataEnd;            } while (elapsedTime < timeoutMs); 
  25.             return ConsumerRecords.empty(); 
  26.         } finally { 
  27.             release();        }    } 

上述代碼中有個方法pollForFetches,它的實現邏輯如下:

  1. private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) { 
  2.         final long startMs = time.milliseconds(); 
  3.         long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs); 
  4.         // if data is available already, return it immediately 
  5.         final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); 
  6.         if (!records.isEmpty()) { 
  7.             return records; 
  8.         } 
  9.         // send any new fetches (won't resend pending fetches) 
  10.         fetcher.sendFetches(); 
  11.         // We do not want to be stuck blocking in poll if we are missing some positions 
  12.         // since the offset lookup may be backing off after a failure 
  13.         // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call 
  14.         // updateAssignmentMetadataIfNeeded before this method. 
  15.         if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { 
  16.             pollTimeout = retryBackoffMs; 
  17.         } 
  18.         client.poll(pollTimeout, startMs, () -> { 
  19.             // since a fetch might be completed by the background thread, we need this poll condition 
  20.             // to ensure that we do not block unnecessarily in poll() 
  21.             return !fetcher.hasCompletedFetches(); 
  22.         }); 
  23.         // after the long poll, we should check whether the group needs to rebalance 
  24.         // prior to returning data so that the group can stabilize faster 
  25.         if (coordinator.rejoinNeededOrPending()) { 
  26.             return Collections.emptyMap(); 
  27.         } 
  28.         return fetcher.fetchedRecords(); 
  29.     } 

上述代碼中加粗的位置,我們可以看出每次消費者客戶端拉取數據時,通過poll方法,先調用fetcher中的fetchedRecords函數,如果獲取不到數據,就會發起一個新的sendFetches請求。而在消費數據的時候,每個批次從Kafka Broker Server中拉取數據是有最大數據量限制,默認是500條,由屬性(max.poll.records)控制,可以在客戶端中設置該屬性值來調整我們消費時每次拉取數據的量。

提示:這里需要注意的是,max.poll.records返回的是一個poll請求的數據總和,與多少個分區無關。因此,每次消費從所有分區中拉取Topic的數據的總條數不會超過max.poll.records所設置的值。

而在Fetcher的類中,在sendFetches方法中有限制拉取數據容量的限制,由屬性(max.partition.fetch.bytes),默認1MB。可能會有這樣一個場景,當滿足max.partition.fetch.bytes限制條件,如果需要Fetch出10000條記錄,每次默認500條,那么我們需要執行20次才能將這一次通過網絡發起的請求全部Fetch完畢。

這里,可能有同學有疑問,我們不能將默認的max.poll.records屬性值調到10000嗎?可以調,但是還有個屬性需要一起配合才可以,這個就是每次poll的超時時間(Duration.ofMillis(100)),這里需要根據你的實際每條數據的容量大小來確定設置超時時間,如果你將最大值調到10000,當你每條記錄的容量很大時,超時時間還是100ms,那么可能拉取的數據少于10000條。

而這里,還有另外一個需要注意的事情,就是會話超時的問題。session.timeout.ms默認是10s,group.min.session.timeout.ms默認是6s,group.max.session.timeout.ms默認是30min。當你在處理消費的業務邏輯的時候,如果在10s內沒有處理完,那么消費者客戶端就會與Kafka Broker Server斷開,消費掉的數據,產生的offset就沒法提交給Kafka,因為Kafka Broker Server此時認為該消費者程序已經斷開,而即使你設置了自動提交屬性,或者設置auto.offset.reset屬性,你消費的時候還是會出現重復消費的情況,這就是因為session.timeout.ms超時的原因導致的。

2.2 心跳機制

上面在末尾的時候,說到會話超時的情況導致消息重復消費,為什么會有超時?有同學會有這樣的疑問,我的消費者線程明明是啟動的,也沒有退出,為啥消費不到Kafka的消息呢?消費者組也查不到我的ConsumerGroupID呢?這就有可能是超時導致的,而Kafka是通過心跳機制來控制超時,心跳機制對于消費者客戶端來說是無感的,它是一個異步線程,當我們啟動一個消費者實例時,心跳線程就開始工作了。

在org.apache.kafka.clients.consumer.internals.AbstractCoordinator中會啟動一個HeartbeatThread線程來定時發送心跳和檢測消費者的狀態。每個消費者都有個org.apache.kafka.clients.consumer.internals.ConsumerCoordinator,而每個ConsumerCoordinator都會啟動一個HeartbeatThread線程來維護心跳,心跳信息存放在org.apache.kafka.clients.consumer.internals.Heartbeat中,聲明的Schema如下所示:

  1. private final int sessionTimeoutMs; 
  2.     private final int heartbeatIntervalMs; 
  3.     private final int maxPollIntervalMs; 
  4.     private final long retryBackoffMs; 
  5.     private volatile long lastHeartbeatSend;  
  6.     private long lastHeartbeatReceive; 
  7.     private long lastSessionReset; 
  8.     private long lastPoll; 
  9.     private boolean heartbeatFailed; 

心跳線程中的run方法實現代碼如下:

  1. public void run() { 
  2.             try { 
  3.                 log.debug("Heartbeat thread started"); 
  4.                 while (true) { 
  5.                     synchronized (AbstractCoordinator.this) { 
  6.                         if (closed) 
  7.                             return
  8.                         if (!enabled) { 
  9.                             AbstractCoordinator.this.wait(); 
  10.                             continue
  11.                         }                        if (state != MemberState.STABLE) { 
  12.                             // the group is not stable (perhaps because we left the group or because the coordinator 
  13.                             // kicked us out), so disable heartbeats and wait for the main thread to rejoin. 
  14.                             disable(); 
  15.                             continue
  16.                         } 
  17.                         client.pollNoWakeup(); 
  18.                         long now = time.milliseconds(); 
  19.                         if (coordinatorUnknown()) { 
  20.                             if (findCoordinatorFuture != null || lookupCoordinator().failed()) 
  21.                                 // the immediate future check ensures that we backoff properly in the case that no 
  22.                                 // brokers are available to connect to
  23.                                 AbstractCoordinator.this.wait(retryBackoffMs); 
  24.                         } else if (heartbeat.sessionTimeoutExpired(now)) { 
  25.                             // the session timeout has expired without seeing a successful heartbeat, so we should 
  26.                             // probably make sure the coordinator is still healthy. 
  27.                             markCoordinatorUnknown(); 
  28.                         } else if (heartbeat.pollTimeoutExpired(now)) { 
  29.                             // the poll timeout has expired, which means that the foreground thread has stalled 
  30.                             // in between calls to poll(), so we explicitly leave the group
  31.                             maybeLeaveGroup(); 
  32.                         } else if (!heartbeat.shouldHeartbeat(now)) { 
  33.                             // poll again after waiting for the retry backoff in case the heartbeat failed or the 
  34.                             // coordinator disconnected 
  35.                             AbstractCoordinator.this.wait(retryBackoffMs); 
  36.                         } else { 
  37.                             heartbeat.sentHeartbeat(now); 
  38.                             sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() { 
  39.                                 @Override 
  40.                                 public void onSuccess(Void value) { 
  41.                                     synchronized (AbstractCoordinator.this) { 
  42.                                         heartbeat.receiveHeartbeat(time.milliseconds()); 
  43.                                     } 
  44.                                 } 
  45.                                 @Override 
  46.                                 public void onFailure(RuntimeException e) { 
  47.                                     synchronized (AbstractCoordinator.this) { 
  48.                                         if (e instanceof RebalanceInProgressException) { 
  49.                                             // it is valid to continue heartbeating while the group is rebalancing. This 
  50.                                             // ensures that the coordinator keeps the member in the group for as long 
  51.                                             // as the duration of the rebalance timeout. If we stop sending heartbeats, 
  52.                                             // however, then the session timeout may expire before we can rejoin. 
  53.                                             heartbeat.receiveHeartbeat(time.milliseconds()); 
  54.                                         } else { 
  55.                                             heartbeat.failHeartbeat(); 
  56.                                             // wake up the thread if it's sleeping to reschedule the heartbeat 
  57.                                             AbstractCoordinator.this.notify(); 
  58.                                         } 
  59.                                     } 
  60.                                 } 
  61.                             }); 
  62.                         } 
  63.                     } 
  64.                 } 
  65.             } catch (AuthenticationException e) { 
  66.                 log.error("An authentication error occurred in the heartbeat thread", e); 
  67.                 this.failed.set(e); 
  68.             } catch (GroupAuthorizationException e) { 
  69.                 log.error("A group authorization error occurred in the heartbeat thread", e); 
  70.                 this.failed.set(e); 
  71.             } catch (InterruptedException | InterruptException e) { 
  72.                 Thread.interrupted(); 
  73.                 log.error("Unexpected interrupt received in heartbeat thread", e); 
  74.                 this.failed.set(new RuntimeException(e)); 
  75.             } catch (Throwable e) { 
  76.                 log.error("Heartbeat thread failed due to unexpected error", e); 
  77.                 if (e instanceof RuntimeException) 
  78.                     this.failed.set((RuntimeException) e); 
  79.                 else 
  80.                     this.failed.set(new RuntimeException(e)); 
  81.             } finally { 
  82.                 log.debug("Heartbeat thread has closed"); 
  83.             } 
  84.         } 
  85. View Code 

在心跳線程中這里面包含兩個最重要的超時函數,它們是sessionTimeoutExpired和pollTimeoutExpired。

  1. public boolean sessionTimeoutExpired(long now) { 
  2.         return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs; 
  3. }public boolean pollTimeoutExpired(long now) { 
  4.         return now - lastPoll > maxPollIntervalMs; 

2.2.1 sessionTimeoutExpired

如果是sessionTimeout超時,則會被標記為當前協調器處理斷開,此時,會將消費者移除,重新分配分區和消費者的對應關系。在Kafka Broker Server中,Consumer Group定義了5中(如果算上Unknown,應該是6種狀態)狀態,org.apache.kafka.common.ConsumerGroupState,如下圖所示:

 

Kafka消費與心跳機制

2.2.2 pollTimeoutExpired

如果觸發了poll超時,此時消費者客戶端會退出ConsumerGroup,當再次poll的時候,會重新加入到ConsumerGroup,觸發RebalanceGroup。而KafkaConsumer Client是不會幫我們重復poll的,需要我們自己在實現的消費邏輯中不停的調用poll方法。

3.分區與消費線程

關于消費分區與消費線程的對應關系,理論上消費線程數應該小于等于分區數。之前是有這樣一種觀點,一個消費線程對應一個分區,當消費線程等于分區數是最大化線程的利用率。直接使用KafkaConsumer Client實例,這樣使用確實沒有什么問題。但是,如果我們有富裕的CPU,其實還可以使用大于分區數的線程,來提升消費能力,這就需要我們對KafkaConsumer Client實例進行改造,實現消費策略預計算,利用額外的CPU開啟更多的線程,來實現消費任務分片。具體實現,留到下一篇博客,給大家分享《基于Kafka的分布式查詢SQL引擎》。

4.結束語

這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

責任編輯:未麗燕 來源: 哥不是小蘿莉
相關推薦

2020-11-13 10:58:24

Kafka

2012-05-31 02:54:07

HadoopJava

2022-06-20 19:39:31

微服務registry通信

2020-10-15 18:31:36

理解Netty編解碼

2024-03-20 08:33:00

Kafka線程安全Rebalance

2024-06-26 12:45:00

2024-12-23 06:00:00

TCPC#網絡

2023-12-11 07:12:21

心跳檢測重連機制服務端

2024-03-19 11:41:12

2021-12-28 12:01:59

Kafka 消費者機制

2023-01-14 17:36:39

微服務注冊中心數據

2022-03-07 10:15:28

KafkaZookeeper存儲

2023-06-01 08:08:38

kafka消費者分區策略

2025-02-07 00:14:03

2024-06-13 15:26:23

2024-05-29 07:50:41

2019-11-19 14:48:00

Kafka文件存儲

2023-06-07 15:25:19

Kafka版本日志

2022-11-14 08:19:59

重試機制Kafka

2024-08-13 15:46:57

點贊
收藏

51CTO技術棧公眾號

亚洲一区二区色| 国产 中文 字幕 日韩 在线| 日日夜夜精品一区| 国产精品99久久久久久宅男| 国a精品视频大全| 欧美性xxxx图片| 日韩一区二区三免费高清在线观看| 亚洲精品中文字幕乱码三区| 久久99精品国产一区二区三区| 国产精品xxxxxx| 国产精品第十页| 国产亚洲aⅴaaaaaa毛片| 中文字幕日韩久久| 免费成人直播| 亚洲最大的成人av| 手机在线观看国产精品| 亚洲AV无码一区二区三区性| 奇米色一区二区三区四区| 欧美激情二区三区| 中文字幕第69页| 日韩中文av| 日韩女优毛片在线| 欧美第一页浮力影院| 操人在线观看| 亚洲黄色小说网站| 亚洲欧洲免费无码| 久久经典视频| fc2成人免费人成在线观看播放| 国产精品国产福利国产秒拍| 国产成年人免费视频| 欧美综合另类| 亚洲免费精彩视频| 男男做爰猛烈叫床爽爽小说 | 99精彩视频在线观看免费| 这里只有精品免费视频| 性色av一区二区怡红| 欧美另类在线播放| 老司机成人免费视频| 人人狠狠综合久久亚洲婷| 亚洲精选在线观看| 亚洲蜜桃精久久久久久久久久久久| 精品成人18| 4438x成人网最大色成网站| 北条麻妃视频在线| 3d性欧美动漫精品xxxx软件| 精品福利在线看| www.日本在线视频| 亚洲wwwww| 亚洲欧美日韩国产手机在线 | 欧美日韩亚洲国产综合| 国产xxxxx视频| 极品美女一区| 日韩欧美一区二区三区久久| 日韩欧美视频网站| 中文字幕在线看片| 欧美日韩在线另类| 亚洲精品乱码久久久久久自慰| 美女的胸无遮挡在线观看 | 精品激情国产视频| 999久久久国产| 99精品一区| 久久亚洲精品国产亚洲老地址| 人人澡人人澡人人看| 久久久久久久久99精品大| 不卡毛片在线看| 久久成人国产精品入口| 激情综合中文娱乐网| 久久免费视频网站| 伊人手机在线视频| 秋霞电影一区二区| 亚洲一区二区在线| 日韩一级片免费| 91亚洲男人天堂| 日韩精品国内| 理论片午午伦夜理片在线播放| 综合色中文字幕| 国产一区二区四区| 美女扒开腿让男人桶爽久久软| 日本大香伊一区二区三区| 天天爽天天爽夜夜爽| 国产一区二区高清在线| 欧美mv日韩mv国产网站| 三上悠亚ssⅰn939无码播放 | www.午夜色| 欧美寡妇性猛交xxx免费| 亚洲一区免费在线观看| 妺妺窝人体色www在线小说| 成人va天堂| 日韩一区二区免费在线电影| 亚洲中文字幕一区| 成人免费在线播放| 欧美高清在线视频观看不卡| 五月婷婷中文字幕| 久久精品免费看| 国产女人水真多18毛片18精品 | 蜜臀久久99精品久久久酒店新书 | 日精品一区二区| 91精品视频一区| 无码国产色欲xxxx视频| 国产精品你懂的在线| 国产片侵犯亲女视频播放| 成人性生交大片免费观看网站| 欧美精品一二三| 国产激情视频网站| 国产精品成人一区二区不卡| 88国产精品欧美一区二区三区| 中文在线观看免费高清| 99视频有精品| 日本xxx免费| 欧美羞羞视频| 精品国产不卡一区二区三区| 美国美女黄色片| 亚洲一区二区三区免费在线观看| 成人伊人精品色xxxx视频| 欧美日韩在线中文字幕| 一区二区三区在线不卡| 欧美美女一级片| 欧美美女在线| 97超级碰碰人国产在线观看| 99精品在线看| 欧美激情一区二区在线| 九九九九免费视频| 高清日韩欧美| 久久99精品视频一区97| 亚洲天堂久久久久| 国产亚洲人成网站| 一女被多男玩喷潮视频| 丁香婷婷成人| 欧美成人精品xxx| 97国产精品久久久| 欧美韩国一区二区| 中文字幕在线导航| 精品无人区麻豆乱码久久久| 欧美亚洲国产日韩2020| 色呦呦免费观看| 亚洲午夜电影在线| 国产成人精品一区二区三区在线观看| 国产精品毛片久久| 国产精品综合网站| 自拍视频在线免费观看| 欧美亚洲综合色| 欧美黄色高清视频| 青青草原综合久久大伊人精品优势| 久久av一区二区| 超碰成人av| 亚洲第一精品电影| 国产一卡二卡在线| 波多野结衣亚洲一区| 欧美图片激情小说| 任你躁在线精品免费| 91tv亚洲精品香蕉国产一区7ujn| 免费国产黄色片| 午夜av电影一区| 国产艳俗歌舞表演hd| 乱人伦精品视频在线观看| 久久艳妇乳肉豪妇荡乳av| 欧美成人ⅴideosxxxxx| 亚洲天堂成人在线视频| 这里只有精品国产| 亚洲三级免费观看| 国产精品91av| 国产日产高清欧美一区二区三区| 久久久com| 电影天堂国产精品| 久久久国产精彩视频美女艺术照福利| 国产喷水福利在线视频| 亚洲一区二区av在线| 波多野结衣视频播放| 久久高清免费观看| 性欧美大战久久久久久久免费观看| 日韩成人免费av| 久久99热精品| 日本一区高清| 欧美日韩国产综合一区二区 | 久久久久国产精品嫩草影院| 欧美性大战久久久久久久| 国产福利视频网站| bt7086福利一区国产| 国产一级不卡毛片| 欧美不卡视频| 免费久久久一本精品久久区| 黄色成人小视频| 欧美肥老妇视频| 极品美乳网红视频免费在线观看| 欧美日韩五月天| 国产亚洲精品码| 国产午夜精品福利| 日本成人在线免费| 日韩专区一卡二卡| 青青草综合视频| 久久超碰99| 97影院在线午夜| 婷婷激情一区| 欧美激情视频一区| 91福利在线视频| 亚洲国产91色在线| 在线免费观看高清视频| 亚洲一区二区三区四区五区中文| av电影在线不卡| 粉嫩在线一区二区三区视频| 欧美成人黄色网址| 亚洲人成高清| 精品91一区二区三区| 同性恋视频一区| 俄罗斯精品一区二区三区| 日本欧美韩国| 91成人在线观看国产| 超碰最新在线| 一个色综合导航| 五月天婷婷激情网| 欧美大片日本大片免费观看| 中文字幕 视频一区| 午夜精品久久久久久久| 夫妻性生活毛片| 日本一区二区免费在线| 北岛玲一区二区| 成人一区二区视频| 日本亚洲一区二区三区| 日本va欧美va精品发布| 日本三级免费观看| 黄色在线成人| 影音先锋男人的网站| 欧美国产美女| 日韩精品欧美专区| 亚洲欧美成人vr| 国产综合第一页| 亚洲精品一区二区三区中文字幕| 国产日韩精品在线观看| 欧美va视频| 国产精品高精视频免费| 欧美一区 二区 三区| 国产美女三级无套内谢| 免费日韩成人| **欧美日韩vr在线| 欧美黄色视屏| 理论片在线不卡免费观看| av中文天堂在线| 亚洲日韩中文字幕在线播放| 天天爽夜夜爽夜夜爽| 精品国产制服丝袜高跟| 国产欧美一区二区三区视频在线观看| 欧美性xxxxxxxx| 国产又粗又猛又爽又| 一本到高清视频免费精品| 中文字幕第15页| 午夜精品国产更新| 国产一国产二国产三| 亚洲综合一区二区三区| 国产一级视频在线| 亚洲国产日韩a在线播放性色| 久久久精品视频免费| 亚洲午夜电影网| 国产精品一区二区6| 精品国产福利在线| 国产嫩bbwbbw高潮| 在线观看三级视频欧美| 在线观看免费视频a| 欧美精品久久99久久在免费线 | 成人女同在线观看| 国模私拍视频一区| 日本综合字幕| 国产精品第二页| 麻豆久久久久| 91久久久久久久久久| 亚洲国产精品免费视频| 成人在线看片| 卡通动漫精品一区二区三区| 美媛馆国产精品一区二区| 国产剧情一区| 在线亚洲美日韩| 精品av久久久久电影| 亚洲色欲综合一区二区三区| 免费看欧美美女黄的网站| 一级片黄色免费| 99re在线精品| 国产在线免费av| 亚洲图片一区二区| 无码无套少妇毛多18pxxxx| 欧美日韩成人在线| 性色av蜜臀av| 亚洲欧美日韩一区在线| 免费av网站在线看| 午夜精品在线视频| 91在线亚洲| 成人欧美一区二区三区黑人免费| 婷婷国产精品| 一区二区三区四区欧美日韩| 激情视频一区| 免费看污污网站| 波波电影院一区二区三区| jizz中文字幕| 亚洲成精国产精品女| 超碰在线免费97| 亚洲精品一区二区三区99| yourporn在线观看视频| 欧美激情乱人伦| 成人做爰免费视频免费看| 国产精品二区三区| 欧美高清在线| 欧美精品色婷婷五月综合| 国产综合久久久久影院| 精品欧美一区二区久久久| 一区二区三区久久久| 中文字幕免费播放| 亚洲精品二三区| av在线播放免费| 久久久女女女女999久久| 日韩大陆av| 日韩黄色影视| 国产精品美女久久久| 男生和女生一起差差差视频| 久久久五月婷婷| 国产精品自拍视频一区| 在线播放一区二区三区| 国产片在线观看| 97超级碰碰人国产在线观看| 91欧美极品| 日韩不卡一二区| 麻豆精品一区二区综合av| 免费在线观看你懂的| 黄网动漫久久久| 国产成人三级在线观看视频| 欧美wwwxxxx| 国产一区二区三区亚洲综合| 色婷婷精品国产一区二区三区| 香蕉久久久久久久av网站| 国产一线在线观看| 一区二区三区蜜桃| 国产99999| 久久成年人视频| 日日夜夜亚洲| 黄瓜视频免费观看在线观看www| 久久亚洲风情| 37p粉嫩大胆色噜噜噜| 午夜精品国产更新| 亚洲日本中文字幕在线| 97色在线观看| 麻豆一区一区三区四区| 黄色大片中文字幕| 成人午夜碰碰视频| 久久免费视频精品| 精品国产乱码久久| 操喷在线视频| 国内一区在线| 久久免费黄色| 人妻aⅴ无码一区二区三区| 在线一区二区观看| 岛国视频免费在线观看| 国产精品成人一区二区三区吃奶| 国产区精品区| 污污动漫在线观看| 亚洲色图欧美偷拍| 性一交一乱一精一晶| 国内精品视频一区| 偷拍自拍一区| 激情网站五月天| 国产精品青草综合久久久久99| 亚洲免费视频二区| 精品国产欧美一区二区五十路 | 欧美日韩综合精品| 日本成人中文字幕| 九九热最新地址| 亚洲国产成人91精品| 国产精欧美一区二区三区蓝颜男同| 欧美在线一区二区三区四区| 免费黄网站欧美| 九九免费精品视频| 亚洲男人第一网站| 四虎成人精品一区二区免费网站| 久久国产精品免费观看| youjizz国产精品| 欧美一区二区三区网站| www.久久色.com| 国产精品网站在线看| 成人一区二区三| 亚洲美腿欧美偷拍| 天堂a√中文在线| 国产美女搞久久| 亚洲精品免费观看| 9.1片黄在线观看| 精品国产免费一区二区三区香蕉| 中文字幕影音在线| 97超碰免费观看| 91丨porny丨首页| 国产精品无码久久av| 91国在线精品国内播放| 手机在线一区二区三区| 精品人妻一区二区免费视频| 欧美日韩视频一区二区| av资源一区| 一级黄色免费在线观看| 91蝌蚪国产九色| 国产a级免费视频| 国产精品激情av电影在线观看 | 欧美激情在线狂野欧美精品| 亚洲第一福利社区| 永久免费看片在线观看| 欧美在线视频你懂得| av在线最新| 影音先锋男人的网站|