精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Springboot整合Kafka Stream實時統計數據

開發 前端 Kafka
Kafka Streams是一個客戶端類庫,用于處理和分析存儲在Kafka中的數據。它建立在流式處理的一些重要的概念之上:如何區分事件時間和處理時間、Windowing的支持、簡單高效的管理和實時查詢應用程序狀態。

[[417927]]

環境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka Stream介紹

Kafka在0.10版本推出了Stream API,提供了對存儲在Kafka內的數據進行流式處理和分析的能力。

流式計算一般被用來和批量計算做比較。批量計算往往有一個固定的數據集作為輸入并計算結果。而流式計算的輸入往往是“無界”的(Unbounded Data),持續輸入的,即永遠拿不到全量數據去做計算;同時,計算結果也是持續輸出的,只能拿到某一個時刻的結果,而不是最終的結果。

Kafka Streams是一個客戶端類庫,用于處理和分析存儲在Kafka中的數據。它建立在流式處理的一些重要的概念之上:如何區分事件時間和處理時間、Windowing的支持、簡單高效的管理和實時查詢應用程序狀態。

Kafka Streams的門檻非常低:和編寫一個普通的Kafka消息處理程序沒有太大的差異,可以通過多進程部署來完成擴容、負載均衡、高可用(Kafka Consumer的并行模型)。

Kafka Streams的一些特點:

  • 被設計成一個簡單的、輕量級的客戶端類庫,能夠被集成到任何Java應用中
  • 除了Kafka之外沒有任何額外的依賴,利用Kafka的分區模型支持水平擴容和保證順序性
  • 通過可容錯的狀態存儲實現高效的狀態操作(windowed joins and aggregations)
  • 支持exactly-once語義
  • 支持紀錄級的處理,實現毫秒級的延遲
  • 提供High-Level的Stream DSL和Low-Level的Processor API

Stream Processing Topology流處理拓撲

  • 流是Kafka Streams提供的最重要的抽象:它表示一個無限的、不斷更新的數據集。流是不可變數據記錄的有序、可重放和容錯序列,其中數據記錄定義為鍵值對。
  • Stream Processing Application是使用了Kafka Streams庫的應用程序。它通過processor topologies定義計算邏輯,其中每個processor topology都是多個stream processor(節點)通過stream組成的圖。
  • A stream processor 是處理器拓撲中的節點;它表示一個處理步驟,通過每次從拓撲中的上游處理器接收一個輸入記錄,將其操作應用于該記錄,來轉換流中的數據,并且隨后可以向其下游處理器生成一個或多個輸出記錄。

有兩種特殊的processor:

Source Processor 源處理器是一種特殊類型的流處理器,它沒有任何上游處理器。它通過使用來自一個或多個kafka topic的記錄并將其轉發到其下游處理器,從而從一個或多個kafka topic生成其拓撲的輸入流。

Sink Processor 接收器處理器是一種特殊類型的流處理器,沒有下游處理器。它將從其上游處理器接收到的任何記錄發送到指定的kafka topic。

Springboot整合Kafka Stream實時統計數據

相關的核心概念查看如下鏈接

Springboot整合Kafka Stream實時統計數據

下面演示Kafka Stream 在Springboot中的應用

依賴

  1. <dependency> 
  2.   <groupId>org.springframework.boot</groupId> 
  3.   <artifactId>spring-boot-starter-web</artifactId> 
  4.   </dependency> 
  5. <dependency> 
  6.   <groupId>org.springframework.kafka</groupId> 
  7.   <artifactId>spring-kafka</artifactId> 
  8. </dependency> 
  9. <dependency> 
  10.   <groupId>org.apache.kafka</groupId> 
  11.   <artifactId>kafka-streams</artifactId> 
  12. </dependency> 

