精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink實時計算topN熱榜

開發 架構
為了統計每個窗口下活躍的用戶,我們需要再次按窗口進行分組,這里根據UserViewCount中的windowEnd進行keyBy()操作。

[[386499]]

本文轉載自微信公眾號「Java大數據與數據倉庫」,作者柯廣。轉載本文請聯系Java大數據與數據倉庫公眾號。

topN的常見應用場景,最熱商品購買量,最高人氣作者的閱讀量等等。

1. 用到的知識點

  • Flink創建kafka數據源;
  • 基于 EventTime 處理,如何指定 Watermark;
  • Flink中的Window,滾動(tumbling)窗口與滑動(sliding)窗口;
  • State狀態的使用;
  • ProcessFunction 實現 TopN 功能;

2. 案例介紹

通過用戶訪問日志,計算最近一段時間平臺最活躍的幾位用戶topN。

  • 創建kafka生產者,發送測試數據到kafka;
  • 消費kafka數據,使用滑動(sliding)窗口,每隔一段時間更新一次排名;

3. 數據源

這里使用kafka api發送測試數據到kafka,代碼如下:

  1. @Data 
  2. @NoArgsConstructor 
  3. @AllArgsConstructor 
  4. @ToString 
  5. public class User { 
  6.  
  7.     private long id; 
  8.     private String username; 
  9.     private String password
  10.     private long timestamp
  11.  
  12. Map<String, String> config = Configuration.initConfig("commons.xml"); 
  13.  
  14. @Test 
  15. public void sendData() throws InterruptedException { 
  16.     int cnt = 0; 
  17.  
  18.     while (cnt < 200){ 
  19.         User user = new User(); 
  20.         user.setId(cnt); 
  21.         user.setUsername("username" + new Random().nextInt((cnt % 5) + 2)); 
  22.         user.setPassword("password" + cnt); 
  23.         user.setTimestamp(System.currentTimeMillis()); 
  24.         Future<RecordMetadata> future = KafkaUtil.sendDataToKafka(config.get("kafka-topic"), String.valueOf(cnt), JSON.toJSONString(user)); 
  25.         while (!future.isDone()){ 
  26.             Thread.sleep(100); 
  27.         } 
  28.         try { 
  29.             RecordMetadata recordMetadata = future.get(); 
  30.             System.out.println(recordMetadata.offset()); 
  31.         } catch (InterruptedException e) { 
  32.             e.printStackTrace(); 
  33.         } catch (ExecutionException e) { 
  34.             e.printStackTrace(); 
  35.         } 
  36.         System.out.println("發送消息:" + cnt + "******" + user.toString()); 
  37.         cnt = cnt + 1; 
  38.     } 

這里通過隨機數來擾亂username,便于使用戶名大小不一,讓結果更加明顯。KafkaUtil是自己寫的一個kafka工具類,代碼很簡單,主要是平時做測試方便。

4. 主要程序

創建一個main程序,開始編寫代碼。

創建flink環境,關聯kafka數據源。

  1. Map<String, String> config = Configuration.initConfig("commons.xml"); 
  2.  
  3. Properties kafkaProps = new Properties(); 
  4. kafkaProps.setProperty("zookeeper.connect", config.get("kafka-zookeeper")); 
  5. kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport")); 
  6. kafkaProps.setProperty("group.id", config.get("kafka-groupid")); 
  7.  
  8. StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); 

EventTime 與 Watermark

  1. senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

設置屬性senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),表示按照數據時間字段來處理,默認是TimeCharacteristic.ProcessingTime

  1. /** The time characteristic that is used if none other is set. */ 
  2. private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime; 

這個屬性必須設置,否則后面,可能窗口結束無法觸發,導致結果無法輸出。取值有三種:

  • ProcessingTime:事件被處理的時間。也就是由flink集群機器的系統時間來決定。
  • EventTime:事件發生的時間。一般就是數據本身攜帶的時間。
  • IngestionTime:攝入時間,數據進入flink流的時間,跟ProcessingTime還是有區別的;

