精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Master 分配資源并在 Worker上啟動 Executor ,逐行代碼注釋版

開發(fā) 前端
這里有個假設(shè)是:Spark 集群以 Standalone 的方式來啟動的,作業(yè)也是提交到 Spark standalone 集群。

[[432016]]

本文轉(zhuǎn)載自微信公眾號「KK架構(gòu)」,作者wangkai。轉(zhuǎn)載本文請聯(lián)系KK架構(gòu)公眾號。

一、回顧一下之前的內(nèi)容

上一次閱讀到了 SparkContext 初始化,繼續(xù)往下之前,先溫故一下之前的內(nèi)容。

這里有個假設(shè)是:Spark 集群以 Standalone 的方式來啟動的,作業(yè)也是提交到 Spark standalone 集群。

首先需要啟動 Spark 集群,使用 start-all.sh 腳本依次啟動 Master (主備) 和多個 Worker。

啟動好之后,開始提交作業(yè),使用 spark-submit 命令來提交。

  • 首先在提交任務(wù)的機(jī)器上使用 java 命令啟動了一個虛擬機(jī),并且執(zhí)行了主類 SparkSubmit 的 main 方法作為入口。
  • 然后根據(jù)提交到不同的集群,來 new 不同的客戶端類,如果是 standalone 的話,就 new 了一個 ClientApp;然后把 java DriverWrapper 這個命令封裝到 RequestSubmmitDriver 消息中,把這個消息發(fā)送給 Master;
  • Master 隨機(jī)找一個滿足資源條件的 Worker 來啟動 Driver,實(shí)際上是在虛擬機(jī)里執(zhí)行 DriverWrapper 的 main 方法;
  • 然后 Worker 開始啟動 Driver,啟動的時候會執(zhí)行用戶提交的 java 包里的 main 方法,然后開始執(zhí)行 SparkContext 的初始化,依次在 Driver 中創(chuàng)建了 DAGScheduler、TaskScheduler、SchedulerBackend 三個重要的實(shí)例。并且啟動了 DriverEndpoint 和 ClientEndpoint ,用來和 Worker、Master 通信。

二、Master 處理應(yīng)用的注冊

接著上次 ClientEndpoint 啟動之后,會向 Master 發(fā)送一個 RegisterApplication 消息,Master 開始處理這個消息。

然后看到 Matster 類處理 RegisterApplication 消息的地方:

可以看到,用應(yīng)用程序的描述和 Driver 的引用創(chuàng)建了一個 Application,然后開始注冊這個 Application。

注冊 Application 很簡單,就是往 Master 的內(nèi)存中加入各種信息,重點(diǎn)來了,把 ApplicationInfo 加入到了 waitingApps 這個結(jié)構(gòu)里,然后 schedule() 方法會遍歷這個列表,為 Application 分配資源,并調(diào)度起來。

然后往 zk 中寫入了 Application 的信息,并且往 Driver 發(fā)送了一個 RegisteredApplication 應(yīng)用已經(jīng)注冊的消息。

接著開始 schedule(),這個方法上次講過,它會遍歷兩個列表,一個是遍歷 waitingDrivers 來啟動 Driver,一個是遍歷 waitingApps,來啟動 Application。

waitingDrivers 列表在客戶端請求啟動 Driver 的時候就處理過了,本次重點(diǎn)看這個方法:

  1. startExecutorsOnWorkers() 

三、Master 對資源的調(diào)度

有以下幾個步驟:

  • 遍歷 waitingApps 的所有 app;
  • 如果 app 需要的核數(shù)小于一個 Executor 可以提供的核數(shù),就不為 app 分配新的 Executor;
  • 過濾出還有可供調(diào)度的 cpu 和 memory 的 workers,并按照 cores 的大小降序排序,作為 usableWorkers;
  • 計(jì)算所有 usableWorkers 上要分配多少 CPU;
  • 然后遍歷可用的 Workers,分配資源并執(zhí)行調(diào)度,啟動 Executor。

源碼從 Master 類的 schedule() 方法的最后一行 startExecutorsOnWorkers() 開始:

