精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spark源碼分析之分區器的作用

大數據 Spark
首先對spark有一定了解的都應該知道,在spark中每個RDD可以理解為一組分區,這些分區對應了內存塊block,他們才是數據最終的載體。那么一個RDD由不同的分區組成,這樣在處理一些map,filter等算子的時候,就可以直接以分區為單位并行計算了。直到遇到shuffle的時候才需要和其他的RDD配合。

[[189314]]

最近因為手抖,在Spark中給自己挖了一個數據傾斜的坑。為了解決這個問題,順便研究了下Spark分區器的原理,趁著周末加班總結一下~

先說說數據傾斜

數據傾斜是指Spark中的RDD在計算的時候,每個RDD內部的分區包含的數據不平均。比如一共有5個分區,其中一個占有了90%的數據,這就導致本來5個分區可以5個人一起并行干活,結果四個人不怎么干活,工作全都壓到一個人身上了。遇到這種問題,網上有很多的解決辦法。

但是如果是底層數據的問題,無論怎么優化,還是無法解決數據傾斜的。

比如你想要對某個rdd做groupby,然后做join操作,如果分組的key就是分布不均勻的,那么真樣都是無法優化的。因為一旦這個key被切分,就無法完整的做join了,如果不對這個key切分,必然會造成對應的分區數據傾斜。

不過,了解數據為什么會傾斜還是很重要的,繼續往下看吧!

分區的作用

在PairRDD即(key,value)這種格式的rdd中,很多操作都是基于key的,因此為了獨立分割任務,會按照key對數據進行重組。比如groupbykey

重組肯定是需要一個規則的,最常見的就是基于Hash,Spark還提供了一種稍微復雜點的基于抽樣的Range分區方法。

下面我們先看看分區器在Spark計算流程中是怎么使用的:

Paritioner的使用

就拿groupbykey來說:

  1. def groupByKey(): JavaPairRDD[K, JIterable[V]] = 
  2.     fromRDD(groupByResultToJava(rdd.groupByKey())) 

它會調用PairRDDFunction的groupByKey()方法

  1. def groupByKey(): RDD[(K, Iterable[V])] = self.withScope { 
  2.     groupByKey(defaultPartitioner(self)) 
  3.   } 

在這個方法里面創建了默認的分區器。默認的分區器是這樣定義的:

  1. def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { 
  2.     val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse 
  3.     for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) { 
  4.       return r.partitioner.get 
  5.     } 
  6.     if (rdd.context.conf.contains("spark.default.parallelism")) { 
  7.       new HashPartitioner(rdd.context.defaultParallelism) 
  8.     } else { 
  9.       new HashPartitioner(bySize.head.partitions.size
  10.     } 
  11.   } 

首先獲取當前分區的分區個數,如果沒有設置spark.default.parallelism參數,則創建一個跟之前分區個數一樣的Hash分區器。

當然,用戶也可以自定義分區器,或者使用其他提供的分區器。API里面也是支持的:

  1. // 傳入分區器對象 
  2. def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] = 
  3.     fromRDD(groupByResultToJava(rdd.groupByKey(partitioner))) 
  4. // 傳入分區的個數 
  5. def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] = 
  6.     fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions))) 

HashPatitioner

Hash分區器,是最簡單也是默認提供的分區器,了解它的分區規則,對我們處理數據傾斜或者設計分組的key時,還是很有幫助的。

  1. class HashPartitioner(partitions: Int) extends Partitioner { 
  2.   require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative."
  3.  
  4.   def numPartitions: Int = partitions 
  5.  
  6.   // 通過key計算其HashCode,并根據分區數取模。如果結果小于0,直接加上分區數。 
  7.   def getPartition(keyAny): Int = key match { 
  8.     case null => 0 
  9.     case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) 
  10.   } 
  11.  
  12.   // 對比兩個分區器是否相同,直接對比其分區個數就行 
  13.   override def equals(other: Any): Boolean = other match { 
  14.     case h: HashPartitioner => 
  15.       h.numPartitions == numPartitions 
  16.     case _ => 
  17.       false 
  18.   } 
  19.  
  20.   override def hashCode: Int = numPartitions 