指定好使用數據的實際時間來處理,接下來需要指定flink程序如何get到數據的時間字段,這里使用調用DataStream的assignTimestampsAndWatermarks方法,抽取時間和設置watermark。

  1. senv.addSource( 
  2.         new FlinkKafkaConsumer010<>( 
  3.                 config.get("kafka-topic"), 
  4.                 new SimpleStringSchema(), 
  5.                 kafkaProps 
  6.         ) 
  7. ).map(x ->{ 
  8.     return JSON.parseObject(x, User.class); 
  9. }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<User>(Time.milliseconds(1000)) { 
  10.     @Override 
  11.     public long extractTimestamp(User element) { 
  12.         return element.getTimestamp(); 
  13.     } 
  14. }) 

前面給出的代碼中可以看出,由于發送到kafka的時候,將User對象轉換為json字符串了,這里使用的是fastjson,接收過來可以轉化為JsonObject來處理,我這里還是將其轉化為User對象JSON.parseObject(x, User.class),便于處理。

這里考慮到數據可能亂序,使用了可以處理亂序的抽象類BoundedOutOfOrdernessTimestampExtractor,并且實現了唯一的一個沒有實現的方法extractTimestamp,亂序數據,會導致數據延遲,在構造方法中傳入了一個Time.milliseconds(1000),表明數據可以延遲一秒鐘。比如說,如果窗口長度是10s,0~10s的數據會在11s的時候計算,此時watermark是10,才會觸發計算,也就是說引入watermark處理亂序數據,最多可以容忍0~t這個窗口的數據,最晚在t+1時刻到來。

 

具體關于watermark的講解可以參考這篇文章

https://blog.csdn.net/qq_39657909/article/details/106081543

窗口統計