這個方法主要作用是計(jì)算 worker 的 executor 數(shù)量和分配的資源并啟動 executor。

  1. /** 
  2.  * Schedule and launch executors on workers 
  3.  */ 
  4. private def startExecutorsOnWorkers(): Unit = { 
  5.     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app 
  6.     // in the queue, then the second app, etc. 
  7.  
  8.     for (app <- waitingApps) { 
  9.         val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) 
  10.         // If the cores left is less than the coresPerExecutor,the cores left will not be allocated 
  11.         if (app.coresLeft >= coresPerExecutor) { 
  12.             // 1. 剩余內(nèi)存大于單個 executor 需要的內(nèi)存 
  13.             // 2. 剩余的內(nèi)核數(shù)大于單個 executor 需要的內(nèi)核數(shù) 
  14.             // 3. 按照內(nèi)核數(shù)從大到小排序 
  15.             // Filter out workers that don't have enough resources to launch an executor 
  16.             val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) 
  17.                 .filter(canLaunchExecutor(_, app.desc)) 
  18.                 .sortBy(_.coresFree).reverse 
  19.             val appMayHang = waitingApps.length == 1 && 
  20.                 waitingApps.head.executors.isEmpty && usableWorkers.isEmpty 
  21.             if (appMayHang) { 
  22.                 logWarning(s"App ${app.id} requires more resource than any of Workers could have."
  23.             } 
  24.             // 計(jì)算每個 Worker 上可用的 cores 
  25.             val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) 
  26.              
  27.             // Now that we've decided how many cores to allocate on each worker, let's allocate them 
  28.             for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { 
  29.                 allocateWorkerResourceToExecutors( 
  30.                     app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos)) 
  31.             } 
  32.         } 
  33.     } 

(1)遍歷 waitingApps,如果 app 還需要的 cpu 核數(shù)大于每個執(zhí)行器的核數(shù),才繼續(xù)分配。

(2)過濾可用的 worker,條件一:該 worker 剩余內(nèi)存大于單個 executor 需要的內(nèi)存;條件二:該 worker 剩余 cpu 核數(shù)大于單個 executor 需要的核數(shù);然后按照可用 cpu核數(shù)從大到小排序。

(3)下面兩個方法是關(guān)鍵的方法

scheduleExecutorsOnWorkers(),用來計(jì)算每個 Worker 上可用的 cpu 核數(shù);

allocateWorkerResourceToExecutors() 用來真正在 Worker 上分配 Executor。

四、scheduleExecutorsOnWorkers 計(jì)算每個 Worker 可用的核數(shù)

這個方法很長,首先看方法注釋,大致翻譯了一下:

當(dāng)執(zhí)行器分配的 cpu 核數(shù)(spark.executor.cores)被顯示設(shè)置的時候,如果這個 worker 上有足夠的核數(shù)和內(nèi)存的話,那么每個 worker 上可以執(zhí)行多個執(zhí)行器;反之,沒有設(shè)置的時候,每個 worker 上只能啟動一個執(zhí)行器;并且,這個執(zhí)行器會使用 worker 能提供出來的盡可能多的核數(shù);

appA 和 appB 都有一個執(zhí)行器運(yùn)行在 worker1 上。但是 appA 還需要一些 cpu 核,當(dāng) appB 執(zhí)行結(jié)束,釋放了它在 worker1 上的核數(shù)時, 下一次調(diào)度的時候,appA 會新啟動一個 executor 獲得了 worker1 上所有的可用的核心,因此 appA 就在 worker1 上啟動了多個執(zhí)行器。

設(shè)置 coresPerExecutor (spark.executor.cores)很重要,考慮下面的例子:集群有4個worker,每個worker有16核;用戶請求 3 個執(zhí)行器(spark.cores.max = 48,spark.executor.cores=16)。如果不設(shè)置這個參數(shù),那么每次分配 1 個 cpu核心,每個 worker 輪流分配一個 cpu核,最終 4 個執(zhí)行器分配 12 個核心給每個 executor,4 個 worker 也同樣分配了48個核心,但是最終每個 executor 只有 12核 < 16 核,所以最終沒有執(zhí)行器被啟動。

如果看我的翻譯還是很費(fèi)勁,我就再精簡下:

  • 如果沒有設(shè)置 spark.executor.cores,那么每個 Worker 只能啟動一個 Executor,并且這個 Executor 會占用所有 Worker 能提供的 cpu核數(shù);
  • 如果顯示設(shè)置了,那么每個 Worker 可以啟動多個 Executor;