這里最重要的是這個Utils.nonNegativeMod(key.hashCode, numPartitions),它決定了數據進入到哪個分區。

  1. def nonNegativeMod(x: Int, mod: Int): Int = { 
  2.     val rawMod = x % mod 
  3.     rawMod + (if (rawMod < 0) mod else 0) 
  4.   } 

說白了,就是基于這個key獲取它的hashCode,然后對分區個數取模。由于HashCode可能為負,這里直接判斷下,如果小于0,再加上分區個數即可。

因此,基于hash的分區,只要保證你的key是分散的,那么最終數據就不會出現數據傾斜的情況。

RangePartitioner

這個分區器,適合想要把數據打散的場景,但是如果相同的key重復量很大,依然會出現數據傾斜的情況。

每個分區器,最核心的方法,就是getPartition

  1. def getPartition(keyAny): Int = { 
  2.     val k = key.asInstanceOf[K] 
  3.     var partition = 0 
  4.     if (rangeBounds.length <= 128) { 
  5.       // If we have less than 128 partitions naive search 
  6.       while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { 
  7.         partition += 1 
  8.       } 
  9.     } else { 
  10.       // Determine which binary search method to use only once. 
  11.       partition = binarySearch(rangeBounds, k) 
  12.       // binarySearch either returns the match location or -[insertion point]-1 
  13.       if (partition < 0) { 
  14.         partition = -partition-1 
  15.       } 
  16.       if (partition > rangeBounds.length) { 
  17.         partition = rangeBounds.length 
  18.       } 
  19.     } 
  20.     if (ascending) { 
  21.       partition 
  22.     } else { 
  23.       rangeBounds.length - partition 
  24.     } 
  25.   } 

在range分區中,會存儲一個邊界的數組,比如[1,100,200,300,400],然后對比傳進來的key,返回對應的分區id。

那么這個邊界是怎么確定的呢?

這就是Range分區最核心的算法了,大概描述下,就是遍歷每個paritiion,對里面的數據進行抽樣,把抽樣的數據進行排序,并按照對應的權重確定邊界。

有幾個比較重要的地方:

1 抽樣

2 確定邊界

關于抽樣,有一個很常見的算法題,即在不知道數據規模的情況下,如何以等概率的方式,隨機選擇一個值。

最笨的辦法,就是遍歷一次數據,知道數據的規模,然后隨機一個數,取其對應的值。其實這樣相當于遍歷了兩次(第二次的取值根據不同的存儲介質,可能不同)。

在Spark中,是使用水塘抽樣這種算法。即首先取***個值,然后依次往后遍歷;第二個值有二分之一的幾率替換選出來的值;第三個值有三分之一的幾率替換選出來的值;…;直到遍歷到***一個值。這樣,通過依次遍歷就取出來隨機的數值了。