業務需求上,通常可能是一個小時,或者過去15分鐘的數據,5分鐘更新一次排名,這里為了演示效果,窗口長度取10s,每次滑動(slide)5s,即5秒鐘更新一次過去10s的排名數據。

  1. .keyBy("username"
  2. .timeWindow(Time.seconds(10), Time.seconds(5)) 
  3. .aggregate(new CountAgg(), new WindowResultFunction()) 

我們使用.keyBy("username")對用戶進行分組,使用.timeWindow(Time size, Time slide)對每個用戶做滑動窗口(10s窗口,5s滑動一次)。然后我們使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數據,減少 state 的存儲壓力。較之.apply(WindowFunction wf)會將窗口中的數據都存儲下來,最后一起計算要高效地多。aggregate()方法的第一個參數用于

這里的CountAgg實現了AggregateFunction接口,功能是統計窗口中的條數,即遇到一條數據就加一。

  1. public class CountAgg implements AggregateFunction<User, Long, Long>{ 
  2.     @Override 
  3.     public Long createAccumulator() { 
  4.         return 0L; 
  5.     } 
  6.  
  7.     @Override 
  8.     public Long add(User value, Long accumulator) { 
  9.         return accumulator + 1; 
  10.     } 
  11.  
  12.     @Override 
  13.     public Long getResult(Long accumulator) { 
  14.         return accumulator; 
  15.     } 
  16.  
  17.     @Override 
  18.     public Long merge(Long a, Long b) { 
  19.         return a + b; 
  20.     } 

.aggregate(AggregateFunction af, WindowFunction wf) 的第二個參數WindowFunction將每個 key每個窗口聚合后的結果帶上其他信息進行輸出。我們這里實現的WindowResultFunction將用戶名,窗口,訪問量封裝成了UserViewCount進行輸出。

  1. private static class WindowResultFunction implements WindowFunction<Long, UserViewCount, Tuple, TimeWindow> { 
  2.  
  3.  
  4.     @Override 
  5.     public void apply(Tuple key, TimeWindow window, Iterable<Long> input, Collector<UserViewCount> out) throws Exception { 
  6.         Long count = input.iterator().next(); 
  7.         out.collect(new UserViewCount(((Tuple1<String>)key).f0, window.getEnd(), count)); 
  8.     } 
  9.  
  10. @Data 
  11. @NoArgsConstructor 
  12. @AllArgsConstructor 
  13. @ToString 
  14. public static class UserViewCount { 
  15.     private String userName; 
  16.     private long windowEnd; 
  17.     private long viewCount; 
  18.  

TopN計算最活躍用戶

為了統計每個窗口下活躍的用戶,我們需要再次按窗口進行分組,這里根據UserViewCount中的windowEnd進行keyBy()操作。然后使用 ProcessFunction 實現一個自定義的 TopN 函數 TopNHotItems 來計算點擊量排名前3名的用戶,并將排名結果格式化成字符串,便于后續輸出。

  1. .keyBy("windowEnd"
  2. .process(new TopNHotUsers(3)) 
  3. .print(); 

ProcessFunction 是 Flink 提供的一個 low-level API,用于實現更高級的功能。它主要提供了定時器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我們將利用 timer 來判斷何時收齊了某個 window 下所有用戶的訪問數據。由于 Watermark 的進度是全局的,在 processElement 方法中,每當收到一條數據(ItemViewCount),我們就注冊一個 windowEnd+1 的定時器(Flink 框架會自動忽略同一時間的重復注冊)。windowEnd+1 的定時器被觸發時,意味著收到了windowEnd+1的 Watermark,即收齊了該windowEnd下的所有用戶窗口統計值。我們在 onTimer() 中處理將收集的所有商品及點擊量進行排序,選出 TopN,并將排名信息格式化成字符串后進行輸出。

這里我們還使用了 ListState 來存儲收到的每條 UserViewCount 消息,保證在發生故障時,狀態數據的不丟失和一致性。ListState 是 Flink 提供的類似 Java List 接口的 State API,它集成了框架的 checkpoint 機制,自動做到了 exactly-once 的語義保證。

  1. private static class TopNHotUsers extends KeyedProcessFunction<Tuple, UserViewCount, String> { 
  2.  
  3.     private int topSize; 
  4.     private ListState<UserViewCount> userViewCountListState; 
  5.  
  6.     public TopNHotUsers(int topSize) { 
  7.         this.topSize = topSize; 
  8.     } 
  9.  
  10.     @Override 
  11.     public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { 
  12.         super.onTimer(timestamp, ctx, out); 
  13.         List<UserViewCount> userViewCounts = new ArrayList<>(); 
  14.         for(UserViewCount userViewCount : userViewCountListState.get()) { 
  15.             userViewCounts.add(userViewCount); 
  16.         } 
  17.  
  18.         userViewCountListState.clear(); 
  19.  
  20.         userViewCounts.sort(new Comparator<UserViewCount>() { 
  21.             @Override 
  22.             public int compare(UserViewCount o1, UserViewCount o2) { 
  23.                 return (int)(o2.viewCount - o1.viewCount); 
  24.             } 
  25.         }); 
  26.  
  27.         // 將排名信息格式化成 String, 便于打印 
  28.         StringBuilder result = new StringBuilder(); 
  29.         result.append("====================================\n"); 
  30.         result.append("時間: ").append(new Timestamp(timestamp-1)).append("\n"); 
  31.         for (int i = 0; i < topSize; i++) { 
  32.             UserViewCount currentItem = userViewCounts.get(i); 
  33.             // No1:  商品ID=12224  瀏覽量=2413 
  34.             result.append("No").append(i).append(":"
  35.                     .append("  用戶名=").append(currentItem.userName) 
  36.                     .append("  瀏覽量=").append(currentItem.viewCount) 
  37.                     .append("\n"); 
  38.         } 
  39.         result.append("====================================\n\n"); 
  40.  
  41.         Thread.sleep(1000); 
  42.  
  43.         out.collect(result.toString()); 
  44.  
  45.     } 
  46.  
  47.     @Override 
  48.     public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { 
  49.         super.open(parameters); 
  50.         ListStateDescriptor<UserViewCount> userViewCountListStateDescriptor = new ListStateDescriptor<>( 
  51.                 "user-state"
  52.                 UserViewCount.class 
  53.         ); 
  54.         userViewCountListState = getRuntimeContext().getListState(userViewCountListStateDescriptor); 
  55.  
  56.     } 
  57.  
  58.     @Override 
  59.     public void processElement(UserViewCount value, Context ctx, Collector<String> out) throws Exception { 
  60.         userViewCountListState.add(value); 
  61.         ctx.timerService().registerEventTimeTimer(value.windowEnd + 1000); 
  62.     } 

結果輸出

可以看到,每隔5秒鐘更新輸出一次數據。

 

參考

 

http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/

 

責任編輯:武曉燕 來源: Java大數據與數據倉庫
相關推薦

2021-06-06 13:10:12

FlinkPvUv

2021-07-16 10:55:45

數倉一體Flink SQL

2022-12-29 09:13:02

實時計算平臺

2015-07-31 10:35:18

實時計算

2019-06-27 09:12:43

FlinkStorm框架

2015-08-31 14:27:52

2016-12-28 14:27:24

大數據Apache Flin搜索引擎

2017-09-26 09:35:22

2015-10-09 13:42:26

hbase實時計算

2019-11-21 09:49:29

架構運維技術

2021-06-03 08:10:30

SparkStream項目Uv

2019-02-18 15:23:21

馬蜂窩MESLambda

2021-07-05 10:48:42

大數據實時計算

2016-11-02 09:02:56

交通大數據計算

2021-03-10 14:04:10

大數據計算技術

2011-10-28 09:05:09

2017-01-15 13:45:20

Docker大數據京東

2020-09-10 17:41:14

ClickHouse數據引擎

2022-08-24 09:19:03

美團計算

2022-11-10 08:48:20

開源數據湖Arctic
點贊
收藏

51CTO技術棧公眾號

欧美一级免费大片| 国产强被迫伦姧在线观看无码| 日本午夜在线| 久久在线观看| 看电视剧不卡顿的网站| 亚洲精品wwwww| 日本中文字幕一级片| 国产在线观看第一页| 久久亚州av| 一个色妞综合视频在线观看| 国产精品永久在线| 在线免费看黄视频| 午夜不卡影院| 91网址在线看| 91精品91久久久久久| 制服.丝袜.亚洲.中文.综合懂| se在线电影| 丝袜美腿一区二区三区| 精品亚洲夜色av98在线观看 | 91网址在线观看精品| 韩国免费在线视频| 久久一区二区三区超碰国产精品| 日韩在线激情视频| 高潮一区二区三区| 免费在线看黄| 国产精品资源网| 欧美激情视频一区二区| 日本少妇xxxx软件| 国产极品一区| 最新日韩av在线| 91在线视频九色| 亚洲乱码国产乱码精品精大量| 白浆视频在线观看| 91美女视频网站| 99久久久精品免费观看国产| 九九热精品在线观看| 年轻的保姆91精品| 欧美三级韩国三级日本三斤| 色噜噜狠狠一区二区三区| 成人黄色激情视频| 99日韩精品| 亚洲一二在线观看| 一级做a免费视频| 日本高清不卡一区二区三区视频| 久久久久久久久免费| 国产成人在线一区| 黄色香蕉视频在线观看| 日韩成人在线观看视频| 亚洲成人7777| 日本一区视频在线观看| 91av国产精品| 欧美日韩三区| 国产偷亚洲偷欧美偷精品| 涩视频在线观看| 韩国成人漫画| 亚洲色欲色欲www在线观看| 99久久综合狠狠综合久久止| 91 中文字幕| 精品在线免费观看| 97国产suv精品一区二区62| 熟女俱乐部一区二区视频在线| 精品精品国产毛片在线看 | 欧美天堂在线视频| 午夜一区在线| 久久精品国产久精国产一老狼| 久久久久中文字幕亚洲精品| 欧美日韩国产一区二区在线观看| 6080亚洲精品一区二区| 人体内射精一区二区三区| 国产在线视频网址| 国产日韩欧美激情| 国产chinese精品一区二区| 日韩综合在线观看| 欧美精品导航| 久久久久国产视频| 天天干天天操天天拍| 成人在线超碰| 在线播放欧美女士性生活| 中文字幕资源在线观看| 欧美激情喷水| 亚洲大片精品永久免费| 日韩欧美一区三区| www在线观看播放免费视频日本| 97国产一区二区| 91九色单男在线观看| 国产老妇伦国产熟女老妇视频| 亚洲中字在线| 国产精品露脸av在线| 亚洲欧美在线观看视频| 欧美在线亚洲综合一区| 伊是香蕉大人久久| 内射中出日韩无国产剧情| 中文字幕久久精品一区二区| 欧美精品777| 91热这里只有精品| xx欧美xxx| 欧美精品在线观看播放| 国产一卡二卡三卡四卡| 欧美大片91| 日韩成人在线视频网站| 四虎永久免费观看| 欧美欧美黄在线二区| 亚洲精品国产美女| 精品一区二区三孕妇视频| 综合国产视频| 亚洲欧美日韩精品久久奇米色影视| 亚洲欧洲日韩综合| 日韩精品免费一区二区夜夜嗨 | 69亚洲乱人伦| 成人午夜av| 国产一区二区三区在线| 麻豆疯狂做受xxxx高潮视频| 日韩专区在线视频| 欧美自拍视频在线| 天堂а√在线中文在线新版| 中文高清一区| 欧美夫妻性生活视频| 欧美h在线观看| 国产成人免费网站| 成人欧美一区二区| 69久久久久| 国产精品美女久久久久久2018| 婷婷久久伊人| wwwww亚洲| 一区二区三区四区精品在线视频| 日本女人高潮视频| av香蕉成人| 欧美性欧美巨大黑白大战| 黄色在线视频网| 白嫩亚洲一区二区三区| 91精品福利在线一区二区三区 | 久久综合一区二区| 精品日本一区二区三区在线观看| 日批视频在线播放| 91尤物视频在线观看| 日本女人高潮视频| 欧美美女被草| 91精品国产综合久久福利| 亚洲国产无码精品| 日韩一级不卡| 国产精品久久久久久久久久久久午夜片| 黄色av免费观看| 97精品国产露脸对白| 久久天天东北熟女毛茸茸| 欧美黑人猛交| 在线观看国产一区二区| 中文字幕日韩综合| 成人vr资源| 国产精品com| 国产福利电影在线| 一区二区在线观看免费 | 欧美 日韩 精品| 91美女视频网站| 日本a级片免费观看| 欧美中文字幕精在线不卡| 日韩精品一二三四区| 日韩精品成人一区| 久草在线在线精品观看| 亚洲精品久久久久久一区二区| av在线免费观看网址| 欧美一区二区三区免费在线看 | 天天综合视频在线观看| 一区二区三区在线免费观看| 91aaa精品| 欧美私人啪啪vps| 国产福利久久精品| 女人让男人操自己视频在线观看 | 天天综合网在线观看| 亚洲成a人片综合在线| 黄色片视频在线免费观看| 成人精品动漫一区二区三区| 久久噜噜噜精品国产亚洲综合| 无码人妻丰满熟妇区五十路| 久久久久久一级片| 孩娇小videos精品| 一区三区在线欧| 国产精品精品视频| 天天爱天天干天天操| 婷婷开心激情综合| 原创真实夫妻啪啪av| 欧美理论视频| 欧美整片在线观看| 大片免费播放在线视频| 91.麻豆视频| 日韩精品国产一区二区| 欧美激情一区在线观看| 波多野结衣家庭教师在线播放| 欧美福利在线播放网址导航| 操日韩av在线电影| 波多野结衣在线观看一区| 99热精品国产| 福利视频一二区| 国产精品一在线观看| 2018国产精品视频| h视频在线播放| 精品国产免费久久| 国产性生活网站| 国产一区二区剧情av在线| 亚洲国产欧美日韩| 91综合久久爱com| 国产精品久久久久久久久久尿| 国产人成网在线播放va免费| 欧美日韩国产三级| 亚洲aaa视频| 麻豆精品视频在线观看视频| 97超碰国产精品| 美女久久精品| 秋霞成人午夜鲁丝一区二区三区| 麻豆av在线导航| 日韩高清av一区二区三区| 一二三区在线播放| 国产精品欧美久久久久一区二区| 老司机av网站| 免费成人在线观看| 亚洲一区二区自拍偷拍| 日本h片久久| 色妞欧美日韩在线| 午夜影院免费视频| 欧美日韩国产中字| 能免费看av的网站| 国产成人在线网站| 日本xxxx黄色| 免费在线播放第一区高清av| 成年女人18级毛片毛片免费| 高清欧美性猛交xxxx黑人猛| 欧美激情伊人电影| 日本视频在线观看| 国产视频久久久久| 韩国av免费在线| 欧美一二三区精品| 亚洲天堂中文字幕在线| 91久久久免费一区二区| 999福利视频| 成人综合婷婷国产精品久久蜜臀| 精品少妇一区二区三区在线| 亚洲91久久| 国产精品入口免费| 九色精品蝌蚪| 91久久久久久久一区二区| 懂色aⅴ精品一区二区三区| 国产91色在线|免| 日本三级在线视频| 中文字幕日韩av| 精品国产亚洲一区二区麻豆| 亚洲成av人片在www色猫咪| 青青草原免费观看| 又紧又大又爽精品一区二区| 日韩a级片在线观看| av成人老司机| 无码精品一区二区三区在线播放| 三级影片在线观看欧美日韩一区二区| 99热在线这里只有精品| 亚洲一区二区三区四区五区午夜 | 理论不卡电影大全神| 久久免费成人精品视频| 免费一二一二在线视频| 日韩美女福利视频| 成人福利一区二区| 国产日韩欧美视频在线| 波多野结衣在线高清| 久久人人爽人人爽人人片av高请 | 手机在线免费看av| 亚洲美女视频网站| 黄视频在线观看免费| 国产一区二区成人| 午夜看片在线免费| 精品自拍视频在线观看| 波多野结衣一区二区| 在线播放国产精品| 韩国av网站在线| 欧美激情影音先锋| 亚洲天堂av影院| 国产精品99一区| 美国十次综合久久| 久久久一本精品99久久精品| 国产精品视频首页| 欧美综合第一页| 成人精品国产| 91青青草免费观看| 精品美女一区| 91黄在线观看| 香蕉久久精品| 中文字幕日韩精品一区二区| 亚洲精华一区二区三区| 日韩偷拍一区二区| 午夜欧美精品| 国内外免费激情视频| 在线亚洲精品| 国内外成人免费在线视频| 粉嫩久久99精品久久久久久夜| 在线免费看污网站| 成人午夜在线视频| 手机看片日韩av| 一区二区三区四区av| 天天爱天天做天天爽| 日韩精品一区二区三区在线播放 | 中文字幕欧美在线| 91美女精品| 国产日韩中文字幕| 秋霞影视一区二区三区| 在线观看视频黄色| 国产韩日影视精品| 一区二区三区在线视频看| 亚洲手机视频| 成人免费观看cn| 亚洲三级影院| 国产午夜大地久久| 久久国产成人午夜av影院| 国产草草浮力影院| 91年精品国产| 日韩在线中文字幕视频| 色欧美88888久久久久久影院| 国产极品久久久| 精品日韩欧美一区二区| 亚洲国产999| 亚洲国内精品视频| 成人国产免费电影| 国产精品xxx视频| 欧美xxxx在线| 真实国产乱子伦对白视频| 美女精品一区二区| 国产aⅴ激情无码久久久无码| 国产亚洲福利社区一区| 精品午夜福利视频| 777精品伊人久久久久大香线蕉| 青青草免费在线视频| 午夜精品久久17c| 麻豆精品国产| 蜜臀av.com| 经典三级在线一区| 中文字幕人妻熟女在线| 国产精品国模大尺度视频| 欧美偷拍第一页| 欧美日韩亚洲综合在线| 黄色在线小视频| 青青草原一区二区| 天天做夜夜做人人爱精品 | 乱h高h女3p含苞待放| 欧美日韩一区二区在线视频| 国产一区二区影视| 日本国产欧美一区二区三区| 全球av集中精品导航福利| 国产日本在线播放| 国产馆精品极品| 美女的奶胸大爽爽大片| 婷婷一区二区三区| 色香蕉在线视频| 69**夜色精品国产69乱| 美女一区2区| 日本不卡在线观看视频| 久久这里只精品最新地址| 亚洲永久精品在线观看| 亚洲美女精品成人在线视频| 自拍一区在线观看| 欧美三级网色| 欧美精品一线| 少妇伦子伦精品无吗| 亚洲国产日韩一区二区| 日本激情一区二区三区| 欧美亚洲成人精品| 美国十次综合久久| 久操网在线观看| 久久久久亚洲蜜桃| 亚洲专区第一页| 久久婷婷国产麻豆91天堂| 久久免费福利| 欧美深夜福利视频| 国产亚洲精品久| 国产老妇伦国产熟女老妇视频| 欧美大片在线看免费观看| 欧美1区二区| 特级丰满少妇一级| 悠悠色在线精品| 亚洲色欧美另类| 国产精品欧美一区二区三区奶水| 91精品国产调教在线观看| 国产麻豆剧传媒精品国产| 欧美日韩久久久久| 亚洲s色大片| 国产精品久久久一区二区三区| 亚洲影音先锋| 精品视频第一页| 亚洲国产精品va在看黑人| a级影片在线| 久久免费视频1| 久久精品久久综合| 国产一级一级片| 日韩欧美一二三| 亚洲女同av| av动漫在线播放| 久久久91精品国产一区二区精品 | 欧美精品一区二区三区国产精品 | 91国偷自产一区二区三区观看| 蜜桃视频在线观看免费视频网站www| av一区二区三区四区电影| 美女国产精品| 中文精品在线观看| 欧美日韩国产免费| 性爽视频在线| 国产制服91一区二区三区制服|