下面是源碼,每句都有挨個注釋過,中間有一個方法是判斷這個 Worker 上還能不能再分配 Executor 了。

重點(diǎn)是中間方法后面那一段,遍歷每個 Worker 分配 cpu,如果不是 Spend Out 模式,則在一個 Worker 上一直分配,直到 Worker 資源分配完畢。

  1. private def scheduleExecutorsOnWorkers( 
  2.     app: ApplicationInfo, 
  3.     usableWorkers: Array[WorkerInfo], 
  4.     spreadOutApps: Boolean): Array[Int] = { 
  5.     // 每個 executor 的核數(shù) 
  6.     val coresPerExecutor = app.desc.coresPerExecutor 
  7.     // 每個 executor 的最小核數(shù) 為1 
  8.     val minCoresPerExecutor = coresPerExecutor.getOrElse(1) 
  9.     //  每個Worker分配一個Executor? 這個參數(shù)可以控制這個行為 
  10.     val oneExecutorPerWorker = coresPerExecutor.isEmpty 
  11.     //  每個Executor的內(nèi)存 
  12.     val memoryPerExecutor = app.desc.memoryPerExecutorMB 
  13.     val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor 
  14.     // 可用 Worker 的總數(shù) 
  15.      
  16.     val numUsable = usableWorkers.length 
  17.     // 給每個Worker的cores數(shù) 
  18.     val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker 
  19.     // 給每個Worker上新的Executor數(shù) 
  20.     val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker 
  21.     // app 需要的核心數(shù) 和 所有 worker 能提供的核心總數(shù),取最小值 
  22.     var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum
  23.  
  24.     //  判斷指定的worker是否可以為這個app啟動一個executor 
  25.     /** Return whether the specified worker can launch an executor for this app. */ 
  26.     def canLaunchExecutorForApp(pos: Int): Boolean = { 
  27.         // 如果能提供的核心數(shù) 大于等 executor 需要的最小核心數(shù),則繼續(xù)分配 
  28.         val keepScheduling = coresToAssign >= minCoresPerExecutor 
  29.         // 是否有足夠的核心:當(dāng)前 worker 能提供的核數(shù) 減去 每個 worker 已分配的核心數(shù) ,大于每個 executor最小的核心數(shù) 
  30.         val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor 
  31.         // 當(dāng)前 worker 新分配的 executor 個數(shù) 
  32.         val assignedExecutorNum = assignedExecutors(pos) 
  33.  
  34.         //  如果每個worker允許多個executor,就能一直在啟動新的的executor 
  35.         //  如果在這個worker上已經(jīng)有executor,則給這個executor更多的core 
  36.         // If we allow multiple executors per worker, then we can always launch new executors. 
  37.         // Otherwise, if there is already an executor on this worker, just give it more cores. 
  38.  
  39.         // 如果一個 worker 上可以啟動多個 executor  或者 這個 worker 還沒分配 executor 
  40.         val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0 
  41.         if (launchingNewExecutor) { 
  42.             // 總共已經(jīng)分配的內(nèi)存 
  43.             val assignedMemory = assignedExecutorNum * memoryPerExecutor 
  44.             // 是否有足夠的內(nèi)存:當(dāng)前worker 的剩余內(nèi)存 減去 已分配的內(nèi)存 大于每個 executor需要的內(nèi)存 
  45.             val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor 
  46.             // 
  47.             val assignedResources = resourceReqsPerExecutor.map { 
  48.                 req => req.resourceName -> req.amount * assignedExecutorNum 
  49.             }.toMap 
  50.             val resourcesFree = usableWorkers(pos).resourcesAmountFree.map { 
  51.                 case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0)) 
  52.             } 
  53.             val enoughResources = ResourceUtils.resourcesMeetRequirements( 
  54.                 resourcesFree, resourceReqsPerExecutor) 
  55.             // 所有已分配的核數(shù)+app需要的核數(shù)  小于 app的核數(shù)限制 
  56.             val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit 
  57.             keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit 
  58.         } else { 
  59.             // We're adding cores to an existing executor, so no need 
  60.             // to check memory and executor limits 
  61.             keepScheduling && enoughCores 
  62.         } 
  63.     } 
  64.  
  65.     // 不斷的啟動executor,直到不再有Worker可以容納任何Executor,或者達(dá)到了這個Application的要求 
  66.     // Keep launching executors until no more workers can accommodate any 
  67.     // more executors, or if we have reached this application's limits 
  68.     // 過濾出可以啟動 executor 的 workers 
  69.     var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp) 
  70.  
  71.     while (freeWorkers.nonEmpty) { 
  72.         // 遍歷每個 worker 
  73.         freeWorkers.foreach { pos => 
  74.             var keepScheduling = true 
  75.             while (keepScheduling && canLaunchExecutorForApp(pos)) { 
  76.                 coresToAssign -= minCoresPerExecutor 
  77.                 assignedCores(pos) += minCoresPerExecutor 
  78.  
  79.                 //  如果我們在每個worker上啟動一個executor,每次迭代為每個executor增加一個core 
  80.                 //  否則,每次迭代都會為新的executor分配cores 
  81.                 // If we are launching one executor per worker, then every iteration assigns 1 core 
  82.                 // to the executor. Otherwise, every iteration assigns cores to a new executor. 
  83.                 if (oneExecutorPerWorker) { 
  84.                     assignedExecutors(pos) = 1 
  85.                 } else { 
  86.                     assignedExecutors(pos) += 1 
  87.                 } 
  88.  
  89.                 //  如果不使用Spreading out方法,我們會在這個worker上繼續(xù)調(diào)度executor,直到使用它所有的資源 
  90.                 //  否則,就跳轉(zhuǎn)到下一個worker 
  91.                 // Spreading out an application means spreading out its executors across as 
  92.                 // many workers as possible. If we are not spreading outthen we should keep 
  93.                 // scheduling executors on this worker until we use all of its resources. 
  94.                 // Otherwise, just move on to the next worker. 
  95.                 if (spreadOutApps) { 
  96.                     keepScheduling = false 
  97.                 } 
  98.             } 
  99.         } 
  100.         freeWorkers = freeWorkers.filter(canLaunchExecutorForApp) 
  101.     } 
  102.     assignedCores 