算法參考源碼:

  1. private var rangeBounds: Array[K] = { 
  2.     if (partitions <= 1) { 
  3.       Array.empty 
  4.     } else { 
  5.       // This is the sample size we need to have roughly balanced output partitions, capped at 1M. 
  6.       // ***采樣數量不能超過1M。比如,如果分區是5,采樣數為100 
  7.       val sampleSize = math.min(20.0 * partitions, 1e6) 
  8.       // Assume the input partitions are roughly balanced and over-sample a little bit
  9.       // 每個分區的采樣數為平均值的三倍,避免數據傾斜造成的數據量過少 
  10.       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt 
  11.  
  12.       // 真正的采樣算法(參數1:rdd的key數組, 采樣個數) 
  13.       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) 
  14.       if (numItems == 0L) { 
  15.         Array.empty 
  16.       } else { 
  17.         // If a partition contains much more than the average number of items, we re-sample from it 
  18.         // to ensure that enough items are collected from that partition. 
  19.         // 如果有的分區包含的數量遠超過平均值,那么需要對它重新采樣。每個分區的采樣數/采樣返回的總的記錄數 
  20.         val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) 
  21.         //保存有效的采樣數 
  22.         val candidates = ArrayBuffer.empty[(K, Float)] 
  23.         //保存數據傾斜導致的采樣數過多的信息 
  24.         val imbalancedPartitions = mutable.Set.empty[Int
  25.  
  26.         sketched.foreach { case (idx, n, sample) => 
  27.           if (fraction * n > sampleSizePerPartition) { 
  28.             imbalancedPartitions += idx 
  29.           } else { 
  30.             // The weight is 1 over the sampling probability. 
  31.             val weight = (n.toDouble / sample.size).toFloat 
  32.             for (key <- sample) { 
  33.               candidates += ((key, weight)) 
  34.             } 
  35.           } 
  36.         } 
  37.         if (imbalancedPartitions.nonEmpty) { 
  38.           // Re-sample imbalanced partitions with the desired sampling probability. 
  39.           val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains
  40.           val seed = byteswap32(-rdd.id - 1) 
  41.           //基于RDD獲取采樣數據 
  42.           val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() 
  43.           val weight = (1.0 / fraction).toFloat 
  44.           candidates ++= reSampled.map(x => (x, weight)) 
  45.         } 
  46.         RangePartitioner.determineBounds(candidates, partitions) 
  47.       } 
  48.     } 
  49.   } 
  50.    
  51.   def sketch[K : ClassTag]( 
  52.       rdd: RDD[K], 
  53.       sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = { 
  54.     val shift = rdd.id 
  55.     // val classTagK = classTag[K] // to avoid serializing the entire partitioner object 
  56.     val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => 
  57.       val seed = byteswap32(idx ^ (shift << 16)) 
  58.       val (sample, n) = SamplingUtils.reservoirSampleAndCount( 
  59.         iter, sampleSizePerPartition, seed) 
  60.       //包裝成三元組,(索引號,分區的內容個數,抽樣的內容) 
  61.       Iterator((idx, n, sample)) 
  62.     }.collect() 
  63.     val numItems = sketched.map(_._2).sum 
  64.     //返回(數據條數,(索引號,分區的內容個數,抽樣的內容)) 
  65.     (numItems, sketched) 
  66.   } 
  67.    

真正的抽樣算法在SamplingUtils中,由于在Spark中是需要一次性取多個值的,因此直接去前n個數值,然后依次概率替換即可:

  1. def reservoirSampleAndCount[T: ClassTag]( 
  2.       input: Iterator[T], 
  3.       k: Int
  4.       seed: Long = Random.nextLong()) 
  5.     : (Array[T], Long) = { 
  6.     //創建臨時數組 
  7.     val reservoir = new Array[T](k) 
  8.     // Put the first k elements in the reservoir. 
  9.     // 取出前k個數,并把對應的rdd中的數據放入對應的序號的數組中 
  10.     var i = 0 
  11.     while (i < k && input.hasNext) { 
  12.       val item = input.next() 
  13.       reservoir(i) = item 
  14.       i += 1 
  15.     } 
  16.  
  17.     // If we have consumed all the elements, return them. Otherwise do the replacement. 
  18.     // 如果全部的元素,比要抽取的采樣數少,那么直接返回 
  19.     if (i < k) { 
  20.       // If input size < k, trim the array to return only an array of input size
  21.       val trimReservoir = new Array[T](i) 
  22.       System.arraycopy(reservoir, 0, trimReservoir, 0, i) 
  23.       (trimReservoir, i) 
  24.  
  25.     // 否則開始抽樣替換 
  26.     } else { 
  27.       // If input size > k, continue the sampling process. 
  28.       // 從剛才的序號開始,繼續遍歷 
  29.       var l = i.toLong 
  30.       // 隨機數 
  31.       val rand = new XORShiftRandom(seed) 
  32.       while (input.hasNext) { 
  33.         val item = input.next() 
  34.         // 隨機一個數與當前的l相乘,如果小于采樣數k,就替換。(越到后面,替換的概率越小...) 
  35.         val replacementIndex = (rand.nextDouble() * l).toLong 
  36.         if (replacementIndex < k) { 
  37.           reservoir(replacementIndex.toInt) = item 
  38.         } 
  39.         l += 1 
  40.       } 
  41.       (reservoir, l) 
  42.     } 
  43.   } 

