精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Apache Spark源碼走讀之3:Task運行期之函數調用

數據庫 Spark
本篇主要闡述在TaskRunner中執行的task其業務邏輯是如何被調用到的,另外試圖講清楚運行著的task其輸入的數據從哪獲取,處理的結果返回到哪里,如何返回。

準備

  1. spark已經安裝完畢

  2. spark運行在local mode或local-cluster mode

local-cluster mode

local-cluster模式也稱為偽分布式,可以使用如下指令運行

  1. MASTER=local[1,2,1024] bin/spark-shell 

[1,2,1024] 分別表示,executor number, core number和內存大小,其中內存大小不應小于默認的512M

Driver Programme的初始化過程分析

初始化過程的涉及的主要源文件

  1. SparkContext.scala       整個初始化過程的入口

  2. SparkEnv.scala          創建BlockManager, MapOutputTrackerMaster, ConnectionManager, CacheManager

  3. DAGScheduler.scala       任務提交的入口,即將Job劃分成各個stage的關鍵

  4. TaskSchedulerImpl.scala 決定每個stage可以運行幾個task,每個task分別在哪個executor上運行

  5. SchedulerBackend

    1. 最簡單的單機運行模式的話,看LocalBackend.scala

    2. 如果是集群模式,看源文件SparkDeploySchedulerBackend

初始化過程步驟詳解

步驟1: 根據初始化入參生成SparkConf,再根據SparkConf來創建SparkEnv, SparkEnv中主要包含以下關鍵性組件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager

  1. private[spark] val env = SparkEnv.create( 
  2.     conf, 
  3.     "", conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt, isDriver = true
  4.     isLocalisLocal = isLocal) 
  5.   SparkEnv.set(env) 

步驟2:創建TaskScheduler,根據Spark的運行模式來選擇相應的SchedulerBackend,同時啟動taskscheduler,這一步至為關鍵

  1. private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName) 
  2.  taskScheduler.start() 

TaskScheduler.start目的是啟動相應的SchedulerBackend,并啟動定時器進行檢測

  1. override def start() { 
  2.     backend.start() 
  3.  
  4.     if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatcher 
  5.       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, 
  6.             SPECULATION_INTERVAL milliseconds) { 
  7.         checkSpeculatableTasks() 
  8.       } 
  9.     } 
  10.   } 

步驟3:以上一步中創建的TaskScheduler實例為入參創建DAGScheduler并啟動運行

  1. @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler) 
  2.   dagScheduler.start() 

步驟4:啟動WEB UI

  1. ui.start() 

RDD的轉換過程

還是以最簡單的wordcount為例說明rdd的轉換過程

  1. sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) 

上述一行簡短的代碼其實發生了很復雜的RDD轉換,下面仔細解釋每一步的轉換過程和轉換結果

步驟1:val rawFile = sc.textFile("README.md")

textFile先是生成hadoopRDD,然后再通過map操作生成MappedRDD,如果在spark-shell中執行上述語句,得到的結果可以證明所做的分析

  1. scala> sc.textFile("README.md") 14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; 
  2. assuming yes 14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0maxMem=311387750 14/04/23 13:11:48 INFO MemoryStore: 
  3. Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB) 14/04/23 13:11:48 DEBUG BlockManager: 
  4. Put block broadcast_0 locally took 277 ms 14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took 281 ms res0:
  5.  org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13 

步驟2: val splittedText = rawFile.flatMap(line => line.split(" "))

flatMap將原來的MappedRDD轉換成為FlatMappedRDD

  1. def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =   
  2. new FlatMappedRDD(this, sc.clean(f)) 

步驟3:val wordCount = splittedText.map(word => (word, 1))

利用word生成相應的鍵值對,上一步的FlatMappedRDD被轉換成為MappedRDD

步驟4:val reduceJob = wordCount.reduceByKey(_ + _),這一步最復雜

步驟2,3中使用到的operation全部定義在RDD.scala中,而這里使用到的reduceByKey卻在RDD.scala中見不到蹤跡。reduceByKey的定義出現在源文件PairRDDFunctions.scala