接著真正開始在 Worker 上啟動 Executor:

在 launchExecutor 在方法里:

  1. private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { 
  2.     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) 
  3.     worker.addExecutor(exec
  4.     worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, 
  5.         exec.application.descexec.cores, exec.memory, exec.resources)) 
  6.     exec.application.driver.send( 
  7.         ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) 

給 Worker 發(fā)送了一個 LaunchExecutor 消息。

然后給執(zhí)行器對應(yīng)的 Driver 發(fā)送了 ExecutorAdded 消息。

五、總結(jié)

本次我們講了 Master 處理應(yīng)用的注冊,重點(diǎn)是把 app 信息加入到 waitingApps 列表中,然后調(diào)用 schedule() 方法,計(jì)算每個 Worker 可用的 cpu核數(shù),并且在 Worker 上啟動執(zhí)行器。

 

責(zé)任編輯:武曉燕 來源: KK架構(gòu)
相關(guān)推薦

2011-04-19 13:32:52

2009-12-24 11:04:59

固定分配資源動態(tài)分配資源

2015-04-17 10:28:02

無線頻譜移動通信頻譜

2021-08-31 23:09:27

Spark資源分配

2012-06-05 08:59:35

Hadoop架構(gòu)服務(wù)器

2012-03-09 17:38:17

ibmdw

2014-12-26 10:58:35

托管云托管私有云公共云

2010-04-07 15:55:17

無線接入頻段

2022-12-12 08:42:06

Java對象棧內(nèi)存

2022-06-06 12:02:23

代碼注釋語言

2013-04-17 15:10:07

銳捷寬帶寬帶網(wǎng)絡(luò)

2011-04-19 13:48:55

vCloud Dire

2024-10-09 14:25:21

2013-05-21 09:08:24

服務(wù)器虛擬化網(wǎng)卡

2019-12-20 08:50:21

LinuxKsnip截圖

2021-06-22 16:40:32

鴻蒙HarmonyOS應(yīng)用