確定邊界

***就可以通過獲取的樣本數據,確定邊界了。

  1. def determineBounds[K : Ordering : ClassTag]( 
  2.       candidates: ArrayBuffer[(K, Float)], 
  3.       partitions: Int): Array[K] = { 
  4.     val ordering = implicitly[Ordering[K]] 
  5.     // 數據格式為(key,權重) 
  6.     val ordered = candidates.sortBy(_._1) 
  7.     val numCandidates = ordered.size 
  8.     val sumWeights = ordered.map(_._2.toDouble).sum 
  9.     val step = sumWeights / partitions 
  10.     var cumWeight = 0.0 
  11.     var target = step 
  12.     val bounds = ArrayBuffer.empty[K] 
  13.     var i = 0 
  14.     var j = 0 
  15.     var previousBound = Option.empty[K] 
  16.     while ((i < numCandidates) && (j < partitions - 1)) { 
  17.       val (key, weight) = ordered(i) 
  18.       cumWeight += weight 
  19.       if (cumWeight >= target) { 
  20.         // Skip duplicate values
  21.         if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { 
  22.           bounds += key 
  23.           target += step 
  24.           j += 1 
  25.           previousBound = Some(key
  26.         } 
  27.       } 
  28.       i += 1 
  29.     } 
  30.     bounds.toArray 
  31.   } 

直接看代碼,還是有些晦澀難懂,我們舉個例子,一步一步解釋下:

按照上面的算法流程,大致可以理解:

  1. 抽樣-->確定邊界(排序) 

首先對spark有一定了解的都應該知道,在spark中每個RDD可以理解為一組分區,這些分區對應了內存塊block,他們才是數據最終的載體。那么一個RDD由不同的分區組成,這樣在處理一些map,filter等算子的時候,就可以直接以分區為單位并行計算了。直到遇到shuffle的時候才需要和其他的RDD配合。

在上面的圖中,如果我們不特殊設置的話,一個RDD由3個分區組成,那么在對它進行groupbykey的時候,就會按照3進行分區。

按照上面的算法流程,如果分區數為3,那么采樣的大小為:

  1. val sampleSize = math.min(20.0 * partitions, 1e6) 

即采樣數為60,每個分區取60個數。但是考慮到數據傾斜的情況,有的分區可能數據很多,因此在實際的采樣時,會按照3倍大小采樣:

  1. val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt 

也就是說,最多會取60個樣本數據。

然后就是遍歷每個分區,取對應的樣本數。

  1. val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => 
  2.       val seed = byteswap32(idx ^ (shift << 16)) 
  3.       val (sample, n) = SamplingUtils.reservoirSampleAndCount( 
  4.         iter, sampleSizePerPartition, seed) 
  5.       //包裝成三元組,(索引號,分區的內容個數,抽樣的內容) 
  6.       Iterator((idx, n, sample)) 
  7.     }.collect() 

然后檢查,是否有分區的樣本數過多,如果多于平均值,則繼續采樣,這時直接用sample 就可以了

  1. sketched.foreach { case (idx, n, sample) => 
  2.           if (fraction * n > sampleSizePerPartition) { 
  3.             imbalancedPartitions += idx 
  4.           } else { 
  5.             // The weight is 1 over the sampling probability. 
  6.             val weight = (n.toDouble / sample.size).toFloat 
  7.             for (key <- sample) { 
  8.               candidates += ((key, weight)) 
  9.             } 
  10.           } 
  11.         } 
  12.         if (imbalancedPartitions.nonEmpty) { 
  13.           // Re-sample imbalanced partitions with the desired sampling probability. 
  14.           val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains
  15.           val seed = byteswap32(-rdd.id - 1) 
  16.           //基于RDD獲取采樣數據 
  17.           val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() 
  18.           val weight = (1.0 / fraction).toFloat 
  19.           candidates ++= reSampled.map(x => (x, weight)) 
  20.         } 