細心的你一定會問reduceByKey不是MappedRDD的屬性和方法啊,怎么能被MappedRDD調用呢?其實這背后發生了一個隱式的轉換,該轉換將MappedRDD轉換成為PairRDDFunctions

  1. implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) = 
  2.     new PairRDDFunctions(rdd) 

這種隱式的轉換是scala的一個語法特征,如果想知道的更多,請用關鍵字"scala implicit method"進行查詢,會有不少的文章對此進行詳盡的介紹。

接下來再看一看reduceByKey的定義

  1. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { 
  2.   reduceByKey(defaultPartitioner(self), func) 
  3.  
  4. def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { 
  5.   combineByKey[V]((v: V) => v, func, func, partitioner) 
  6.  
  7. def combineByKey[C](createCombiner: V => C, 
  8.     mergeValue: (C, V) => C, 
  9.     mergeCombiners: (C, C) => C, 
  10.     partitioner: Partitioner, 
  11.     mapSideCombine: Boolean = true
  12.     serializerClass: String = null): RDD[(K, C)] = { 
  13.   if (getKeyClass().isArray) { 
  14.     if (mapSideCombine) { 
  15.       throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner])
  16.  { throw new SparkException("Default partitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
  17.  if (self.partitioner == Some(partitioner)) { self.mapPartitionsWithContext((context, iter) => 
  18. { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } 
  19. else if (mapSideCombine) 
  20. { val combined = self.mapPartitionsWithContext((context, iter) => 
  21. { aggregator.combineValuesByKey(iter, context) }, preservesPartitioning = true) val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner) .setSerializer(serializerClass) partitioned.mapPartitionsWithContext((context, iter) =>
  22.  { new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true) }
  23.  else { // Don't apply map-side combiner. val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass) values.mapPartitionsWithContext((context, iter) =>
  24.  { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true
  25.   } 

reduceByKey最終會調用combineByKey, 在這個函數中PairedRDDFunctions會被轉換成為ShuffleRDD,當調用mapPartitionsWithContext之后,shuffleRDD被轉換成為MapPartitionsRDD

Log輸出能證明我們的分析

  1. res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13 

RDD轉換小結

小結一下整個RDD轉換過程

HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD

整個轉換過程好長啊,這一切的轉換都發生在任務提交之前。

運行過程分析

數據集操作分類

在對任務運行過程中的函數調用關系進行分析之前,我們也來探討一個偏理論的東西,作用于RDD之上的Transformantion為什么會是這個樣子?

對這個問題的解答和數學搭上關系了,從理論抽象的角度來說,任務處理都可歸結為“input->processing->output"。input和output對應于數據集dataset.

在此基礎上作一下簡單的分類

  1. one-one 一個dataset在轉換之后還是一個dataset,而且dataset的size不變,如map

  2. one-one 一個dataset在轉換之后還是一個dataset,但size發生更改,這種更改有兩種可能:擴大或縮小,如flatMap是size增大的操作,而subtract是size變小的操作

  3. many-one 多個dataset合并為一個dataset,如combine, join

  4. one-many 一個dataset分裂為多個dataset, 如groupBy

Task運行期的函數調用

task的提交過程參考本系列中的第二篇文章。本節主要講解當task在運行期間是如何一步步調用到作用于RDD上的各個operation

TaskRunner.run

Task.run

Task.runTask (Task是一個基類,有兩個子類,分別為ShuffleMapTask和ResultTask)

RDD.iterator

RDD.computeOrReadCheckpoint

RDD.compute 

或許當看到RDD.compute函數定義時,還是覺著f沒有被調用,以MappedRDD的compute定義為例

  1. override def compute(split: Partition, context: TaskContext) =                                                                                                       
  2.   firstParent[T].iterator(split, context).map(f)   

注意,這里最容易產生錯覺的地方就是map函數,這里的map不是RDD中的map,而是scala中定義的iterator的成員函數map, 請自行參考http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator

堆棧輸出

  1. 80         at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111) 
  2. 81         at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154) 
  3. 82         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149) 
  4. 83         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64) 
  5. 84         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  6. 85         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  7. 86         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 
  8. 87         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  9. 88         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  10. 89         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) 
  11. 90         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  12. 91         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  13. 92         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 
  14. 93         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  15. 94         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  16. 95         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) 
  17. 96         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  18. 97         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  19. 98         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) 
  20. 99         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) 
  21. 100         at org.apache.spark.scheduler.Task.run(Task.scala:53) 
  22. 101         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) 