2022-04-19 07:47:13

數(shù)據(jù)中心末端資源分配

2016-03-21 18:56:54

物聯(lián)網(wǎng)IoTIT基礎(chǔ)架構(gòu)

2011-01-26 11:01:37

虛擬機(jī)負(fù)載管理資源分配

2023-10-24 07:25:10

容器資源云分級
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

久久久久九九九九| 91精品国产一区二区三区香蕉| 久久久久久久久一区二区| 99久久精品国产亚洲| 欧美日韩水蜜桃| 精品视频免费看| 久久久久久久香蕉| 涩涩视频免费看| 免费久久精品视频| 欧美第一黄色网| 国产小视频自拍| 欧洲大片精品免费永久看nba| 亚洲第一成年网| 亚洲精品久久区二区三区蜜桃臀| 亚洲国产成人一区二区| 日韩专区一卡二卡| 久久精品国产亚洲| 国产特黄级aaaaa片免| www.91精品| 色悠久久久久综合欧美99| 女同性恋一区二区| 国产在线观看免费网站| 国产成人综合在线观看| 国产成人精品视| 欧美黄色一级网站| 日本一二区不卡| 亚洲国产精品高清久久久| 爱爱爱爱免费视频| 欧美一级大黄| 亚洲va欧美va人人爽| 中文字幕av日韩精品| 久久电影中文字幕| eeuss鲁片一区二区三区在线观看 eeuss影院一区二区三区 | 中文字幕欧美一| 美女主播视频一区| 成人午夜免费福利| 精品综合免费视频观看| 日本韩国在线不卡| 亚洲精品国产精品乱码| 欧美日韩1区| 日韩专区在线观看| 丰满的亚洲女人毛茸茸| 美女久久久久| 亚洲福利在线看| 在线观看一区二区三区视频| www.久久久.com| 欧美人妖巨大在线| 网站一区二区三区| 天天综合网站| 色网站国产精品| 日本三级免费观看| 成人一级福利| 五月天久久比比资源色| 欧美视频在线观看视频| 欧美hdxxx| 一区二区三区不卡视频| 国产制服91一区二区三区制服| 日本高清视频在线播放| 国产亚洲一二三区| 少妇免费毛片久久久久久久久| 免费在线观看一级毛片| 久久久夜色精品亚洲| 久久天堂国产精品| 精品视频二区| 国产欧美1区2区3区| 日韩欧美一区二区视频在线播放| 娇妻被老王脔到高潮失禁视频| av成人资源网| 精品国产乱码久久久久久久久| 性高潮免费视频| 欧美电影在线观看完整版| 欧美精品一区二区三区四区| 捆绑裸体绳奴bdsm亚洲| 九九热hot精品视频在线播放| 亚洲激情小视频| 在线观看福利片| 人妻av一区二区三区| 草草在线视频| 一本大道久久a久久综合婷婷| 国产免费人做人爱午夜视频| 日本.亚洲电影| 欧美日韩国产片| 日日夜夜精品视频免费观看| 国产精品超碰| 亚洲天堂av高清| 日本不卡一二区| 国产精品videosex极品| 2018中文字幕一区二区三区| 人妻中文字幕一区二区三区| 韩国三级电影一区二区| 国产一区二区三区四区hd| 经典三级在线| 亚洲乱码国产乱码精品精的特点 | 国产呦小j女精品视频| 精品日韩毛片| 欧美国产日本高清在线| 69亚洲精品久久久蜜桃小说| 激情成人综合网| 国产一区二区高清视频| jizz亚洲| 亚洲成人精品一区二区| av网站在线不卡| 97一区二区国产好的精华液| 国产一区二区三区中文| 激情五月婷婷在线| 日本一区中文字幕| 国产欧美日韩在线播放| 色影视在线观看| 天天操天天干天天综合网| 日韩大片一区二区| 欧美高清视频看片在线观看| 精品国内自产拍在线观看| 国产成人在线视频观看| 国产一区二区视频在线| 免费中文日韩| heyzo高清中文字幕在线| 精品视频在线免费| 国产男女猛烈无遮挡a片漫画| 希岛爱理一区二区三区| 国产伦精品一区二区三区免费| 国内精品久久久久影院优| 在线观看中文字幕av| 97久久超碰精品国产| 超薄肉色丝袜足j调教99| 欧美va视频| 日韩精品免费看| 久草福利资源在线观看| 九一九一国产精品| 日韩欧美在线一区二区| 国产在线美女| 日韩精品一区二区三区老鸭窝| 91激情视频在线观看| 在线亚洲激情| 国产精品区免费视频| dy888亚洲精品一区二区三区| 欧洲精品一区二区| 国产福利短视频| 亚洲视频一二| 91精品久久久久久蜜桃| 免费av在线网址| 精品视频一区 二区 三区| 一区二区三区四区免费| 一本一道久久综合狠狠老精东影业| 91精品国产91久久久久青草| 麻豆网站在线观看| 69久久99精品久久久久婷婷 | 性一交一乱一色一视频麻豆| 亚洲视频你懂的| 国产不卡的av| 在线国产一区二区| 亚洲自拍小视频| 亚洲区欧洲区| 欧美xxxxxxxx| 日韩av在线播| 2019国产精品| 日本女优爱爱视频| 欧美精品色图| 国产日韩专区在线| 岛国成人毛片| 日韩美女天天操| 日产电影一区二区三区| 不卡在线视频中文字幕| 日韩中文字幕三区| 亚洲a级精品| 国产精品久久久久77777| 91精品专区| 正在播放一区二区| 欧美极品视频在线观看| 成人精品高清在线| 无码aⅴ精品一区二区三区浪潮| 亚洲精品进入| 国产精品视频一区二区高潮| 伊人久久大香线蕉综合四虎小说 | 日本久久免费| 亚洲片在线资源| 中文字幕乱伦视频| 国产精品久久久久7777按摩| 国产精欧美一区二区三区白种人| 欧美精品日本| 久久久神马电影| 全球最大av网站久久| 久久国产精品久久久久| 少妇高潮久久久| 91国偷自产一区二区使用方法| 无码人中文字幕| 国产91丝袜在线播放| 91传媒久久久| 66国产精品| 精品国产乱码久久久久久丨区2区| 三上悠亚一区二区| 久久中文精品视频| 日韩av成人| 欧美男同性恋视频网站| www.youjizz.com亚洲| 国产日韩欧美在线一区| 欧美高清精品一区二区| 销魂美女一区二区三区视频在线| 亚洲一区二区三区免费看| 国产专区精品| 欧美中文在线字幕| 18在线观看的| 一本色道久久88综合日韩精品| 精品国产伦一区二区三| 色av综合在线| 久久99久久98精品免观看软件 | 丁香花五月激情| 久久综合色8888| 99热这里只有精品2| 日韩在线一区二区三区| 日韩一级片免费视频| 日本一区二区在线看| 久久人人爽爽人人爽人人片av| 免费观看亚洲视频大全| 日本高清视频精品| 日本电影在线观看| 日韩中文字幕网址| 毛片在线能看| 欧美精品一区二区三区四区| 91精品视频免费在线观看| 日韩欧美国产成人| 免费一级黄色大片| 亚洲婷婷综合久久一本伊一区| 久久精品一区二区免费播放| 国产成人av电影在线| 手机看片一级片| 久久尤物视频| 波多野结衣之无限发射| 国产精品av久久久久久麻豆网| 亚洲图色在线| 国产剧情一区| 精品伦理一区二区三区| 日韩精品三级| 91综合免费在线| 日韩三区四区| 国产精品自产拍在线观看中文| 亚洲1234区| 日本一区二区三区在线播放 | 国产精品视频内| 成人免费无遮挡| 91精品国产乱码久久久久久久久| 麻豆蜜桃在线| 欧美激情亚洲精品| 天堂av中文在线| 九九久久久久99精品| 成年人黄视频在线观看| 久久亚洲影音av资源网| 麻豆av免费在线观看| www日韩欧美| 黄色在线播放网站| 美女av一区二区三区| v天堂福利视频在线观看| 美女久久久久久久| 伊人影院在线视频| 久久久久久一区二区三区| 色爱综合区网| 久久免费视频在线| 蜜桃av.网站在线观看| 1769国内精品视频在线播放| 成人勉费视频| 国产精品视频久久久久| 高清不卡一区| 国产高清一区视频| 欧美有码在线| 日本一区二区三区www| 成人午夜国产| 中文字幕一区二区三区在线乱码| 亚洲欧美综合久久久| 国产在线无码精品| 一级成人国产| 午夜免费一区二区| 国产一区亚洲一区| 精品伦一区二区三区| 91麻豆福利精品推荐| 天天操天天舔天天射| 亚洲嫩草精品久久| 日韩av电影网址| 在线观看不卡一区| 日本在线播放不卡| xxxx日韩| 欧美日本韩国一区二区三区| 日韩影院二区| 97碰在线视频| 久热精品视频| 毛片毛片毛片毛片毛| av一区二区三区| 国产黄色录像视频| 一区二区三区视频在线观看| 欧美一级片免费在线观看| 欧美色图免费看| www.日本在线观看| 亚洲欧美国产一区二区三区| 欧美三级黄网| 97在线免费观看视频| 久久69成人| 精品欧美一区二区精品久久| 日韩欧美综合| 僵尸世界大战2 在线播放| 全部av―极品视觉盛宴亚洲| 岛国大片在线免费观看| 国产日韩欧美精品电影三级在线| 国产精品三区在线观看| 欧美日韩综合视频| 国产欧美综合视频 | 一级黄色大片免费看| 久久久91精品国产一区二区精品| 欧美在线视频第一页| 色婷婷久久久久swag精品| а√天堂资源在线| 日韩中文字在线| 毛片在线网站| 999日本视频| 色爱综合网欧美| 国产免费黄色av| 国产一区999| 欧美a在线播放| 欧美日韩一区二区三区在线免费观看| av无码精品一区二区三区宅噜噜| 亚洲女人天堂网| 国产蜜臀一区二区打屁股调教| 国产在线视频2019最新视频| 色老板在线视频一区二区| 欧美一级中文字幕| 久久99国内精品| 天堂在线中文视频| 一本大道久久a久久精二百| 日韩一级片免费| 色综合久久久888| 99亚洲男女激情在线观看| 神马影院午夜我不卡影院| 国产精品视频久久一区| 视频免费在线观看| 亚洲一区av在线| 国产欧美日韩成人| 北条麻妃99精品青青久久| 福利精品一区| 日韩欧美精品一区二区| 久久精品九九| 成人免费看aa片| 狠狠做深爱婷婷久久综合一区| 亚洲奶汁xxxx哺乳期| 欧美老少配视频| 伊人久久噜噜噜躁狠狠躁| 成人性做爰片免费视频| 精品在线播放免费| 日韩精品123区| 6080国产精品一区二区| 超碰最新在线| 97伦理在线四区| 欧美另类亚洲| 国内精品免费视频| 午夜av一区二区三区| 天堂成人在线观看| 91高清免费视频| 最新亚洲精品| 99久久免费观看| 成人免费毛片高清视频| 亚洲日本韩国在线| 亚洲色图美腿丝袜| 国模私拍国内精品国内av| 夜夜爽99久久国产综合精品女不卡| 美女www一区二区| 国产中文av在线| 欧美一区二区三区白人| 国产www视频在线观看| 精品视频一区二区| 玖玖精品视频| 影音先锋男人看片资源| 91精品国产综合久久久久久| 七七久久电影网| 蜜桃传媒视频第一区入口在线看| 日韩黄色小视频| 国产少妇在线观看| 欧美在线三级电影| 香蕉视频在线看| 国产另类第一区| 日韩av一区二| 中文字幕在线观看成人| 亚洲成人激情视频| 成人影院网站| 社区色欧美激情 | 波多野结衣一二区| 亚洲午夜久久久久久久| 日韩专区视频| www.亚洲视频.com| 国产日韩精品一区二区三区 | 精品久久国产字幕高潮| 这里有精品可以观看| 一区二区在线观看网站| 成人av网站免费观看| 亚洲精品一区二区二区| 欧美激情18p| 精品国产中文字幕第一页| 国产亚洲色婷婷久久| 日韩欧美在线视频观看| caoporn97在线视频| 欧美另类高清视频在线| 国产精品亚洲成人| 无码人妻精品一区二区三区不卡| 精品中文字幕视频| 日韩av密桃|