取出樣本后,就到了確定邊界的時候了。

注意每個key都會有一個權重,這個權重是 【分區的數據總數/樣本數】

  1. RangePartitioner.determineBounds(candidates, partitions) 

首先排序val ordered = candidates.sortBy(_._1),然后確定一個權重的步長

  1. val sumWeights = ordered.map(_._2.toDouble).sum 
  2. val step = sumWeights / partitions 

基于該步長,確定邊界,***就形成了幾個范圍數據。

然后分區器形成二叉樹,遍歷該數確定每個key對應的分區id

  1. partition = binarySearch(rangeBounds, k) 

實踐 —— 自定義分區器

自定義分區器,也是很簡單的,只需要實現對應的兩個方法就行:

  1. public class MyPartioner extends Partitioner { 
  2.     @Override 
  3.     public int numPartitions() { 
  4.         return 1000; 
  5.     } 
  6.  
  7.     @Override 
  8.     public int getPartition(Object key) { 
  9.         String k = (String) key
  10.         int code = k.hashCode() % 1000; 
  11.         System.out.println(k+":"+code); 
  12.         return  code < 0?code+1000:code; 
  13.     } 
  14.  
  15.     @Override 
  16.     public boolean equals(Object obj) { 
  17.         if(obj instanceof MyPartioner){ 
  18.             if(this.numPartitions()==((MyPartioner) obj).numPartitions()){ 
  19.                 return true
  20.             } 
  21.             return false
  22.         } 
  23.         return super.equals(obj); 
  24.     } 

使用的時候,可以直接new一個對象即可。

  1. pairRdd.groupbykey(new MyPartitioner()) 

這樣自定義分區器就完成了。

責任編輯:武曉燕 來源: 36大數據
相關推薦

2011-01-18 09:51:59

Linux磁盤分區

2010-07-21 14:55:48

SQL Server

2010-07-21 14:50:23

SQL Server

2015-07-13 09:56:37

2021-08-06 08:33:27

Springboot分布式Seata

2010-07-21 15:01:09

SQL Server

2011-01-18 10:25:19

Linux磁盤分區

2021-06-29 20:51:16

大數據框架分桶

2012-07-06 09:39:37

虛擬化

2009-06-12 15:25:38

Hibernate s

2024-01-05 08:38:20

SpringBeanScope

2009-11-12 16:41:36

路由器產品

2023-03-30 09:06:20

HiveSpark大數據

2009-03-11 12:43:29

存儲虛擬化服務器

2021-11-10 16:10:18

鴻蒙HarmonyOS應用

2013-10-15 16:20:59

試題鏈表

2009-11-09 16:16:39

2021-05-14 08:33:02

Flink策略源碼

2021-12-14 10:16:00

鴻蒙HarmonyOS應用

2021-01-08 05:22:47

Spark動態優化
點贊
收藏

51CTO技術棧公眾號

久久久久久黄| 亚洲日日夜夜| 国产三区在线成人av| 国产精品视频一| 日韩成人毛片视频| 美女视频亚洲色图| 欧美日韩在线观看视频| 一区二区在线高清视频| 人妻无码中文字幕免费视频蜜桃| 免费在线播放第一区高清av| 久久天天躁夜夜躁狠狠躁2022| 欧美激情一区二区三区p站| 国模冰冰炮一区二区| 亚洲欧美另类图片小说| 欧美美乳视频网站在线观看| 国产精品丝袜黑色高跟鞋| 99成人在线| 久久人人爽亚洲精品天堂| 制服丝袜第一页在线观看| yiren22亚洲综合| 五月天精品一区二区三区| 亚洲欧洲精品一区二区| 涩涩视频在线观看免费| 韩国v欧美v亚洲v日本v| 日本一区二区三区在线播放| 久草成人在线视频| 国产精品久久久久久麻豆一区软件 | 日本一不卡视频| 欧美激情图片区| 国产老头老太做爰视频| 国产一区二区在线| 日韩av在线免费观看一区| 99re6在线观看| 免费观看亚洲| 精品av在线播放| 草草草视频在线观看| 在线观看黄av| 国产欧美中文在线| 六月婷婷久久| 午夜视频www| 成人免费视频视频| 1卡2卡3卡精品视频| 一区二区三区黄色片| 丝袜美腿亚洲色图| 日韩av黄色在线观看| 国产成人免费看| 在线综合亚洲| 26uuu国产精品视频| 日本在线视频中文字幕| 国产精品jizz在线观看美国| 欧美大尺度激情区在线播放| 国产在线一卡二卡| 亚洲综合色站| 欧美www在线| 青青草成人免费| 国产精品v欧美精品v日本精品动漫| 久久在线视频在线| 久草视频手机在线| 欧美+亚洲+精品+三区| 美女福利视频一区| 久久黄色免费网站| 亚洲麻豆一区| 日本三级韩国三级久久| 亚洲永久精品一区| 蜜臀av国产精品久久久久| 国产精品丝袜久久久久久高清| 中文字幕av在线免费观看| 蜜桃传媒麻豆第一区在线观看| 国产精品999999| 伊人久久一区二区| 精品伊人久久久久7777人| 91亚洲精品久久久久久久久久久久| 国产精品久久久久久久久久久久久久久久久久 | www.伊人久久| 日日夜夜精品视频天天综合网| 国产成人小视频在线观看| 自拍偷拍精品视频| 国产一区二区三区观看| 国产chinese精品一区二区| 色欲av永久无码精品无码蜜桃| 久久婷婷一区二区三区| 亚洲一区二区在| 182tv在线播放| 精品久久久精品| 国产成人手机视频| 精品国产一区二区三区2021| 欧美变态凌虐bdsm| 一级片手机在线观看| 66视频精品| 91禁国产网站| 一级特黄aaaaaa大片| 国产91高潮流白浆在线麻豆 | 成人无号精品一区二区三区| 麻豆国产精品va在线观看不卡| 久久精品美女视频| 日韩二区三区在线观看| 亚洲一区二区三区视频| 四虎精品在永久在线观看| 国产精品久久久久桃色tv| 日韩在线观看a| 免费污视频在线一区| 欧美一区二区福利视频| 成人网站免费观看| 91精品1区| 国产国产精品人在线视| 亚洲第一视频在线| 国产精品色在线观看| 可以在线看的av网站| 久久国内精品| 精品视频久久久| xxxx日本少妇| 欧美a级一区二区| 精品国产一区二区三区麻豆小说 | 亚洲婷婷在线视频| 国产h视频在线播放| 999精品视频在线观看| 亚洲免费av网址| 精品在线视频免费| 国模一区二区三区白浆 | 99久久久国产精品美女| 欧美在线视频网站| 亚洲成人av综合| 国产精品久久久久久久裸模| 日韩av资源在线| 欧美三级电影在线| 欧美成人精品h版在线观看| 成人黄色片在线观看| 91老师片黄在线观看| 91动漫在线看| 中文字幕一区二区三区四区久久 | 日本在线播放一区| 国产在线精彩视频| 欧美变态tickle挠乳网站| 性欧美疯狂猛交69hd| 日本不卡不码高清免费观看| 欧美精品七区| 成人直播视频| 亚洲欧美激情精品一区二区| 日韩免费av片| 成人av电影在线| 国产真人做爰毛片视频直播| 91精品啪在线观看国产爱臀| 欧美另类在线播放| 国产特级黄色片| 亚洲免费看黄网站| 久久久久久综合网| 亚洲成人日韩| 3d蒂法精品啪啪一区二区免费| 麻豆tv免费在线观看| 欧美久久高跟鞋激| av黄色免费在线观看| 久久国产精品区| 最新国产精品久久| 精品三级国产| 久久久久女教师免费一区| 午夜美女福利视频| 亚洲第一成年网| 日韩 中文字幕| 久久成人精品| 亚洲免费不卡| 精品视频在线一区| 久久久久久久一区二区| 人人妻人人澡人人爽精品日本| 亚洲国产欧美一区二区三区丁香婷| 在线播放av网址| 一区二区三区福利| 欧美精品久久久| 亚洲a成人v| 久久久久这里只有精品| 午夜视频福利在线观看| 色狠狠一区二区| 人人干在线观看| 国产福利91精品一区二区三区| 日韩 欧美 视频| 免费电影一区二区三区| 国产精品永久免费观看| 日韩特级毛片| 亚洲精品一区二区久| 中文字幕男人天堂| 亚洲在线视频一区| 免费看污片网站| 狠狠色丁香久久婷婷综| 国产日韩av网站| 欧美男gay| 91在线视频导航| 日本在线播放一二三区| 最近2019年日本中文免费字幕| 国产99久久九九精品无码免费| 五月激情综合网| 亚洲色图日韩精品| 成人国产精品免费观看| 成人在线免费播放视频| 你懂的网址国产 欧美| 免费成人av网站| 2019中文亚洲字幕| 4k岛国日韩精品**专区| 男人的天堂在线视频免费观看 | 成人三级在线| 黑人一区二区三区| 2025国产精品视频| 18网站在线观看| 中文字幕av日韩| 成人免费视频国产免费麻豆| 欧美日韩小视频| 三级黄色在线视频| 亚洲视频 欧洲视频| 李宗瑞91在线正在播放| 国产成人在线观看| 日日干夜夜操s8| 校园激情久久| 久无码久无码av无码| 91精品国产自产在线观看永久∴ | 9色在线观看| 亚洲第一中文字幕在线观看| 国产精品嫩草影院精东| 91电影在线观看| 国产视频91在线| 一区二区三区欧美视频| jizz日本在线播放| 久久久久久久久久久99999| 无码任你躁久久久久久老妇| 韩国v欧美v日本v亚洲v| 日日躁夜夜躁aaaabbbb| 蜜桃久久av| 亚洲午夜精品久久久久久人妖| 欧美搞黄网站| 欧美日韩一级在线| 999国产精品视频| 日韩欧美视频一区二区三区四区| 日韩精品免费一区二区三区竹菊| 99久久99久久| 日韩综合一区二区三区| 亚洲aa中文字幕| 电影中文字幕一区二区| 国产在线视频不卡| 久久99国产精品二区高清软件| 国产成+人+综合+亚洲欧美丁香花| 爱啪啪综合导航| 97在线视频免费观看| 国产探花在线观看| 欧美激情视频给我| 国产蜜臀在线| 久久久在线免费观看| 国产第一页在线视频| 欧美激情精品久久久久久| 在线免费av导航| 久99久在线视频| 色帝国亚洲欧美在线| 欧美丰满少妇xxxxx做受| 欧美黄色视屏| 久久久免费电影| 最新欧美色图| 国产精品av网站| 国产成人久久精品麻豆二区| 国产精选久久久久久| 羞羞视频在线观看一区二区| 亚洲va久久久噜噜噜| 欧美在线在线| 精品欧美一区二区三区久久久| 欧美电影在线观看免费| 欧美极品jizzhd欧美| 青青草国产免费一区二区下载 | 亚洲高清av| 5月婷婷6月丁香| 日韩影院精彩在线| 国产福利在线免费| 国产黄人亚洲片| 欧美精品欧美极品欧美激情| 久久久www成人免费无遮挡大片| 影音先锋男人在线| 综合久久国产九一剧情麻豆| 久久久99精品| 福利视频一区二区| 最好看的日本字幕mv视频大全| 欧美精品丝袜中出| 后入内射欧美99二区视频| 精品一区二区电影| av中文字幕一区二区三区| 欧美大胆a视频| 久草在线中文最新视频| 国产精品夜色7777狼人| 日韩中文在线| 日韩精品久久一区二区三区| 永久亚洲成a人片777777| 亚洲 高清 成人 动漫| 麻豆91精品91久久久的内涵| 97中文字幕在线观看| 久久久久久久久久久电影| 日韩一级片av| 色又黄又爽网站www久久| 91亚洲精品国偷拍自产在线观看 | seseavlu视频在线| 精品中文字幕视频| 婷婷综合六月| 96久久精品| 欧美亚洲国产激情| 2019日韩中文字幕mv| 日本欧美大码aⅴ在线播放| 欧洲成人午夜精品无码区久久| 国产亚洲综合在线| 麻豆一区二区三区精品视频| 欧美在线观看视频一区二区| 色一情一乱一区二区三区| 爱福利视频一区| 日韩电影免费观看高清完整版| 999热视频在线观看| 不卡在线一区| 免费成人在线视频网站| 国产精品综合在线视频| 国产精成人品免费观看| 亚洲成a人v欧美综合天堂下载| 亚洲系列第一页| 亚洲毛片在线看| h片精品在线观看| 92看片淫黄大片欧美看国产片| 国产一区国产二区国产三区| 久久久久久久久久网| 国产一区二区三区免费在线观看| 国产成人精品无码免费看夜聊软件| 一区二区三区毛片| 一二区在线观看| 亚洲午夜色婷婷在线| 不卡专区在线| 国产精品免费一区二区三区在线观看 | 精品无人码麻豆乱码1区2区| xxxx日本免费| 欧美性猛交视频| 天天干天天爱天天操| 久久久久久久亚洲精品| 亚洲一区二区三区中文字幕在线观看| 一区二区三区电影| 麻豆精品新av中文字幕| 中文字幕第20页| 欧美视频在线看| 亚洲色图另类小说| 隔壁老王国产在线精品| 亚洲一区二区三区日本久久九| 天天在线免费视频| 国产一区不卡视频| 麻豆明星ai换脸视频| 在线不卡一区二区| 欧美13一16娇小xxxx| 国产精品午夜视频| 成人影院在线| 超碰超碰在线观看| 国产精品久久国产精麻豆99网站| 最近中文字幕av| 最新国产成人av网站网址麻豆| jizz久久久久久| 亚洲欧洲一区二区福利| 久久电影国产免费久久电影| 91香蕉国产视频| 欧美精品乱人伦久久久久久| 国产素人视频在线观看| 亚洲综合成人婷婷小说| 欧美日韩国产综合网| 俄罗斯黄色录像| 精品日本美女福利在线观看| 全色精品综合影院| 国产精品国产亚洲伊人久久| 日韩av片子| 97免费公开视频| 亚洲一二三级电影| 性感美女福利视频| 国产成人91久久精品| 成人黄色小视频| 一级黄色在线播放| 亚洲综合一二区| 手机福利小视频在线播放| 国产精品成人v| 久久久久美女| 久久久久成人精品无码中文字幕| 日韩人在线观看| 免费黄色网址在线观看| 国产精品入口免费| 日韩专区中文字幕一区二区| 色婷婷粉嫩av| 精品国产91久久久久久久妲己| 亚洲人体视频| 资源网第一页久久久| av综合在线播放| 伊人成人在线观看| 久久久久久久久久久免费| 欧美色婷婷久久99精品红桃| 久久aaaa片一区二区| 色综合天天狠狠| www.在线视频| 欧美精品成人一区二区在线观看| 精品一区二区三区欧美| www..com国产| 久久精品免费电影| 伊人春色精品| 三上悠亚 电影| 欧美亚洲一区二区在线| 毛片大全在线观看| 蜜桃在线一区二区三区精品| 韩国精品久久久| 亚洲精品国产欧美在线观看| 欧美激情xxxx| 久久一本综合|