ResultTask

compute的計算過程對于ShuffleMapTask比較復雜,繞的圈圈比較多,對于ResultTask就直接許多。

  1. override def runTask(context: TaskContext): U = { 
  2.     metrics = Some(context.taskMetrics) 
  3.     try { 
  4.       func(context, rdd.iterator(split, context)) 
  5.     } finally { 
  6.       context.executeOnCompleteCallbacks() 
  7.     } 
  8.   }  

 計算結果的傳遞

上面的分析知道,wordcount這個job在最終提交之后,被DAGScheduler分為兩個stage,***個Stage是shuffleMapTask,第二個Stage是ResultTask.

那么ShuffleMapTask的計算結果是如何被ResultTask取得的呢?這個過程簡述如下

  1. ShffuleMapTask將計算的狀態(注意不是具體的數據)包裝為MapStatus返回給DAGScheduler

  2. DAGScheduler將MapStatus保存到MapOutputTrackerMaster中

  3. ResultTask在執行到ShuffleRDD時會調用BlockStoreShuffleFetcher的fetch方法去獲取數據

    1. ***件事就是咨詢MapOutputTrackerMaster所要取的數據的location

    2. 根據返回的結果調用BlockManager.getMultiple獲取真正的數據

BlockStoreShuffleFetcher的fetch函數偽碼

  1. val blockManager = SparkEnv.get.blockManager 
  2.  
  3. val startTime = System.currentTimeMillis 
  4. val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) 
  5. logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format( shuffleId, reduceId, System.currentTimeMillis - startTime)) 
  6. val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) val itr = blockFetcherItr.flatMap(unpackBlock)  

注意上述代碼中的getServerStatusesgetMultiple,一個是詢問數據的位置,一個是去獲取真正的數據。

有關Shuffle的詳細解釋,請參考”詳細探究Spark的shuffle實現一文" http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/

原文鏈接:http://www.cnblogs.com/hseagle/p/3673132.html

 

責任編輯:彭凡 來源: 博客園
相關推薦

2014-07-04 10:58:47

Apache Spar

2014-07-03 15:40:09

Apache Spar

2014-07-15 10:59:58

Spark代碼跟讀

2022-08-27 08:02:09

SQL函數語法

2021-08-09 09:00:00

Kubernetes云計算架構

2021-08-30 18:09:57

鴻蒙HarmonyOS應用

2012-02-22 22:56:19

開源Apache

2011-03-21 10:49:33

LAMPApache

2014-02-14 15:43:16

ApacheSpark

2010-06-04 09:11:10

.NET并行編程

2015-03-31 18:26:43

陌陌社交

2010-09-16 09:35:17

SQL函數

2021-04-15 08:45:25

Zabbix 5.2Apache監控

2011-03-21 11:33:09

LAMPApache

2022-05-19 15:08:43

技術函數調用棧Linux

2011-06-23 14:27:48

QT QLibrary 動態庫

2010-06-08 08:41:08

.NET 4并行編程

2010-06-07 08:43:46

.NET 4并行編程

2024-03-14 08:17:33

JVMJava對象

2017-08-04 10:58:55

RDDSpark算子
點贊
收藏

51CTO技術棧公眾號

