精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink常見維表Join方案,收藏學習開發很有用!

大數據
由于維表是一張不斷變化的表(靜態表只是動態表的一種特例)。那如何 JOIN 一張不斷變化的表呢?如果用傳統的 JOIN 語法來表達維表 JOIN,是不完整的。因為維表是一直在更新變化的,如果用這個語法那么關聯上的是哪個時刻的維表呢?我們是不知道的,結果是不確定的。

本文轉載自微信公眾號「大數據左右手」,作者左右 。轉載本文請聯系大數據左右手公眾號。

前言

實時數倉,難免會遇到join維表的業務。現總結幾種方案,供各位看官選擇:

  • 查找關聯(同步,異步)
  • 狀態編程,預加載數據到狀態中,按需取
  • 冷熱數據
  • 廣播維表
  • Temporal Table Join
  • Lookup Table Join

其中中間留下兩個問題,供大家思考,可留言一起討論?

查找關聯

查找關聯就是在主流數據中直接訪問外部數據(mysql,redis,impala ...)去根據主鍵或者某種關鍵條件去關聯取值。

適合: 維表數據量大,但是主數據不大的業務實時計算。

缺點:數據量大的時候,會給外部數據源庫帶來很大的壓力,因為某條數據都需要關聯。

同步

訪問數據庫是同步調用,導致 subtak 線程會被阻塞,影響吞吐量

  1. import com.alibaba.fastjson.{JSON, JSONArray, JSONObject} 
  2. import com.wang.stream.env.{FlinkStreamEnv, KafkaSourceEnv} 
  3. import org.apache.flink.api.common.functions.FlatMapFunction 
  4. import org.apache.flink.api.common.serialization.SimpleStringSchema 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer 
  7. import org.apache.flink.util.Collector 
  8.  
  9.  def analyses(): Unit ={ 
  10.     val env: StreamExecutionEnvironment = FlinkStreamEnv.get() 
  11.     KafkaSourceEnv.getKafkaSourceStream(env,List("test")) 
  12.       .map(JSON.parseObject(_)) 
  13.       .filter(_!=null
  14.       .flatMap( 
  15.         new FlatMapFunction[JSONObject,String] { 
  16.           override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = { 
  17.             // 如果topic就一張表,不用區分,如果多張表,可以通過database 與 table 區分,放到下一步去處理 
  18.             // 表的名字 
  19.             val databaseName:String = jSONObject.getString("database"
  20.             // 表的名字 
  21.             val tableName:String = jSONObject.getString("table"
  22.             // 數據操作類型 INSERT UPDATE DELETE 
  23.             val operationType:String = jSONObject.getString("type"
  24.             // 主體數據 
  25.             val tableData: JSONArray = jSONObject.getJSONArray("data"
  26.             // old 值 
  27.             val old: JSONArray = jSONObject.getJSONArray("old"
  28.             // canal json 可能存在批處理出現data數據多條 
  29.             for (i <- 0 until tableData.size()) { 
  30.               val data: String = tableData.get(i).toString 
  31.               val nObject: JSONObject = JSON.parseObject(data) 
  32.               val orderId: AnyRef = nObject.get("order_id"
  33.               // 下面寫(mysql,redis或者hbase)的連接,利用api 通過orderId查找 
  34.                
  35.               // 最后封裝數據格式 就是join所得 
  36.               collector.collect(null
  37.             } 
  38.           } 
  39.         } 
  40.       ) 
  41.       .addSink( 
  42.         new FlinkKafkaProducer[String]( 
  43.           ""
  44.           ""
  45.           new SimpleStringSchema() 
  46.         ) 
  47.       ) 
  48.     env.execute(""

異步

AsyncIO 可以并發地處理多個請求,很大程度上減少了對 subtask 線程的阻塞。

  1. def analyses(): Unit ={ 
  2.     val env: StreamExecutionEnvironment = FlinkStreamEnv.get() 
  3.     val source: DataStream[String] = KafkaSourceEnv.getKafkaSourceStream(env, List("test")) 
  4.       .map(JSON.parseObject(_)) 
  5.       .filter(_ != null
  6.       .flatMap( 
  7.         new FlatMapFunction[JSONObject, String] { 
  8.           override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = { 
  9.             // 如果topic就一張表,不用區分,如果多張表,可以通過database 與 table 區分,放到下一步去處理 
  10.             // 表的名字 
  11.             val databaseName: String = jSONObject.getString("database"
  12.             // 表的名字 
  13.             val tableName: String = jSONObject.getString("table"
  14.             // 數據操作類型 INSERT UPDATE DELETE 
  15.             val operationType: String = jSONObject.getString("type"
  16.             // 主體數據 
  17.             val tableData: JSONArray = jSONObject.getJSONArray("data"
  18.             // old 值 
  19.             val old: JSONArray = jSONObject.getJSONArray("old"
  20.             // canal json 可能存在批處理出現data數據多條 
  21.             for (i <- 0 until tableData.size()) { 
  22.               val data: String = tableData.get(i).toString 
  23.               collector.collect(data) 
  24.             } 
  25.           } 
  26.         } 
  27.       ) 
  28.        
  29.     AsyncDataStream.unorderedWait( 
  30.       source, 
  31.       new RichAsyncFunction[String,String] {//自定義的數據源異步處理類 
  32.         override def open(parameters: Configuration): Unit = { 
  33.           // 初始化 
  34.         } 
  35.         override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = { 
  36.            
  37.           // 將數據搜集 
  38.           resultFuture.complete(null
  39.        } 
  40.  
  41.         override def close(): Unit = { 
  42.           // 關閉 
  43.         } 
  44.     }, 
  45.     1000,//異步超時時間 
  46.     TimeUnit.MILLISECONDS,//時間單位 
  47.     100)//最大異步并發請求數量 
  48.      .addSink( 
  49.         new FlinkKafkaProducer[String]( 
  50.           ""
  51.           ""
  52.           new SimpleStringSchema() 
  53.         ) 
  54.       ) 
  55.  
  56.     env.execute(""
  57.   } 

狀態編程,預加載數據到狀態中,按需取

首先把維表數據初始化到state中,設置好更新時間,定時去把維表。

優點:flink 自己維護狀態數據,"榮辱與共",不需要頻繁鏈接外部數據源,達到解耦。

缺點:不適合大的維表和變化大的維表。

  1. .keyBy(_._1) 
  2. .process( 
  3.   new KeyedProcessFunction[String,(String,String,String,String,String), String]{ 
  4.     private var mapState:MapState[String,Map[String,String]] = _ 
  5.     private var first: Boolean = true 
  6.      
  7.     override def open(parameters: Configuration): Unit = { 
  8.       val config: StateTtlConfig = StateTtlConfig 
  9.         .newBuilder(org.apache.flink.api.common.time.Time.minutes(5)) 
  10.         .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) 
  11.         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) 
  12.         .build() 
  13.       val join = new MapStateDescriptor[String,Map[String,String]]("join",classOf[String],classOf[Map[String,String]]) 
  14.       join.enableTimeToLive(config) 
  15.       mapState = getRuntimeContext.getMapState(join
  16.     } 
  17.     override def processElement( 
  18.                                  in: (String, String, String, String, String), 
  19.                                  context: KeyedProcessFunction[String, (String, String, String, String, String),String]#Context, 
  20.                                  collector: Collector[String]): Unit = { 
  21.       // 加載維表 
  22.       if(first){ 
  23.         first = false 
  24.         val time: Long = System.currentTimeMillis() 
  25.         getSmallDimTableInfo() 
  26.         // 設置好更新時間,定時去把維表 
  27.         context.timerService().registerProcessingTimeTimer(time + 86400000) 
  28.       } 
  29.        
  30.       // 數據處理,過來一條條數據,然后按照自己的業務邏輯去取維表的數據即可 
  31.        
  32.       // 然后封裝 放到collect中 
  33.       collector.collect(null
  34.     } 
  35.  
  36.     override def onTimer( 
  37.                           timestamp: Long, 
  38.                           ctx: KeyedProcessFunction[String, (String, String, String, String, String),String]#OnTimerContext, 
  39.                           out: Collector[String]): Unit = { 
  40.       println("觸發器執行"
  41.       mapState.clear() 
  42.       getSmallDimTableInfo() 
  43.       println(mapState) 
  44.       ctx.timerService().registerProcessingTimeTimer(timestamp + 86400000) 
  45.     } 
  46.     def getSmallDimTableInfo(): Unit ={ 
  47.  
  48.       // 加載 字典數據 
  49.       val select_dictionary="select dic_code,pre_dictionary_id,dic_name from xxxx" 
  50.       val dictionary: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(select_dictionary, null
  51.       dictionary.foreach(item=>{ 
  52.         mapState.put("dic_dictionary_"+item.get("pre_dictionary_id").toString,item) 
  53.       }) 
  54.  
  55.     } 
  56.   } 
  57. .filter(_!=null
  58. .addSink( 
  59.   new FlinkKafkaProducer[String]( 
  60.     ""
  61.     ""
  62.     new SimpleStringSchema() 
  63.   ) 
  64. v.execute(""

思考下:直接定義一個Map集合這樣的優缺點是什么?可以留言說出自己的看法?

冷熱數據

思想:先去狀態去取,如果沒有,去外部查詢,同時去存到狀態里面。StateTtlConfig 的過期時間可以設置短點。

優點:中庸取值方案,熱備常用數據到內存,也避免了數據join相對過多外部數據源。

缺點:也不能一勞永逸解決某些問題,熱備數據過多,或者冷數據過大,都會對state 或者 外部數據庫造成壓力。

  1. .filter(_._1 != null
  2. .keyBy(_._1) 
  3. .process( 
  4.   new KeyedProcessFunction[String,(String,String,String,String,String), String]{ 
  5.     private var mapState:MapState[String,Map[String,String]] = _ 
  6.     private var first: Boolean = true 
  7.     override def open(parameters: Configuration): Unit = { 
  8.       val config: StateTtlConfig = StateTtlConfig 
  9.         .newBuilder(org.apache.flink.api.common.time.Time.days(1)) 
  10.         .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) 
  11.         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) 
  12.         .build() 
  13.       val join = new MapStateDescriptor[String,Map[String,String]]("join",classOf[String],classOf[Map[String,String]]) 
  14.       join.enableTimeToLive(config) 
  15.       mapState = getRuntimeContext.getMapState(join
  16.     } 
  17.     override def processElement( 
  18.                                  in: (String, String, String, String, String), 
  19.                                  context: KeyedProcessFunction[String, (String, String, String, String, String),String]#Context, 
  20.                                  collector: Collector[String]): Unit = { 
  21.  
  22.       // 數據處理,過來一條條數據,然后按照自己的業務邏輯先去mapState去找,如果沒有再去 外部去找 
  23.       if (mapState.contains("xx_id")){ 
  24.         // 如果存在就取 
  25.  
  26.       }else
  27.         // 如果不存在去外部拿,然后放到mapState中 
  28.         val dim_sql="select dic_code,pre_dictionary_id,dic_name from xxxx where id=xx_id" 
  29.         val dim: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(dim_sql, null
  30.         mapState.put("xx_id",null
  31.       } 
  32.       // 然后封裝 放到collect中 
  33.       collector.collect(null
  34.     } 
  35.   } 

廣播維表

比如上面提到的字典表,每一個Task都需要這份數據,那么需要join這份數據的時候就可以使用廣播維表。

  1. val dimStream=env.addSource(MysqlSource) 
  2.  
  3. // 廣播狀態 
  4. val broadcastStateDesc=new MapStateDescriptor[String,String]("broadcaststate", BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(Long.class, Dim.class)) 
  5.  
  6. // 廣播流 
  7. val broadStream=dimStream.broadcast() 
  8.  
  9. // 主數據流 
  10. val mainConsumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), kafkaConfig) 
  11.  
  12. val mainStream=env.addSource(mainConsumer) 
  13.  
  14. // 廣播狀態與維度表關聯 
  15. val connectedStream=mainStream.connect(broadStream).map(..User(id,name)).key(_.1) 
  16.  
  17. connectedStream.process(new KeyedBroadcastProcessFunction[String,User,Map[Long,Dim],String] { 
  18.  
  19.      override def processElement(value: User, ctx: KeyedBroadcastProcessFunction[String,User,Map[Long,Dim],String]#ReadOnlyContext, out: Collector[String]): Unit = { 
  20.       // 取到數據就可以愉快的玩耍了 
  21.      val state=ctx.getBroadcastState(broadcastStateDesc) 
  22.        xxxxxx 
  23.          
  24.   } 

「思考:」 如果把維表流也通過實時監控binlog到kafka,當維度數據發生變化時,更新放到狀態中,這種方式,是不是更具有時效性呢?

(1)通過canal把變更binlog方式發送到kafka中。

(2)數據流定義成為廣播流,廣播到數據到主數據流中。

(3)定義一個廣播狀態存儲數據,在主數據進行查找匹配,符合要求則join成功。

Temporal Table Join(FlinkSQL與Flink Table API)

由于維表是一張不斷變化的表(靜態表只是動態表的一種特例)。那如何 JOIN 一張不斷變化的表呢?如果用傳統的 JOIN 語法來表達維表 JOIN,是不完整的。因為維表是一直在更新變化的,如果用這個語法那么關聯上的是哪個時刻的維表呢?我們是不知道的,結果是不確定的。所以 Flink SQL 的維表 JOIN 語法引入了Temporal Table 的標準語法,用來聲明關聯的是維表哪個時刻的快照。

普通關聯會一直保留關聯雙側的數據,數據也就會一直膨脹,直到撐爆內存導致任務失敗,Temporal Join則可以定期清理過期數據,在合理的內存配置下即可避免內存溢出。

Event Time Temporal Join

語法

  1. SELECT [column_list] 
  2. FROM table1 [AS <alias1>] 
  3. [LEFTJOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>] 
  4. ON table1.column-name1 = table2.column-name1 

使用事件時間屬性(即行時間屬性),可以檢索過去某個時間點的鍵值。這允許在一個共同的時間點連接兩個表。

舉例

假設我們有一個訂單表,每個訂單都有不同貨幣的價格。為了將此表正確地規范化為單一貨幣,每個訂單都需要與下訂單時的適當貨幣兌換率相結合。

  1. CREATE TABLE orders ( 
  2.     order_id    STRING, 
  3.     price       DECIMAL(32,2), 
  4.     currency    STRING, 
  5.     order_time  TIMESTAMP(3), 
  6.     WATERMARK FOR order_time AS order_time 
  7. WITH (/* ... */); 
  8.  
  9.  
  10. CREATE TABLE currency_rates ( 
  11.     currency STRING, 
  12.     conversion_rate DECIMAL(32, 2), 
  13.     update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL 
  14.     WATERMARK FOR update_time AS update_time, 
  15.     PRIMARY KEY(currency) NOT ENFORCED 
  16. WITH ( 
  17.    'connector' = 'upsert-kafka'
  18.    /* ... */ 
  19. ); 
  20.  
  21. event-time temporal join需要temporal join條件的等價條件中包含的主鍵 
  22.  
  23. SELECT  
  24.      order_id, 
  25.      price, 
  26.      currency, 
  27.      conversion_rate, 
  28.      order_time, 
  29. FROM orders 
  30. LEFT JOIN currency_rates FOR SYSTEM TIME AS OF orders.order_time 
  31. ON orders.currency = currency_rates.currency 

Processing Time Temporal Join

處理時間時態表連接使用處理時間屬性將行與外部版本表中鍵的最新版本相關聯。

根據定義,使用processing-time屬性,連接將始終返回給定鍵的最新值。可以將查找表看作是一個簡單的HashMap,它存儲來自構建端的所有記錄。這種連接的強大之處在于,當在Flink中無法將表具體化為動態表時,它允許Flink直接針對外部系統工作。

使用FOR SYSTEM_TIME AS OF table1.proctime表示當左邊表的記錄與右邊的維表join時,只匹配當前處理時間維表所對應的的快照數據。

Lookup Table Join

Lookup Join 通常用于通過連接外部表(維度表)補充信息,要求一個表具有處理時間屬性,另一個表使 Lookup Source Connector。

JDBC 連接器可以用在時態表關聯中作為一個可 lookup 的 source (又稱為維表)。用到的語法是 Temporal Joins 的語法。

  1. s""
  2.         |CREATE TABLE users( 
  3.         |id int
  4.         |name string, 
  5.         |PRIMARY KEY (id) NOT ENFORCED 
  6.         |) 
  7.         |WITH ( 
  8.         |'connector' = 'jdbc'
  9.         |'url' = 'xxxx'
  10.         |'driver'='$DRIVER_CLASS_NAME'
  11.         |'table-name'='$tableName'
  12.         |'lookup.cache.max-rows'='100'
  13.         |'lookup.cache.ttl'='30s' 
  14.         |) 
  15.         |""".stripMargin 
  16.          
  17.          
  18. s""
  19.        |CREATE TABLE car( 
  20.        |`id`   bigint , 
  21.        |`user_id` bigint
  22.        |`proctime` as PROCTIME() 
  23.        |) 
  24.        |WITH ( 
  25.        |    'connector' = 'kafka'
  26.        |    'topic' = '$topic'
  27.        |    'scan.startup.mode' = 'latest-offset'
  28.        |    'properties.bootstrap.servers' = '$KAFKA_SERVICE'
  29.        |    'properties.group.id' = 'indicator'
  30.        |    'format' = 'canal-json' 
  31.        |) 
  32.        |""".stripMargin 
  33.         
  34.         
  35.         
  36.     SELECT 
  37.         mc.user_id user_id, 
  38.         count(1) AS `value` 
  39.     FROM car mc 
  40.         inner join users FOR SYSTEM_TIME AS OF mc.proctime as u on mc.user_id=s.id 
  41.     group by mc.user_id 

總結

總體來講,關聯維表有四個基礎的方式:

(1)查找外部數據源關聯

(2)預加載維表關聯(內存,狀態)

(3)冷熱數據儲備(算是1和2的結合使用)

(4)維表變更日志關聯(廣播也好,其他方式的流關聯也好)

「同時考慮:」 吞吐量,時效性,外部數據源的負載,內存資源,解耦性等等方面。

四種join方式不存在絕對的一勞永逸,更多的是針對業務場景在各指標上的權衡取舍,因此看官需要結合場景來選擇適合的。

 

責任編輯:武曉燕 來源: 大數據左右手
相關推薦

2021-07-13 10:02:52

Pandas函數Linux

2013-08-15 09:52:45

開發框架開發工具開發腳本

2016-12-14 20:53:04

Linuxgcc命令行

2016-12-14 19:19:19

Linuxgcc命令行

2015-10-27 11:02:06

Web開發CSS 庫

2014-06-13 11:26:53

CSS庫Web開發

2023-03-06 10:42:34

CSS前端

2013-07-12 09:45:16

PHP功能

2023-09-07 16:28:46

JavaScrip

2021-06-29 10:50:30

Python函數文件

2013-08-23 09:28:37

GitGit 命令

2011-05-10 08:47:55

開發者HTML 5W3C

2022-06-29 09:09:38

Python代碼

2021-11-30 23:30:45

sql 性能異步

2022-08-23 09:01:02

HTMLWeb

2011-05-16 08:37:56

JavaScript庫

2017-10-25 16:22:58

OpenStack操作Glance

2020-03-06 08:35:45

GitHub設計瀏覽器

2014-09-09 09:32:50

項目管理管理工具

2020-11-18 11:14:27

運維架構技術
點贊
收藏

51CTO技術棧公眾號

久久精品aⅴ无码中文字字幕重口| 狠狠色综合色区| 亚洲色图日韩精品| 久久gogo国模啪啪裸体| 亚洲最快最全在线视频| 好看的日韩精品| 中文字字幕在线中文乱码| 亚洲第一天堂| 日韩电影免费观看中文字幕| 天天视频天天爽| av人人综合网| 国产精品沙发午睡系列990531| 91黄在线观看| 国产精品自产拍在线观看中文| 国产成人无码精品久久二区三| 欧洲成人一区| 亚洲国产成人tv| 天天爽天天狠久久久| 丰满少妇高潮在线观看| 麻豆免费精品视频| 2021久久精品国产99国产精品| 亚洲一级二级片| 国产精品一区2区3区| 日韩欧美一区在线| 一区二区成人网| 国产精品13p| 亚洲男人的天堂在线观看| 久久精品国产美女| 精品欧美在线观看| 美腿丝袜亚洲色图| 欧美专区在线视频| 国产在线观看成人| 欧美国产91| 精品国产一区久久久| 性欧美成人播放77777| 亚洲精品观看| 91精品国产综合久久久久久漫画 | 午夜电影网一区| 日本特级黄色大片| 超碰在线国产| 久久久91精品国产一区二区三区| 国产aⅴ精品一区二区三区黄| 在线免费一级片| 日韩精品成人一区二区三区| 欧美性受xxx| 日韩三级视频在线| 亚洲国产精品一区| 欧美国产日韩视频| 久久久国产成人| 欧美激情综合| 欧美多人爱爱视频网站| 欧美三级日本三级| 亚洲色图国产| 欧美黑人巨大精品一区二区| 午夜激情福利网| 国产sm调教视频| 亚洲天天影视| 五月婷婷深深爱| 视频一区二区三区中文字幕| 欧美一级大片视频| 日韩乱码一区二区| 日韩视频三区| 欧美中文字幕在线播放| 国产成人精品777777| 久久久亚洲一区| 日本成人精品在线| 国产成人a v| 久久精品国产精品亚洲综合| 亚洲高清毛片| 日韩在线视频网| 国产又黄又粗又猛又爽的| 色综合久久一区二区三区| 精品国产欧美一区二区五十路 | 国产欧美日韩精品一区二区免费| 亚洲乱码一区二区| 人与嘼交av免费| 99re6这里只有精品| 欧美大尺度在线观看| 久久久久国产精品夜夜夜夜夜| 在线日韩中文| 国产精品9999| 北条麻妃一二三区| 99久久99久久综合| 日韩一二三区不卡在线视频| 久草免费在线| 亚洲国产另类精品专区| 97超碰青青草| 国产极品一区| 精品国产123| 在线小视频你懂的| 亚洲国产精品成人| 国产精品一区二区久久精品爱涩| 久久综合五月天| 久久久国产精华液| 视频一区在线视频| 97人人做人人人难人人做| 熟妇人妻av无码一区二区三区| 久久久噜噜噜久久人人看 | 伊人精品成人久久综合软件| 91精品国产高清自在线 | 欧美成人a∨高清免费观看| 免费看黄色片的网站| 神马影视一区二区| 久久不射电影网| www成人在线| 久久成人综合网| 韩国一区二区三区美女美女秀 | 久久久综合精品| 在线观看欧美激情| 九色porny丨入口在线| 欧美乱妇15p| 国产麻豆天美果冻无码视频| 婷婷亚洲最大| 国产成人97精品免费看片| 精品久久国产视频| 中文字幕午夜精品一区二区三区| 国产精品久久久久久久久免费丝袜 | 第一页在线视频| 国内精品久久久久久99蜜桃| 欧美激情一区二区久久久| 国产三级理论片| 91日韩精品一区| 2021国产视频| 亚洲精品777| 亚洲日本成人女熟在线观看| 日本少妇裸体做爰| 国产在线看一区| 四虎一区二区| 亚洲欧美小说色综合小说一区| 日韩欧美成人激情| 成人一级黄色大片| 蜜臀久久99精品久久久久久9| 国产一区二区高清不卡| 日本在线播放一区| 天天综合视频在线观看| 欧美日韩国产精品| 精品无码av一区二区三区不卡| 清纯唯美亚洲综合一区| 国产91在线播放九色快色| 国产精品久久久久久久美男| 亚洲乱码国产乱码精品| 99精品视频在线观看免费| 日本黄色片一级片| 国产日韩中文在线中文字幕| 色哟哟网站入口亚洲精品| 亚洲乱码国产乱码精品| 久久久久国产精品麻豆| 欧美 日韩 国产一区| 青青视频一区二区| 97精品国产aⅴ7777| 色婷婷中文字幕| 午夜精品一区二区三区电影天堂| 岛国精品一区二区三区| 午夜精彩国产免费不卡不顿大片| 亚洲精品免费在线视频| 2024最新电影在线免费观看| 日韩欧美国产综合| 国产小视频在线看| 亚洲成人网在线| 国产免费一区二区三区四在线播放| 老司机深夜福利在线观看| 精品成人一区二区| 日本在线免费观看| 99免费精品在线| 国产在线观看福利| 精品国产午夜| 国产欧美一区二区三区久久人妖 | 亚洲免费一级电影| 欧美一区二区三区不卡视频| 中文字幕不卡一区| 91丝袜超薄交口足| 国产一级特黄a大片免费| aaaa欧美| xxxxxxxxx欧美| 精品久久国产视频| 五月激情综合婷婷| 级毛片内射视频| 精品一区二区在线视频| 国产一级黄色录像片| av成人app永久免费| 97免费在线视频| a视频网址在线观看| 555www色欧美视频| 国产对白videos麻豆高潮| 久久人人97超碰com| 在线观看国产一级片| 国产一区日韩一区| 欧美午夜欧美| vam成人资源在线观看| 欧美激情小视频| 黄色免费在线播放| 欧美丰满美乳xxx高潮www| 久久综合成人网| 欧美国产精品v| 国产精品成人免费一区久久羞羞| 欧美在线综合| 永久免费网站视频在线观看| 日韩丝袜视频| 亚洲xxxx18| 欧美日韩五区| 毛片精品免费在线观看| 青青青草原在线| 欧美一区二区三区免费| 三级视频在线观看| 亚洲精品国产精华液| 免费在线观看a视频| 国产成人免费视频网站高清观看视频 | 一区二区三区蜜桃网| 中文字幕xxx| 丝袜美腿美女被狂躁在线观看| 国产一区欧美二区| 九九九九免费视频| 亚洲情侣在线| 亚洲人成网站在线播放2019| 国产主播性色av福利精品一区| 国产日韩欧美自拍| 中文字幕一区久| 欧美寡妇偷汉性猛交| 在线免费av网站| 亚洲人成在线观| 日本波多野结衣在线| 欧美一区二区三区啪啪| 亚洲午夜无码久久久久| 亚洲不卡在线观看| 26uuu成人网| 国产精品久久久久婷婷| 免费看污片网站| 99re这里只有精品视频首页| 不卡的一区二区| 国产在线精品不卡| 色播五月综合网| 老牛影视一区二区三区| 91视频 -- 69xx| 亚洲激情社区| 国产乱淫av片杨贵妃| 国产精品第十页| 国产在线无码精品| 欧美va天堂| 国产成人精品免费看在线播放| 三级影片在线观看| 国产不卡视频一区| 99精品视频免费版的特色功能| 美女脱光内衣内裤视频久久影院| 美女av免费在线观看| 日韩天堂av| 内射国产内射夫妻免费频道| 99日韩精品| 日韩欧美亚洲天堂| 麻豆亚洲精品| 午夜精品久久久内射近拍高清| 国产精品美女| 五月婷婷深爱五月| 麻豆成人在线观看| www.99r| 国产精品一区免费在线观看| 青娱乐国产精品视频| 成人综合在线观看| 亚洲一区二区三区综合| 99久久婷婷国产| 女~淫辱の触手3d动漫| 久久久久国色av免费看影院| 欧美人妻一区二区三区| 国产精品色在线| 男女性高潮免费网站| 亚洲自拍偷拍麻豆| 日韩三级免费看| 91福利小视频| 国产精品伊人久久| 亚洲精品一区在线观看| 日韩午夜影院| 色一区av在线| wwww在线观看免费视频| 国产精品毛片a∨一区二区三区| 亚洲观看黄色网| 久久亚洲二区三区| 大吊一区二区三区| 亚洲精品福利视频网站| 日韩男人的天堂| 欧洲激情一区二区| 国内精品久久久久久久久久久| 精品国产精品网麻豆系列| 九色在线视频蝌蚪| 日韩中文字幕av| а_天堂中文在线| 国产精品久久久久久久久久小说 | 欧美一区在线视频| www.国产免费| 精品亚洲一区二区三区在线观看| eeuss影院www在线播放| 欧美大片第1页| 午夜av成人| 91免费在线观看网站| 亚洲电影男人天堂| 伊人情人网综合| 日韩一级免费| 午夜啪啪小视频| 91丝袜呻吟高潮美腿白嫩在线观看| 亚洲色图欧美色| 亚洲一二三四在线观看| 久久这里只有精品9| 精品久久久久久久久久久久久久久| 韩国三级av在线免费观看| 九九九久久久久久| 亚洲爱爱视频| 久久精品国产第一区二区三区最新章节 | 99精品在线视频观看| 亚洲国产天堂久久综合网| 亚洲1卡2卡3卡4卡乱码精品| 欧美一级大片视频| 日本精品一区二区三区在线观看视频| 欧美一区二区高清在线观看| 国内精品久久久久久久97牛牛| 手机在线成人免费视频| 97se狠狠狠综合亚洲狠狠| 国产午夜手机精彩视频| 91国在线观看| 性感美女福利视频| 欧美激情精品久久久久久蜜臀| 日本黄色成人| 日韩久久久久久久| 成人激情四射网| 亚洲国产精品影院| 亚洲天堂中文字幕在线| 亚洲精品国精品久久99热 | 国内精品嫩模av私拍在线观看| 国产又黄又猛又粗又爽的视频| 99久久99久久免费精品蜜臀| 久草免费在线观看视频| 欧美日韩激情一区二区三区| 四虎精品在永久在线观看| 欧美激情国产精品| 欧美成人精品午夜一区二区| 爱爱爱视频网站| 久久精品国产精品亚洲精品| 精品人妻一区二区三区蜜桃视频| 欧美视频在线免费看| 免费观看毛片网站| 欧美激情视频网| jazzjazz国产精品久久| 国产免费内射又粗又爽密桃视频| 激情成人午夜视频| 99热在线观看精品| 欧美午夜寂寞影院| av网站无病毒在线| 国产日韩精品视频| 综合五月激情网| 欧美国产日韩亚洲一区| 狠狠人妻久久久久久综合| 狠狠精品干练久久久无码中文字幕 | av网站免费在线看| 色一情一伦一子一伦一区| 视频污在线观看| 欧美性在线观看| 蜜桃视频欧美| 免费午夜视频在线观看| 国产亚洲精品精华液| 亚洲av无码不卡| 中文字幕日韩av| 日韩一区二区三免费高清在线观看| 亚洲视频电影| 狠狠久久亚洲欧美| 欧美黄色免费在线观看| 欧美v亚洲v综合ⅴ国产v| 黄色大片在线| 好看的日韩精品| 日韩国产欧美在线观看| 亚洲色图日韩精品| 欧美一个色资源| av中文在线资源库| 欧美一二三四五区| 麻豆传媒一区二区三区| 免费中文字幕视频| 亚洲精品自拍第一页| 欧美色片在线观看| 日本一区二区免费高清视频| 成人性视频免费网站| 欧美一级特黄视频| 日韩在线不卡视频| 大奶在线精品| www.日日操| 亚洲久本草在线中文字幕| 欧美一级淫片aaaaaa| 国产aⅴ夜夜欢一区二区三区| 日韩综合网站| 四虎精品一区二区| 色网综合在线观看| 久久精品视频免费看| 国产91亚洲精品一区二区三区| 亚洲欧美日本日韩| 免费看特级毛片| 亚洲精品色婷婷福利天堂| 亚洲欧美久久精品| 久草热视频在线观看| 国产精品美女久久久久高潮| 性欧美一区二区三区| 欧美最猛黑人xxxx黑人猛叫黄| **女人18毛片一区二区| 国产传媒第一页| 欧美一区二区视频在线观看2020| 天堂中文av在线资源库|