配置

  1. server: 
  2.   port: 9090 
  3. spring: 
  4.   application: 
  5.     name: kafka-demo 
  6.   kafka: 
  7.     streams: 
  8.       application-id: ${spring.application.name
  9.       properties: 
  10.         spring.json.trusted.packages: '*' 
  11.     bootstrap-servers: 
  12.     - localhost:9092 
  13.     - localhost:9093 
  14.     - localhost:9094 
  15.     producer: 
  16.       acks: 1 
  17.       retries: 10 
  18.       key-serializer: org.apache.kafka.common.serialization.StringSerializer 
  19.       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer 
  20.       properties: 
  21.         spring.json.trusted.packages: '*' 
  22.     consumer: 
  23.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
  24.       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer 
  25.       enable-auto-commitfalse 
  26.       group-id: ConsumerTest 
  27.       auto-offset-reset: latest 
  28.       properties: 
  29.         session.timeout.ms: 12000 
  30.         heartbeat.interval.ms: 3000 
  31.         max.poll.records: 100 
  32.         spring.json.trusted.packages: '*' 
  33.     listener: 
  34.       ack-mode: manual-immediate 
  35.       type: batch 
  36.       concurrency: 8 
  37.     properties: 
  38.       max.poll.interval.ms: 300000 

消息發送

  1. @Service 
  2. public class MessageSend { 
  3.   @Resource 
  4.   private KafkaTemplate<String, Message> kafkaTemplate ; 
  5.   public void sendMessage2(Message message) { 
  6.     kafkaTemplate.send(new ProducerRecord<String, Message>("test", message)).addCallback(result -> { 
  7.       System.out.println("執行成功..." + Thread.currentThread().getName()) ; 
  8.     }, ex -> { 
  9.       System.out.println("執行失敗") ; 
  10.       ex.printStackTrace() ; 
  11.     }) ; 
  12.   } 

消息監聽

  1. @KafkaListener(topics = {"test"}) 
  2. public void listener2(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) { 
  3.   for (ConsumerRecord<String, Message> record : records) { 
  4.     System.out.println(this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ; 
  5.   } 
  6.   try { 
  7.     TimeUnit.SECONDS.sleep(0) ; 
  8.   } catch (InterruptedException e) { 
  9.     e.printStackTrace(); 
  10.   } 
  11.   ack.acknowledge() ; 
  12.      
  13. @KafkaListener(topics = {"demo"}) 
  14. public void listenerDemo(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) { 
  15.   for (ConsumerRecord<String, Message> record : records) { 
  16.     System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ; 
  17.   } 
  18.   ack.acknowledge() ; 

Kafka Stream處理

消息轉換并轉發其它Topic

  1. @Bean 
  2. public KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) { 
  3.   KStream<Object, Object> stream = streamsBuilder.stream("test"); 
  4.   stream.map((key, value) -> { 
  5.     System.out.println("原始消息內容:" + new String((byte[]) value, Charset.forName("UTF-8"))) ; 
  6.     return new KeyValue<>(key"{\"title\": \"123123\", \"message\": \"重新定義內容\"}".getBytes(Charset.forName("UTF-8"))) ; 
  7.   }).to("demo") ; 
  8.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

Stream對象處理

  1. @Bean 
  2. public KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.map((key, value) -> { 
  8.     value.setTitle("XXXXXXX") ; 
  9.     return new KeyValue<>(key, value) ; 
  10.   }).to("demo", Produced.with(Serdes.String(), jsonSerde)) ; 
  11.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

分組處理

  1. @Bean 
  2. public KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .count() 
  15.   .toStream().print(Printed.toSysOut()); 
  16.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

聚合

  1. @Bean 
  2. public KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .aggregate(() -> 0L, (key, value ,aggValue) -> { 
  15.     System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; 
  16.     return aggValue + 1 ; 
  17.   }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long())) 
  18.   .toStream().print(Printed.toSysOut()); 
  19.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

Filter過濾數據

  1. @Bean 
  2. public KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .aggregate(() -> 0L, (key, value ,aggValue) -> { 
  15.     System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; 
  16.     return aggValue + 1 ; 
  17.   }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long())) 
  18.   .toStream() 
  19.   .filter((key, value) -> !"2".equals(key)) 
  20.   .print(Printed.toSysOut()); 
  21.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

過濾Key不等于"2"

分支多流處理

  1. @Bean 
  2. public KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   // 分支,多流處理 
  8.   KStream<String, Message>[] arrStream = stream.branch( 
  9.     (key, value) -> "男".equals(value.getSex()),  
  10.     (key, value) -> "女".equals(value.getSex())); 
  11.   Stream.of(arrStream).forEach(as -> { 
  12.     as.foreach((key, message) -> { 
  13.       System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message) ; 
  14.     }); 
  15.   }); 
  16.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

多字段分組

不能使用多個selectKey,后面的會覆蓋前面的

  1. @Bean 
  2. public KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream 
  8.   .selectKey(new KeyValueMapper<String, Message, String>() { 
  9.     @Override 
  10.     public String apply(String key, Message value) { 
  11.       System.out.println(Thread.currentThread().getName()) ; 
  12.       return value.getTime() + " | " + value.getOrgCode() ; 
  13.     } 
  14.   }) 
  15.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  16.   .count() 
  17.   .toStream().print(Printed.toSysOut()); 
  18.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2020-04-24 09:01:23

網絡安全數據泄露黑客

2011-10-09 10:33:12

2015-07-29 11:21:13

JavaScript統計數據

2023-07-18 10:43:14

物聯網IOT

2010-04-21 11:27:55

Oracle數據庫

2009-12-24 08:51:08

Linux用戶

2014-12-31 10:56:57

Windows PhoWP

2019-06-27 05:00:26

物聯網統計數據IOT

2022-10-25 09:11:47

物聯網IoT工業物聯網

2018-10-22 06:27:32

網絡犯罪數據泄露攻擊

2022-10-26 15:17:58

數字存儲數據中心

2020-05-09 22:54:48

物聯網安全物聯網IOT

2024-03-06 08:03:09

2022-04-28 07:31:41

Springkafka數據量

2022-06-20 09:49:25

Linux發行版

2019-07-22 05:01:38

物聯網IOT技術

2025-08-04 01:45:00

2022-06-20 14:19:55

FedoraEPELLinux

2021-02-18 16:10:03

物聯網工業4.0人工智能

2022-04-09 11:53:52

供應鏈攻擊
點贊
收藏

51CTO技術棧公眾號

色欲色香天天天综合网www| 欧美一区二区三区四区在线| 午夜久久福利视频| 色操视频在线| 国产亚洲一区二区在线观看| 国产精品美女在线观看| 劲爆欧美第一页| 思热99re视热频这里只精品| 欧美日韩国产一级二级| 欧美做暖暖视频| 深夜福利免费在线观看| 国产在线播放一区| 日本成人在线视频网址| 在线免费观看亚洲视频| 国产精品手机在线播放 | www.在线观看av| 国产尤物视频在线| 成人午夜碰碰视频| 国产精品久久久久不卡| 国产一国产二国产三| 日韩精品午夜| 亚洲精品国精品久久99热一| 在线a免费观看| 成人啊v在线| 精品美女久久久久久免费| 日韩 欧美 自拍| 欧洲免费在线视频| 9人人澡人人爽人人精品| 亚洲在线观看视频| 一本色道久久综合无码人妻| 国产精品视区| 欧美黄色免费网站| 99精品久久久久| 欧美成人激情| 中文精品99久久国产香蕉| 中文字幕一区二区人妻电影丶| 国产精品视频一区二区三区| 欧美日韩在线直播| 波多野结衣天堂| 极品美女一区| 日韩欧美在线视频免费观看| 鲁一鲁一鲁一鲁一色| 欧美aaa免费| 伊人色综合久久天天人手人婷| 一区二区三区免费看| 97超碰人人在线| 国产精品美女久久久久久久久久久| 欧美一区免费视频| 国产一级片在线播放| 久久久精品黄色| 日本视频一区二区不卡| 国模吧精品人体gogo| 久久久久久电影| 美女亚洲精品| 精品电影在线| 欧美极品少妇xxxxⅹ高跟鞋| 欧美lavv| 第一视频专区在线| 1区2区3区精品视频| 欧美 日韩 国产 在线观看| 里番在线观看网站| 一区二区三区四区乱视频| 日韩精品视频在线观看视频 | 亚洲一区在线电影| 国产日本在线播放| 亚洲淫成人影院| 欧美在线制服丝袜| 亚洲欧美日韩网站| 91蝌蚪精品视频| 亚洲精品99久久久久| 国产精品一区二区入口九绯色| 蜜桃视频欧美| 中文精品99久久国产香蕉| 国产97免费视频| 伊人精品视频| 国产成人鲁鲁免费视频a| 中文字幕一二区| 国产经典欧美精品| 久久久com| 福利小视频在线观看| 亚洲欧美日韩在线| 热99这里只有精品| yy6080久久伦理一区二区| 91精品欧美综合在线观看最新| 丰满少妇xbxb毛片日本| 伊甸园亚洲一区| 日韩在线视频观看正片免费网站| 动漫性做爰视频| 免费在线播放第一区高清av| 国产美女精品视频免费观看| 日韩在线视频免费| 国产精品久久久久影院亚瑟| av在线免费观看国产| 国产日韩另类视频一区| 欧美一二三区在线| 欧美激情aaa| 欧美日韩爆操| 国产a∨精品一区二区三区不卡| 国产女人爽到高潮a毛片| 久久综合一区二区| 国内自拍中文字幕| 日本精品不卡| 精品国产乱码久久久久久夜甘婷婷 | 亚洲精品无码久久久久| 国产成人午夜高潮毛片| 色999日韩自偷自拍美女| 成年人视频免费在线播放| 欧美日韩一区二区三区四区| 先锋资源av在线| 亚洲激情中文在线| 国产成人黄色av| 亚洲高清视频在线播放| 中文字幕国产一区二区| 欧美 国产 综合| 91精品国产乱码久久久竹菊| 日韩在线视频网| 亚洲 国产 日韩 欧美| av在线综合网| 97中文字幕在线| 国产精品亚洲四区在线观看| 中文字幕精品av| 精品人妻一区二区三区免费看| 东方aⅴ免费观看久久av| 中文字幕成人一区| 国产成+人+综合+亚洲欧美| 国产视频综合在线| 一区二区三区视频免费看| 国产精品一区三区| 中文字幕第50页| 久久av日韩| 一区二区三区回区在观看免费视频| 自拍偷拍欧美亚洲| 成人激情文学综合网| 日本黄色片一级片| 亚洲国产一区二区三区网站| 久久久av亚洲男天堂| 一级片一区二区三区| 中文天堂在线一区| 亚洲一区二区蜜桃| 精品视频黄色| 国产精品久久久久久久久久尿 | 国产精品久久久久久久久免费丝袜| 久久精品午夜福利| 欧美欧美黄在线二区| 4k岛国日韩精品**专区| 视频二区在线| 色综合久久久久综合| 久操视频免费看| 久久一二三四| 亚洲春色综合另类校园电影| 老司机精品视频网| www.国产精品一二区| 国产精品嫩草影院桃色| 亚洲精品亚洲人成人网在线播放| 中文字幕在线视频一区二区三区 | www.爱色av.com| 一道本一区二区三区| 国产精品久久久久久久久粉嫩av| 在线观看免费黄视频| 在线电影院国产精品| 高h视频免费观看| 成人激情小说乱人伦| 99精品视频播放| 日韩精品第一区| 亚洲一区二区三区sesese| 欧美性爽视频| 精品在线小视频| 一区二区久久精品66国产精品 | 91日韩精品一区| 日韩在线xxx| 99久久久久| 动漫美女被爆操久久久| 欧美aaaaa性bbbbb小妇| 一二美女精品欧洲| 国内精品久久久久久久久久久| 亚洲一区二区影院| av中文字幕免费观看| 久久99久久久久久久久久久| 久久99久久99精品| 成人羞羞视频在线看网址| 91中文字幕在线观看| 欧美裸体视频| 久久久国产成人精品| 日本黄色大片视频| 在线精品亚洲一区二区不卡| 亚洲国产成人精品综合99| xnxx国产精品| 午夜大片在线观看| 久久精品日产第一区二区 | 国产精品嫩草99av在线| 伊人色综合久久天天五月婷| 欧美18xxxx| 成人国产在线激情| 亚洲最大成人| 欧美成人精品在线观看| 国产三级电影在线观看| 精品国产91久久久久久久妲己 | 九色|91porny| 国产亚洲欧美在线视频| 欧美一区高清| 三级三级久久三级久久18| 欧美黄色影院| 91久久国产自产拍夜夜嗨| 欧美暴力调教| 97久久国产精品| aaa大片在线观看| 在线观看欧美视频| 天天干天天草天天射| 91精品午夜视频| 伊人成人在线观看| 日韩欧美精品在线观看| 久久精品国产亚洲AV无码男同| 国产精品乱子久久久久| 播金莲一级淫片aaaaaaa| 成人污视频在线观看| 在线观看免费视频污| 日本不卡一区二区三区高清视频| 日本丰满少妇xxxx| 欧美日韩国产探花| 香蕉视频免费版| 日韩欧美视频专区| 天天久久人人| 成人在线tv视频| http;//www.99re视频| 亚洲精品第一| 国产精品久久在线观看| 二区三区不卡| 欧美在线视频免费| 日韩av影片| 午夜免费在线观看精品视频| 色呦呦在线观看视频| 不卡中文字幕av| 日本a级在线| 日韩一区二区在线视频| 在线观看免费网站黄| 日韩中文字幕在线看| 91美女视频在线| 色妞在线综合亚洲欧美| 天天影视久久综合| 日韩在线视频二区| 国产精品久久久久久福利| 日韩视频免费观看| 黄色av电影在线观看| 欧美成人免费在线观看| 在线电影福利片| 色综合久久中文字幕综合网小说| 97超碰在线公开在线看免费| 久久不射热爱视频精品| 性欧美猛交videos| 久久久久久久91| 国产在线88av| 日本久久久久久久| 国产精品蜜月aⅴ在线| 成人h猎奇视频网站| 亚洲一区二区三区四区电影 | 亚洲国产精品久久久久秋霞影院 | av观看在线| 国内精品久久久久久影视8| 漫画在线观看av| 国产精品久久久久av| 人人精品久久| 风间由美久久久| 色88888久久久久久影院| 日本午夜精品一区二区三区| 日韩欧美伦理| 免费视频爱爱太爽了| 性欧美长视频| 污污网站免费观看| 国产福利一区在线观看| 中文字幕免费高清视频| 国产亚洲精久久久久久| 久草视频手机在线| 亚洲成人777| 超碰在线97观看| 91精品国产免费| 五月婷婷伊人网| 日韩亚洲精品视频| 91在线三级| 国产精品视频在线观看| 中文字幕一区日韩精品| 日韩成人av网站| 国产精品观看| 日本a√在线观看| 国产成人自拍网| 性欧美一区二区| 亚洲一区成人在线| 在线观看你懂的网站| 日韩一卡二卡三卡国产欧美| 污片免费在线观看| 国产欧美精品在线观看| 欧美又粗又大又长| 欧美午夜视频在线观看| 国产精品色综合| 亚洲精品久久久久久久久| 在线观看麻豆蜜桃| 97在线视频免费| 91精品国产自产观看在线| 久久久久久国产精品mv| 亚洲电影在线一区二区三区| 免费av网址在线| 成人深夜福利app| 911国产在线| 色婷婷av一区二区三区软件| 亚洲av无码乱码国产麻豆| 一区二区欧美久久| 日本不卡1234视频| 国产经品一区二区| 久久久人成影片免费观看| 欧美 日韩精品| 成人激情免费网站| 国产97免费视频| 欧美日韩黄视频| 青青操在线视频| 国内伊人久久久久久网站视频| www一区二区三区| 亚洲高清精品中出| 久久中文精品| av在线网站观看| 午夜精品久久久久久不卡8050| 99国产精品99| 久久视频在线播放| 日韩精品第二页| 亚洲精品在线视频观看| 日韩**一区毛片| 成人午夜福利一区二区| 欧美色videos| 欧美偷拍视频| 5252色成人免费视频| 久久激情av| 亚洲美免无码中文字幕在线| 福利一区二区在线观看| 国产suv精品一区二区68| 欧美日韩亚洲综合在线| 91大神在线网站| 国产精品午夜视频| 成人在线亚洲| 五月婷婷之婷婷| 中文字幕亚洲在| 97人妻人人澡人人爽人人精品| 色99之美女主播在线视频| 欧美91在线|欧美| 亚洲在线视频一区二区| 狠狠色丁香久久婷婷综| 精品国产视频在线观看| 91精品国产综合久久婷婷香蕉| 国产丝袜在线| 99九九视频| 亚洲伦理一区| 国产交换配乱淫视频免费| 一本到高清视频免费精品| 免费在线观看污视频| 国产成人精品优优av| 成人写真视频| 性久久久久久久久久久久久久| 亚洲最新视频在线播放| 天天操天天舔天天干| 国产91亚洲精品| 97色伦图片97综合影院| 99精品视频国产| 午夜亚洲福利老司机| 久久经典视频| 国产日韩欧美另类| 欧美午夜影院| 久久国产精品无码一级毛片 | 久久久av亚洲男天堂| 99亚洲乱人伦aⅴ精品| 欧美牲交a欧美牲交aⅴ免费真| 亚洲国产精品99久久久久久久久| 国产人妻精品一区二区三| 欧美激情在线观看视频| 欧美极品中文字幕| а 天堂 在线| 天天爽夜夜爽夜夜爽精品视频| 国产精品99999| 97在线中文字幕| 免费在线成人| 亚洲欧美精品aaaaaa片| 日韩精品免费在线视频| 色综合视频一区二区三区44| 日韩 欧美 视频| 国产日产欧产精品推荐色| 国产哺乳奶水91在线播放| 秋霞av国产精品一区| 亚洲乱码在线| 成年人网站免费看| 欧美一区二区三区在线观看| 蜜桃视频动漫在线播放| 一区二区三区欧美成人| 91美女在线观看| 精品国产av一区二区| 日韩av男人的天堂| 国产精品激情电影| www色com| 亚洲精品av在线| 国产美女精品视频免费播放软件 | 国产呦精品一区二区三区网站| 国产精品午夜影院| 美女精品视频一区| 青青草成人影院| 亚洲精品视频大全|