精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

一篇文章帶你深入理解FlinkSQL中的窗口

大數據
時間語義,要配合窗口操作才能發揮作用。最主要的用途,當然就是開窗口、根據時間段做計算了。下面我們就來看看 Table API 和 SQL 中,怎么利用時間字段做窗口操作。

[[360693]]

前言

時間語義,要配合窗口操作才能發揮作用。最主要的用途,當然就是開窗口、根據時間段做計算了。下面我們就來看看 Table API 和 SQL 中,怎么利用時間字段做窗口操作。在 Table API 和 SQL 中,主要有兩種窗口:Group Windows 和 Over Windows

 

一、分組窗口(Group Windows) 分組窗口(Group Windows)會根據時間或行計數間隔,將行聚合到有限的組(Group)中,并對每個組的數據執行一次聚合函數。 Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定義的,并且必須由 as 子句指定一個別名。為了按窗口對表進行分組,窗口的別名必須在 group by 子句中,像常規的分組字段一樣引用。例子:

  1. val table = input 
  2. .window([w: GroupWindow] as 'w) 
  3. .groupBy('w, 'a) 
  4. .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count

Table API 提供了一組具有特定語義的預定義 Window 類,這些類會被轉換為底層DataStream 或 DataSet 的窗口操作。

Table API 支持的窗口定義,和我們熟悉的一樣,主要也是三種:滾動(Tumbling)、滑動(Sliding和 會話(Session)。

1.1 滾動窗口

滾動窗口(Tumbling windows)要用 Tumble 類來定義,另外還有三個方法:

  • over:定義窗口長度
  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段
  • as:別名,必須出現在后面的 groupBy 中

實現案例

1.需求

設置滾動窗口為10秒鐘統計id出現的次數。

2.數據準備

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

3.代碼實現

  1. package windows 
  2.  
  3. import org.apache.flink.streaming.api.TimeCharacteristic 
  4. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.api.windowing.time.Time 
  7. import org.apache.flink.table.api.scala._ 
  8. import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble} 
  9. import org.apache.flink.types.Row 
  10.  
  11. /** 
  12.  * @Package Windows 
  13.  * @File :FlinkSQLTumBlingTie.java 
  14.  * @author 大數據老哥 
  15.  * @date 2020/12/25 21:58 
  16.  * @version V1.0 
  17.  *          設置滾動窗口 
  18.  */ 
  19. object FlinkSQLTumBlingTie { 
  20.   def main(args: Array[String]): Unit = { 
  21.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  22.     env.setParallelism(1) 
  23.     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  24.  
  25.     val settings = EnvironmentSettings.newInstance() 
  26.       .useBlinkPlanner() 
  27.       .inStreamingMode() 
  28.       .build() 
  29.     val tableEnv = StreamTableEnvironment.create(env, settings) 
  30.  
  31.     // 讀取數據 
  32.     val inputPath = "./data/sensor.txt" 
  33.     val inputStream = env.readTextFile(inputPath) 
  34.     
  35.  
  36.     // 先轉換成樣例類類型(簡單轉換操作) 
  37.     val dataStream = inputStream 
  38.       .map(data => { 
  39.         val arr = data.split(","
  40.         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) 
  41.       }) 
  42.       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { 
  43.         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L 
  44.       }) 
  45.  
  46.     val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts) 
  47.     // 注冊表 
  48.     tableEnv.createTemporaryView("sensor", sensorTable) 
  49.     // table 實現 
  50.     val resultTable = sensorTable 
  51.       .window(Tumble over 10.seconds on 'ts as 'tw) // 每10秒統計一次,滾動時間窗口 
  52.       .groupBy('id, 'tw) 
  53.       .select('id, 'id.count, 'tw.end
  54.     //sql 實現 
  55.     val sqlTable = tableEnv.sqlQuery( 
  56.       ""
  57.         |select 
  58.         |id, 
  59.         |count(id) , 
  60.         |tumble_end(ts,interval '10' second
  61.         |from sensor 
  62.         |group by 
  63.         |id, 
  64.         |tumble(ts,interval '10' second
  65.         |""".stripMargin) 
  66.  
  67.     /*** 
  68.      * .window(Tumble over 10.minutes on 'rowtime as 'w) (事件時間字段 rowtime) 
  69.      * .window(Tumble over 10.minutes on 'proctime as 'w)(處理時間字段 proctime) 
  70.      * .window(Tumble over 10.minutes on 'proctime as 'w) (類似于計數窗口,按處理時間排序,10 行一組) 
  71.      */ 
  72.     resultTable.toAppendStream[Row].print("talbe"
  73.     sqlTable.toRetractStream[Row].print("sqlTable"
  74.      
  75.     env.execute("FlinkSQLTumBlingTie"
  76.   } 
  77.  
  78.   case class SensorReading(id: String, timestamp: Long, temperature: Double
  79.  

運行結果

 

1.2 滑動窗口

滑動窗口(Sliding windows)要用 Slide 類來定義,另外還有四個方法:

  • over:定義窗口長度
  • every:定義滑動步長
  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段
  • as:別名,必須出現在后面的 groupBy 中

實現案例

1.需求描述

設置窗口大小為10秒鐘設置滑動距離為5秒鐘,統計id的出現的次數。

2.數據準備

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

3.實現代碼

  1. package windows 
  2.  
  3. import org.apache.flink.streaming.api.TimeCharacteristic 
  4. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.api.windowing.time.Time 
  7. import org.apache.flink.table.api.{EnvironmentSettings, Slide, Table
  8. import org.apache.flink.table.api.scala._ 
  9. import org.apache.flink.types.Row 
  10. import windows.FlinkSQLTumBlingTie.SensorReading 
  11.  
  12. /** 
  13.  * @Package windows 
  14.  * @File :FlinkSQLSlideTime.java 
  15.  * @author 大數據老哥 
  16.  * @date 2020/12/27 22:19 
  17.  * @version V1.0 
  18.  *          滑動窗口 
  19.  */ 
  20. object FlinkSQLSlideTime { 
  21.   def main(args: Array[String]): Unit = { 
  22.     //構建運行環境 
  23.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  24.     env.setParallelism(1) // 設置分區為1 方便后面測試 
  25.     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件時間 
  26.  
  27.     val settings = EnvironmentSettings.newInstance() 
  28.       .useBlinkPlanner() 
  29.       .inStreamingMode() 
  30.       .build() 
  31.     // 創建表env 
  32.     val tableEnv = StreamTableEnvironment.create(env, settings) 
  33.  
  34.     // 讀取數據 
  35.     val inputPath = "./data/sensor.txt" 
  36.     val inputStream = env.readTextFile(inputPath) 
  37.  
  38.     // 先轉換成樣例類類型(簡單轉換操作) 
  39.     val dataStream = inputStream 
  40.       .map(data => { 
  41.         val arr = data.split(","
  42.         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) 
  43.       }) 
  44.       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { 
  45.         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L 
  46.       }) 
  47.  
  48.     val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts) 
  49.     // 注冊表 
  50.     tableEnv.createTemporaryView("sensor", sensorTable) 
  51.     // table API 實現 
  52.     val tableApi = sensorTable.window(Slide over 10.seconds every 5.seconds on 'ts as 'w) 
  53.       .groupBy('w, 'id) 
  54.       .select('id, 'id.count, 'w.end
  55.     val tableSql = tableEnv.sqlQuery( 
  56.       ""
  57.         |select 
  58.         |id, 
  59.         |count(id), 
  60.         |HOP_END(ts,INTERVAL '10' SECOND, INTERVAL '5' SECOND )as w 
  61.         |from sensor 
  62.         |group by 
  63.         |HOP(ts,INTERVAL '10' SECOND, INTERVAL '5' SECOND),id 
  64.         |""".stripMargin) 
  65.  
  66.     tableApi.toAppendStream[Row].print("tableApi"
  67.     tableSql.toAppendStream[Row].print("tableSql"
  68.     /** 
  69. .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w) (事件時間字段 rowtime) 
  70. .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) (處理時間字段 proctime)  
  71. .window(Slide over 10.rows every 5.rows on 'proctime as 'w) (類似于計數窗口,按處理時間排序,10 行一組) 
  72.    **/ 
  73.     env.execute("FlinkSQLSlideTime"
  74.   } 

4.運行結果

 

1.3 會話窗口

會話窗口(Session windows)要用 Session 類來定義,另外還有三個方法:

  • withGap:會話時間間隔
  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段
  • as:別名,必須出現在后面的 groupBy 中實現案例

1.需求描述

設置一個session 為10秒鐘 統計id的個數

2.準備數據

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

3.編寫代碼

  1. package windows 
  2.  
  3. import org.apache.flink.streaming.api.TimeCharacteristic 
  4. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.api.windowing.time.Time 
  7. import org.apache.flink.table.api.{EnvironmentSettings, Session, Table
  8. import org.apache.flink.table.api.scala._ 
  9. import org.apache.flink.types.Row 
  10. import windows.FlinkSQLTumBlingTie.SensorReading 
  11.  
  12. /** 
  13.  * @Package windows 
  14.  * @File :FlinkSqlSessionTime.java 
  15.  * @author 大數據老哥 
  16.  * @date 2020/12/27 22:52 
  17.  * @version V1.0 
  18.  */ 
  19. object FlinkSqlSessionTime { 
  20.   def main(args: Array[String]): Unit = { 
  21.     //構建運行環境 
  22.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  23.     env.setParallelism(1) // 設置分區為1 方便后面測試 
  24.     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件時間 
  25.  
  26.     val settings = EnvironmentSettings.newInstance() 
  27.       .useBlinkPlanner() 
  28.       .inStreamingMode() 
  29.       .build() 
  30.     // 創建表env 
  31.     val tableEnv = StreamTableEnvironment.create(env, settings) 
  32.  
  33.     // 讀取數據 
  34.     val inputPath = "./data/sensor.txt" 
  35.     val inputStream = env.readTextFile(inputPath) 
  36.  
  37.     // 先轉換成樣例類類型(簡單轉換操作) 
  38.     val dataStream = inputStream 
  39.       .map(data => { 
  40.         val arr = data.split(","
  41.         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) 
  42.       }) 
  43.       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { 
  44.         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L 
  45.       }) 
  46.  
  47.     val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts) 
  48.     // 注冊表 
  49.     tableEnv.createTemporaryView("sensor", sensorTable) 
  50.  
  51.     val tableApi = sensorTable. 
  52.       window(Session withGap 10.seconds on 'ts as 'w) 
  53.       .groupBy('id, 'w) 
  54.       .select('id, 'id.count, 'w.end
  55.     val tableSQL = tableEnv.sqlQuery( 
  56.       ""
  57.         |SELECT 
  58.         |id, 
  59.         |COUNT(id), 
  60.         |SESSION_END(ts, INTERVAL '10' SECONDAS w 
  61.         |FROM sensor 
  62.         |GROUP BY 
  63.         |id, 
  64.         |SESSION(ts, INTERVAL '10' SECOND
  65.         |""".stripMargin) 
  66.     tableApi.toAppendStream[Row].print("tableApi"
  67.     tableSQL.toAppendStream[Row].print("tableSQL"
  68.  
  69.     /** 
  70.      * .window(Session withGap 10.minutes on 'rowtime as 'w) 事件時間字段 rowtime) 
  71.      * .window(Session withGap 10.minutes on 'proctime as 'w) 處理時間字段 proctime) 
  72.      */ 
  73.     env.execute("FlinkSqlSessionTime"
  74.   } 

4.運行結果

 

二、 Over Windows

Over window 聚合是標準 SQL 中已有的(Over 子句),可以在查詢的 SELECT 子句中定義。Over window 聚合,會針對每個輸入行,計算相鄰行范圍內的聚合。Over windows使用.window(w:overwindows*)子句定義,并在 select()方法中通過別名來引用。例子:

  1. val table = input 
  2. .window([w: OverWindow] as 'w) 
  3. .select('a, 'b.sum over 'w, 'c.min over 'w) 

Table API 提供了 Over 類,來配置 Over 窗口的屬性。可以在事件時間或處理時間,以及指定為時間間隔、或行計數的范圍內,定義 Over windows。

無界的 over window 是使用常量指定的。也就是說,時間間隔要指定 UNBOUNDED_RANGE,或者行計數間隔要指定 UNBOUNDED_ROW。而有界的 over window 是用間隔的大小指定的。

2.1 無界的 over window

  1. // 無界的事件時間 over window (時間字段 "rowtime"
  2. .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) 
  3. //無界的處理時間 over window (時間字段"proctime"
  4. .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w) 
  5. // 無界的事件時間 Row-count over window (時間字段 "rowtime"
  6. .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) 
  7. //無界的處理時間 Row-count over window (時間字段 "rowtime"
  8. .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w) 

2.2 有界的 over window

  1. // 有界的事件時間 over window (時間字段 "rowtime",之前 1 分鐘) 
  2. .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w) 
  3. // 有界的處理時間 over window (時間字段 "rowtime",之前 1 分鐘) 
  4. .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w) 
  5. // 有界的事件時間 Row-count over window (時間字段 "rowtime",之前 10 行) 
  6. .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w) 
  7. // 有界的處理時間 Row-count over window (時間字段 "rowtime",之前 10 行) 
  8. .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w) 

2.3 代碼練習

我們可以綜合學習過的內容,用一段完整的代碼實現一個具體的需求。例如,統計每個sensor每條數據,與之前兩行數據的平均溫度。

數據準備

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

代碼分析:

  1. package windows 
  2.  
  3. import org.apache.flink.streaming.api.TimeCharacteristic 
  4. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.api.windowing.time.Time 
  7. import org.apache.flink.table.api.{EnvironmentSettings, Over, Tumble} 
  8. import org.apache.flink.table.api.scala._ 
  9. import org.apache.flink.types.Row 
  10.  
  11. /** 
  12. * @Package windows 
  13. * @File :FlinkSqlTumBlingOverTime.java 
  14. * @author 大數據老哥 
  15. * @date 2020/12/28 21:45 
  16. * @version V1.0 
  17. */ 
  18. object FlinkSqlTumBlingOverTime { 
  19.  def main(args: Array[String]): Unit = { 
  20.    // 構建運行環境 
  21.    val env = StreamExecutionEnvironment.getExecutionEnvironment 
  22.    env.setParallelism(1) // 設置并行度為1方便后面進行測試 
  23.    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 設置事件時間 
  24.  
  25.    val settings = EnvironmentSettings.newInstance() 
  26.      .useBlinkPlanner() 
  27.      .inStreamingMode() 
  28.      .build() 
  29.    //構建table Env 
  30.    val tableEnv = StreamTableEnvironment.create(env, settings) 
  31.  
  32.    // 讀取數據 
  33.    val inputPath = "./data/sensor.txt" 
  34.    val inputStream = env.readTextFile(inputPath) 
  35.    // 先轉換成樣例類類型(簡單轉換操作) 
  36.    // 解析數據 封裝成樣例類 
  37.    val dataStream = inputStream 
  38.      .map(data => { 
  39.        val arr = data.split(","
  40.        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) 
  41.      }) 
  42.      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { 
  43.        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L 
  44.      }) 
  45.    // 將數據注冊成一張臨時表 
  46.    val dataTable = tableEnv.fromDataStream(dataStream,'id, 'temperature, 'timestamp.rowtime as 'ts) 
  47.    tableEnv.createTemporaryView("sensor",dataTable) 
  48.    var tableRes= dataTable.window( Over partitionBy 'id orderBy  'ts preceding 2.rows as 'ow) 
  49.     .select('id,'ts,'id.count over 'ow, 'temperature.avg over 'ow) 
  50.  
  51.   var tableSql= tableEnv.sqlQuery( 
  52.      ""
  53.        |select 
  54.        |id, 
  55.        |ts, 
  56.        |count(id) over ow, 
  57.        |avg(temperature) over ow 
  58.        |from sensor 
  59.        |window ow as
  60.        | partition by id 
  61.        | order by ts 
  62.        | rows between 2 preceding and current row 
  63.        |) 
  64.        |""".stripMargin) 
  65.  
  66.    tableRes.toAppendStream[Row].print("tableRes"
  67.    tableSql.toAppendStream[Row].print("tableSql"
  68.    env.execute("FlinkSqlTumBlingOverTime"
  69.  } 
  70.  case class SensorReading(id: String, timestamp: Long, temperature: Double
  71.  

運行結果

 

 

總結

好了到這里FlinkSql中窗口使用到這里就結束啦,喜歡的可以給了三連。其中FlinkSql中的窗口的用法還是比較多得,所有還是要多加練習。老話說的好,師傅領進門,修行在個人。

本文轉載自微信公眾號「 大數據老哥」,可以通過以下二維碼關注。轉載本文請聯系 大數據老哥公眾號。

 

責任編輯:武曉燕 來源: 大數據老哥
相關推薦

2020-11-27 08:02:41

Promise

2025-01-09 11:26:47

2021-10-15 07:57:04

Docker 日志容器

2018-11-21 08:00:05

Dubbo分布式系統

2022-02-21 09:44:45

Git開源分布式

2023-05-12 08:19:12

Netty程序框架

2021-06-30 00:20:12

Hangfire.NET平臺

2021-08-12 14:19:14

Slice數組類型內存

2021-05-18 09:00:28

Pythonclass

2020-09-29 15:13:14

C++語言開發

2021-05-15 09:18:04

Python進程

2021-07-01 10:01:16

JavaLinkedList集合

2021-02-02 18:39:05

JavaScript

2021-01-29 18:41:16

JavaScript函數語法

2022-12-14 08:03:27

CSS變量前端

2021-06-04 09:56:01

JavaScript 前端switch

2020-02-28 11:29:00

ElasticSear概念類比

2020-11-10 10:48:10

JavaScript屬性對象

2023-05-08 08:21:15

JavaNIO編程

2020-12-08 08:09:49

SVG圖標Web
點贊
收藏

51CTO技術棧公眾號

日韩视频在线你懂得| 91丨porny丨蝌蚪视频| 色偷偷噜噜噜亚洲男人的天堂| 国产亚洲视频一区| av片哪里在线观看| 久久综合狠狠综合久久激情 | 欧产日产国产精品视频| 欧美国产成人精品| 国产精品亚洲一区| 一道本在线视频| 99在线|亚洲一区二区| 一区二区三区国产视频| 无码人妻丰满熟妇区毛片蜜桃精品| 欧美黑人一区| 亚洲一区在线观看视频| 午夜免费电影一区在线观看| 亚洲精品一级片| 美腿丝袜一区二区三区| 午夜精品一区二区三区在线播放| 欧美波霸videosex极品| 久久久久影视| 欧美一区二区三区系列电影| av网站在线不卡| 高清在线视频不卡| 伊人婷婷欧美激情| 香蕉久久夜色| 国家队第一季免费高清在线观看| 成人网在线免费视频| 国产精品亚洲一区二区三区| 国产无遮挡呻吟娇喘视频| 国产精品地址| 欧美大胆a视频| 影音先锋男人资源在线观看| 国产在线日韩精品| 亚洲精品99999| 国产在线观看免费播放| 另类视频一区二区三区| 9191精品国产综合久久久久久| 日本999视频| 少妇淫片在线影院| 午夜视频在线观看一区二区 | 欧美sm美女调教| 午夜免费福利网站| 91麻豆精品国产综合久久久| 欧美性三三影院| 亚洲乱码国产一区三区| 久久sese| 在线影视一区二区三区| 国产成人精品无码播放| 欧美人体一区二区三区| 日韩欧美国产高清91| 欧美一级片免费播放| 精品精品导航| 亚洲福利视频三区| 国产精品专区在线| 老司机深夜福利在线观看| 午夜影院在线观看欧美| 中文字幕日本最新乱码视频| 日韩伦理在线| 色94色欧美sute亚洲13| 国产自偷自偷免费一区| 高清在线一区| 7777精品久久久大香线蕉| 亚洲一区二区中文字幕在线观看| 亚洲国产欧美国产第一区| 日韩精品一区二区三区中文精品| 东京热av一区| 希岛爱理av免费一区二区| 亚洲奶大毛多的老太婆| 国产无遮挡在线观看| 99久久99热这里只有精品| 久久综合伊人77777| 青青草手机在线视频| 伊人久久大香线蕉综合热线| 68精品久久久久久欧美| 老熟妇一区二区三区| 久久草av在线| 国产精品久久久久久久久久久久冷| 免费观看黄色一级视频| 日本一区二区三级电影在线观看 | 欧美被日视频| 亚洲国产综合色| 国产超级av在线| 国产成人亚洲一区二区三区| 日韩欧美一级精品久久| 中文在线永久免费观看| 日韩欧美大片| 久久久久久综合网天天| 无码人妻久久一区二区三区 | 国产乱码精品一区二区三区不卡| 欧美一区二区视频| 成人免费在线视频观看| 欧美久久久久久久久久久久久 | 欧美三电影在线| 国产精品成人免费一区久久羞羞| 精品中文一区| 欧美国产中文字幕| 波多野结衣小视频| 懂色av一区二区三区免费看| 日韩精品久久久毛片一区二区| av网址在线播放| 91福利在线播放| 女同性αv亚洲女同志| 欧美精选一区二区三区| 欧美激情喷水视频| 在线视频欧美亚洲| 97久久超碰精品国产| 男插女免费视频| 成人免费网站www网站高清| 精品国内片67194| 美女av免费看| 久久久青草婷婷精品综合日韩| 国产精品一区电影| 色综合久久网女同蕾丝边| 亚洲免费大片在线观看| 国产嫩草在线观看| 西野翔中文久久精品字幕| 欧美极品欧美精品欧美视频 | 中文字幕91爱爱| 97se亚洲国产综合自在线不卡 | 黑森林国产精品av| 91成人免费在线视频| 欧美久久久久久久久久久| 不卡一区综合视频| **欧美日韩vr在线| 亚洲xxxx天美| 亚洲男人都懂的| 污污的网站免费| 精品国产中文字幕第一页| 韩剧1988免费观看全集| 亚洲av永久无码国产精品久久| 亚洲欧美一区二区视频| 亚洲 欧美 日韩系列| 亚洲黄页网站| 国产91精品高潮白浆喷水| 少妇人妻精品一区二区三区| 一区二区三区日韩欧美精品| 亚洲天堂一区二区在线观看| 亚洲成av人片乱码色午夜| 国产精品私拍pans大尺度在线| 福利在线播放| 欧美性猛片aaaaaaa做受| 法国空姐电影在线观看| 视频一区二区中文字幕| 欧美日韩中文国产一区发布| 亚洲v.com| 亚洲欧美中文日韩v在线观看| 国产成人免费看| 久久综合狠狠综合久久激情| 日韩av黄色网址| 蜜臀91精品国产高清在线观看| 欧美亚洲国产视频| 国产区av在线| 欧美日韩五月天| 来吧亚洲综合网| 韩国成人在线视频| 99中文字幕在线观看| 国产aⅴ精品一区二区四区| 久久天天躁夜夜躁狠狠躁2022| 91福利在线观看视频| 国产精品久久久久婷婷二区次| 黄色一级片免费的| 欧美va天堂| 国产亚洲欧美另类一区二区三区| 在线亚洲人成| 伊人av综合网| av手机免费看| 亚洲一区在线观看网站| 亚洲av无码一区二区二三区| 青娱乐精品视频在线| 色乱码一区二区三区熟女| 亚洲大奶少妇| 欧美亚洲国产视频| 欧美一级二级三级区| 日韩精品资源二区在线| 国产成人一区二区三区影院在线| 国产婷婷色一区二区三区四区 | 久久久久久久精| 欧美第一页浮力影院| 一精品久久久| 免费h精品视频在线播放| 欧美天堂一区| 国语对白做受69| 成人好色电影| 精品国产一区二区国模嫣然| 高潮毛片又色又爽免费| 成人欧美一区二区三区1314| 亚洲熟女乱综合一区二区三区| 欧美aⅴ一区二区三区视频| 成人午夜免费剧场| 欧美猛男男男激情videos| 亚洲一区二区三区乱码aⅴ| 麻豆视频在线看| 北条麻妃久久精品| 欧美精品a∨在线观看不卡| 3d成人动漫网站| 青青青国产在线| 综合在线观看色| b站大片免费直播| 国产成人啪午夜精品网站男同| 精品免费国产一区二区| 欧美日韩日本国产亚洲在线| 日韩中文字幕一区二区| 国产伦乱精品| 91美女高潮出水| 韩日精品一区| 97婷婷涩涩精品一区| 成人午夜在线影视| 亚洲桃花岛网站| 女人18毛片水真多18精品| 欧美日韩mp4| 国产一级精品毛片| 精品国产成人av| 久久免费黄色网址| 亚洲图片你懂的| 免费一级特黄3大片视频| av电影在线观看一区| 日本网站在线看| 麻豆成人免费电影| 亚洲免费一在线| 国产一级片自拍| 日本特黄久久久高潮| 日韩欧美视频网站| 欧美日韩三级| 高清无码一区二区在线观看吞精| 999国产精品视频| 亚洲一区二区高清视频| 欧州一区二区| 日韩中文字幕一区| 精品久久一区| 日本一区不卡| 欧美日韩播放| 欧美激情第六页| 色吊丝一区二区| 精品国产福利| 欧美做受69| 国内精品久久国产| 秋霞综合在线视频| 精品免费日产一区一区三区免费| 成人黄色av网址| 国产精品久久久久久久天堂第1集 国产精品久久久久久久免费大片 国产精品久久久久久久久婷婷 | 福利网址在线观看| 精品久久久久久亚洲精品| 豆国产97在线 | 亚洲| 亚洲一区二区偷拍精品| av资源吧首页| 亚洲成a人片在线观看中文| 日韩欧美激情视频| 欧美日韩美女在线观看| 天堂中文在线网| av美女在线观看| 在线激情影院一区| 国产黄在线观看免费观看不卡| 精品视频偷偷看在线观看| 亚洲人妻一区二区| 亚洲视频精品在线| 日本中文字幕在线视频| 99爱在线观看| 午夜精品福利一区二区蜜股av| 国产性一乱一性一伧一色| 亚洲18色成人| 无码视频在线观看| 欧美另类变人与禽xxxxx| av片免费播放| 亚洲精品www久久久久久广东| 亚洲av片一区二区三区| 亚洲一二在线观看| av网站大全在线| 97超级碰碰碰久久久| 日韩制服一区| 亚洲a在线观看| 任你躁在线精品免费| 日韩高清在线播放| 欧美综合久久| 992tv快乐视频| 午夜亚洲一区| 中文字幕成人在线视频| 国产成人h网站| 久久精品国产亚洲av久| 中文字幕一区二区三区在线播放| 久久久久97国产| 色哟哟一区二区| 国产免费黄色录像| 日韩电视剧在线观看免费网站| 成年在线观看免费人视频| 欧美大片免费看| 亚洲成人av观看| 成人av网站观看| 精品久久久久久久久久久aⅴ| 成人在线视频一区二区三区| 视频一区国产视频| 亚洲黄色小说在线观看| 久久久精品中文字幕麻豆发布| 一区二区视频免费看| 亚洲一区二区三区四区不卡| 国产精品第6页| 亚洲成年人在线| а√资源新版在线天堂| 国产脚交av在线一区二区| jizz久久精品永久免费| 日本一区二区三区四区在线观看| 国产精品av久久久久久麻豆网| 日本老熟妇毛茸茸| 99久久精品国产一区二区三区| 99热这里只有精品4| 欧美日韩激情小视频| 精品久久无码中文字幕| 国产亚洲欧美日韩一区二区| 国内高清免费在线视频| 成人福利网站在线观看11| 欧美日韩xxxx| 欧美日韩在线中文| 成人一道本在线| 欧美第一页在线观看| 欧美亚一区二区| 色鬼7777久久| 日韩国产一区二区| 丁香六月激情婷婷| 国产精品自拍av| 97在线观看视频免费| 91激情在线视频| 日韩一区av| 91国产美女在线观看| 99热这里只有精品首页| 大片在线观看网站免费收看| 久久av资源网| 99国产精品无码| 欧美日韩国产三级| аⅴ资源新版在线天堂| 国产成人综合精品在线| 国产午夜精品福利| 天天操天天干视频| 亚洲成人av在线播放| 午夜羞羞小视频在线观看| 成人有码在线播放| 99久久.com| 涩涩网站在线看| 国产精品美女久久久久av爽李琼 | 91精品黄色| 91国语精品自产拍| 中文字幕第22页| 亚洲天堂中文字幕| 91tv国产成人福利| 久久久91精品国产| 国产精品亚洲四区在线观看| 艳母动漫在线观看| 国产乱码一区二区三区| 青青操国产视频| 欧美一区二区视频在线观看2022 | 色综合久久久网| 午夜视频福利在线| 一区二区三区四区激情 | 亚洲欧洲日本mm| 精品久久久久一区二区| 亚洲电影激情视频网站| 婷婷丁香花五月天| 91精品国产九九九久久久亚洲| 日韩av网站在线免费观看| 欧美三级一级片| 国产女同互慰高潮91漫画| 中文字幕网址在线| 久久在线免费视频| 99re91这里只有精品| 日韩avxxx| 国产精品久久久久久久久久免费看| 国产又粗又大又爽| 欧美另类高清videos| 粉嫩一区二区三区四区公司1| 黄色片久久久久| 中文字幕亚洲不卡| 日韩精品一级二级| 亚洲综合激情视频| 亚洲日本乱码在线观看| www视频在线| 欧日韩不卡在线视频| 第一会所sis001亚洲| 久久aaaa片一区二区| 国产男女无套免费网站| 日韩成人在线视频网站| 欧美日韩免费看片| 亚洲一二三区精品| 成人avav影音| 在线播放国产一区| 欧美极品美女电影一区| 国产一区二区三区探花| 亚洲在线观看网站| 日韩欧美大尺度| caoporn97在线视频| 欧美精品二区三区四区免费看视频| 美国十次了思思久久精品导航| 久久久全国免费视频| 国产一级揄自揄精品视频| 亚洲一区电影| 手机看片一级片| 精品二区三区线观看| 国产色在线观看| 国产成人一级电影| 麻豆国产尤物av尤物在线观看| 亚洲欧洲国产一区| 日韩三级精品|