一区在线观看视频| 日本va欧美va瓶| 亚洲精品一线二线三线| 黄页网站大全在线观看| 精品美女视频在线观看免费软件| 青青青爽久久午夜综合久久午夜| 久久视频在线看| 亚洲色偷偷色噜噜狠狠99网| 国产日韩另类视频一区| 国产精品乱人伦中文| 国产伦精品一区二区三区照片 | 懂色av中文在线| 国内成人精品2018免费看| 97视频在线免费观看| 亚洲AV成人无码网站天堂久久| 视频国产精品| 欧洲一区在线观看| 久久人人爽人人爽人人av| 国产高清免费av在线| 粉嫩av一区二区三区粉嫩| 国产精品国产福利国产秒拍| 精品少妇theporn| 国产在线观看91一区二区三区| 欧美一区二区三区的| jizz欧美激情18| 国产ktv在线视频| 亚洲欧美在线高清| 三区精品视频| 日中文字幕在线| 国产成人免费xxxxxxxx| 国产精品视频26uuu| 91美女免费看| 在线成人www免费观看视频| 日韩最新中文字幕电影免费看| 日韩av手机在线播放| 国产精品视频一区二区三区| 欧美性高潮在线| 你真棒插曲来救救我在线观看| 麻豆网站在线看| 欧美国产日韩亚洲一区| 久久亚洲综合网| 亚洲免费一级片| 国产精品99久久久久久似苏梦涵| 国产精品一区二区三区毛片淫片 | 91香蕉视频在线播放| 国产永久精品大片wwwapp| 日韩精品在线观看视频| 黄色av网址在线观看| 国产精品色呦| 精品少妇一区二区三区日产乱码| 手机精品视频在线| 精品国产乱码一区二区三区| 7777精品伊人久久久大香线蕉超级流畅 | 国产亚洲精品自在久久| 亚洲AV无码精品色毛片浪潮| 国产米奇在线777精品观看| 国产精品综合久久久| 中国精品一区二区| 免费观看久久久4p| 国产精品偷伦视频免费观看国产| 国产精品久久久久久久久夜色| 美女视频一区免费观看| 欧洲亚洲女同hd| 精品一区二区无码| 免费人成在线不卡| 成人国产精品一区二区| 国产v片在线观看| 国产精品18久久久久久久久| 国产91视觉| 日韩一级片免费| 91看片淫黄大片一级| 欧美一二三区| 色哟哟免费在线观看 | 亚洲天堂男人天堂| 少妇太紧太爽又黄又硬又爽小说| 久久看人人摘| 欧美成人在线免费视频| 国产精品23p| 亚洲女同在线| 国产日韩欧美日韩| 性做久久久久久久| 久久亚洲私人国产精品va媚药| 日韩精品伦理第一区| 求av网址在线观看| 亚洲国产欧美另类丝袜| 韩国日本在线视频| 亚洲欧美在线人成swag| 精品国产乱码久久久久久夜甘婷婷| 日韩综合第一页| 精品在线观看入口| 精品国模在线视频| 日韩精品一区二区三| 久久黄色网页| 91在线精品播放| 午夜国产在线观看| 国产精品大尺度| 亚洲熟妇无码另类久久久| 性感美女一区二区在线观看| 6080国产精品一区二区| 天天躁日日躁狠狠躁av麻豆男男 | 国产亚洲精彩久久| 3d动漫精品啪啪1区2区免费| 国产免费一区二区三区最新6| 少妇精品久久久一区二区| 久热精品视频在线| 国产情侣小视频| 成人av第一页| 亚洲一二三区在线| 美女露胸视频在线观看| 91精品午夜视频| 日韩av在线看免费观看| 亚洲一区 二区 三区| 日本人成精品视频在线| 精品久久久久久亚洲综合网站 | 国产精品久久久免费看| 一本色道久久综合| 91视频99| 午夜视频成人| 日本二三区不卡| 在线看黄色的网站| 欧美1区2区视频| 国产区精品在线观看| 欧美性孕妇孕交| 亚洲国产精品久久不卡毛片| 国产亚洲视频一区| 欧美熟乱15p| 日韩美女在线播放| 亚洲色偷精品一区二区三区| 夜色激情一区二区| 91aaa精品| 水蜜桃精品av一区二区| 日韩av男人的天堂| 五月婷婷开心中文字幕| 一区二区三区 在线观看视频| 国产又黄又猛又粗| 精品freesex老太交| 欧美中在线观看| 四虎永久在线观看| 亚洲国产成人高清精品| 26uuu国产| 欧美精品1区| 91丝袜美腿美女视频网站| 在线观看二区| 欧美日韩综合视频| 九色porny自拍视频| 欧美综合国产| 日韩.欧美.亚洲| 国产精品久久久久av电视剧| 精品亚洲一区二区三区| 69国产精品视频免费观看| av在线不卡免费看| 免费看国产曰批40分钟| 亚洲精品456| 日韩av电影在线播放| 国产精品久久一区二区三区不卡| 色拍拍在线精品视频8848| 瑟瑟视频在线观看| 日本欧美一区二区三区乱码| 亚洲黄色一区二区三区| 久久精品国产精品亚洲毛片| 俺也去精品视频在线观看| 国产视频手机在线观看| 亚洲五码中文字幕| 日韩网站在线播放| 日韩成人一区二区| 在线视频福利一区| 精品国产三级| 久久乐国产精品| 天堂av网在线| 欧美在线一区二区| 天天天天天天天天操| 国产九色精品成人porny| 9色porny| 色棕色天天综合网| 亚洲aⅴ男人的天堂在线观看| 免费影视亚洲| 亚洲欧美日韩精品久久亚洲区| 黄色污污网站在线观看| 18欧美亚洲精品| 国产午夜在线一区二区三区| 久久精品男女| 一级一片免费播放| 久久亚洲道色| 国产精品成人观看视频国产奇米| 好吊日视频在线观看| 精品国产91久久久久久久妲己| 性无码专区无码| 亚洲欧洲另类国产综合| 超碰男人的天堂| 老司机精品视频导航| 日韩欧美不卡在线| 三上亚洲一区二区| 国产一区二区免费在线观看| 草莓视频成人appios| 欧美精品久久久久久久免费观看| 免费av在线电影| 欧美成人video| 国产免费a视频| 亚洲一区二区三区小说| 亚洲精品91在线| 国产99久久久国产精品| 久久婷婷国产91天堂综合精品| 欧美日韩日本国产亚洲在线| 日本一区二区视频| 国产一区二区三区亚洲| 国产日韩专区在线| 亚洲人成午夜免电影费观看| 免费91麻豆精品国产自产在线观看| 日本在线视频1区| 欧美成人精精品一区二区频| 亚洲天堂中文字幕在线| 精品国产乱码久久久久久天美| 亚洲AV成人无码精电影在线| 久久久精品日韩欧美| 在线看黄色的网站| 国产在线播放一区| 亚洲福利精品视频| 久久国产精品久久久久久电车| 欧美美女黄色网| 久久精品播放| 日韩国产美国| 亚洲视频分类| 精品高清视频| 成人三级毛片| 51成人做爰www免费看网站| 久久精品超碰| 成人av在线天堂| 成人自拍视频网| 国产99久久久欧美黑人| 最新欧美色图| 欧美一级大片在线观看| 草草在线观看| 91精品国产91久久| av中文字幕在线看| 久久久久亚洲精品成人网小说| 国产黄色在线网站| 久久久av免费| 国产不卡在线| 欧美成人在线免费视频| 欧美人与牲禽动交com | 亚洲精品按摩视频| 丰满肥臀噗嗤啊x99av| 精品久久人人做人人爰| www.五月婷婷| 精品国产一区二区三区四区四| japanese国产| 精品国产乱子伦一区| 欧美 日韩 中文字幕| 精品国产伦一区二区三区观看体验| 亚洲乱熟女一区二区| 亚洲精品99久久久久中文字幕| 好吊色视频一区二区| 精品国产sm最大网站| 天堂在线一二区| 一本色道久久88综合亚洲精品ⅰ| yjizz视频网站在线播放| 丝袜亚洲另类欧美重口| 巨大荫蒂视频欧美另类大| 久久亚洲综合国产精品99麻豆精品福利| 秋霞午夜在线观看| 欧美精品午夜视频| 91在线三级| 日韩av电影手机在线| 欧美黄色a视频| 91精品天堂| 国产精品一区二区中文字幕| 久久波多野结衣| 精品久久美女| 欧美交换配乱吟粗大25p| 在线观看亚洲| 99热手机在线| 国产麻豆精品一区二区| 人妻av一区二区| 国产日韩综合av| 黄色一级片中国| 欧美日韩激情网| 亚洲永久精品视频| 精品国产精品网麻豆系列| 你懂的免费在线观看| 色阁综合伊人av| 懂色av一区| 国产精品mp4| 99热这里只有精品首页| 欧美一区三区二区在线观看| 国产精品成人一区二区不卡| 免费人成在线观看视频播放| 久久中文字幕一区二区三区| 欧美一级小视频| 91丨porny丨户外露出| 亚欧精品视频一区二区三区| 亚洲精品v日韩精品| 无码日韩精品一区二区| 日韩欧美一区二区视频| 久色视频在线| 久久久久久美女| 精品日本视频| 精品国产一区二区三区四区vr| 日韩精品欧美| 两根大肉大捧一进一出好爽视频| 麻豆精品久久精品色综合| 7788色淫网站小说| 亚洲日穴在线视频| 亚洲精品无码久久久久| 亚洲第一国产精品| 日本最黄一级片免费在线| 91精品国产91久久久久| 看亚洲a级一级毛片| 日本一区视频在线观看免费| 黄色国产精品| 国产传媒免费观看| 国产欧美精品一区| 国产精品第9页| 日韩久久久精品| 色网站免费在线观看| 日本精品免费观看| 成人高潮a毛片免费观看网站| 日韩欧美一区二区在线观看| 韩日在线一区| 91丝袜超薄交口足| 国产精品高潮呻吟久久| 亚洲无码精品一区二区三区| 亚洲国产又黄又爽女人高潮的| caoporm免费视频在线| 国产精品一区二区3区| 久草成人在线| 91视频 -- 69xx| 成人动漫一区二区| 久草视频免费在线| 日韩一区二区视频在线观看| 日本蜜桃在线观看| 国产欧美日韩专区发布| 欧美日韩有码| 亚洲欧美另类动漫| 国产亚洲午夜高清国产拍精品| 国产精品免费av一区二区| 精品久久久久香蕉网| 污片视频在线免费观看| 51国偷自产一区二区三区的来源 | 免费观看美女裸体网站| 懂色av中文字幕一区二区三区| a在线视频播放观看免费观看| 欧美精品日韩一本| 男人资源在线播放| 成人a在线视频| 婷婷综合视频| 亚洲国产综合av| 一区二区三区四区在线免费观看| 国产毛片毛片毛片毛片| 久久av资源网站| 日韩精品一级| 又大又硬又爽免费视频| 成人av一区二区三区| 青青草成人av| 亚洲色图美腿丝袜| 成人精品动漫| www.-级毛片线天内射视视| 韩国一区二区三区| 久久免费视频6| 亚洲精品福利资源站| 中文字幕高清在线播放| 日韩成人av电影在线| 蜜桃视频一区二区三区| 久久国产精品国语对白| 日韩欧美久久久| h片在线观看视频免费免费| 久久国产精品久久精品国产| 首页国产欧美日韩丝袜| 日韩欧美视频免费观看| 日韩一区二区免费在线电影| heyzo中文字幕在线| 欧美日韩精品久久| 久久国产麻豆精品| 青青草成人免费| 亚洲激情久久久| 91国拍精品国产粉嫩亚洲一区 | 日本精品一区二区三区在线播放| 成人免费激情视频| 一区在线视频| 久久午夜福利电影| 91精品视频网| 92国产精品| 日本精品免费视频| www国产精品av| 国产剧情久久久| 51午夜精品视频| 国产精品99久久精品| 你懂得在线视频| 9191国产精品| 在线观看网站免费入口在线观看国内| 亚洲国产激情一区二区三区| 国产成人福利片| 夜夜躁日日躁狠狠久久av| 欧美理论片在线观看| 精品久久视频| 亚洲成av人片在线观看无| 欧美日韩国产一区| 女人让男人操自己视频在线观看| 在线视频不卡一区二区| 久久精品人人爽人人爽| 亚洲欧美激情另类|