精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

對(duì)Spark的那些【魔改】

大數(shù)據(jù) Spark
這兩年做 streamingpro 時(shí),不可避免的需要對(duì)Spark做大量的增強(qiáng)。就如同我之前吐槽的,Spark大量使用了new進(jìn)行對(duì)象的創(chuàng)建,導(dǎo)致里面的實(shí)現(xiàn)基本沒有辦法進(jìn)行替換。

前言

這兩年做 streamingpro 時(shí),不可避免的需要對(duì)Spark做大量的增強(qiáng)。就如同我之前吐槽的,Spark大量使用了new進(jìn)行對(duì)象的創(chuàng)建,導(dǎo)致里面的實(shí)現(xiàn)基本沒有辦法進(jìn)行替換。

對(duì)Spark的那些【魔改】

比如SparkEnv里有個(gè)屬性叫closureSerializer,是專門做任務(wù)的序列化反序列化的,當(dāng)然也負(fù)責(zé)對(duì)函數(shù)閉包的序列化反序列化。我們看看內(nèi)部是怎么實(shí)現(xiàn)的:

  1. val serializer = instantiateClassFromConf[Serializer]( 
  2.       "spark.serializer""org.apache.spark.serializer.JavaSerializer"
  3.     logDebug(s"Using serializer: ${serializer.getClass}"
  4.  
  5.     val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey) 
  6.  
  7.     val closureSerializer = new JavaSerializer(conf) 
  8.  
  9. val envInstance = new SparkEnv( 
  10. ..... 
  11.  closureSerializer, .... 

這里直接new了一個(gè)JavaSerializer,并不能做配置。如果不改源碼,你沒有任何辦法可以替換掉掉這個(gè)實(shí)現(xiàn)。同理,如果我想替換掉Executor的實(shí)現(xiàn),基本也是不可能的。

今年有兩個(gè)大地方涉及到了對(duì)Spark的【魔改】,也就是不通過改源碼,使用原有發(fā)型包,通過添加新代碼的方式來對(duì)Spark進(jìn)行增強(qiáng)。

二層RPC的支持

我們知道,在Spark里,我們只能通過Task才能touch到Executor。現(xiàn)有的API你是沒辦法直接操作到所有或者指定部分的Executor。比如,我希望所有Executor都加載一個(gè)資源文件,現(xiàn)在是沒辦法做到的。為了能夠?qū)xecutor進(jìn)行直接的操作,那就需要建立一個(gè)新的通訊層。那具體怎么做呢?

首先,在Driver端建立一個(gè)Backend,這個(gè)比較簡(jiǎn)單,

  1. class PSDriverBackend(sc: SparkContext) extends Logging { 
  2.  
  3.   val conf = sc.conf 
  4.   var psDriverRpcEndpointRef: RpcEndpointRef = null 
  5.  
  6.   def createRpcEnv = { 
  7.     val isDriver = sc.env.executorId == SparkContext.DRIVER_IDENTIFIER 
  8.     val bindAddress = sc.conf.get(DRIVER_BIND_ADDRESS) 
  9.     val advertiseAddress = sc.conf.get(DRIVER_HOST_ADDRESS) 
  10.     var port = sc.conf.getOption("spark.ps.driver.port").getOrElse("7777").toInt 
  11.     val ioEncryptionKey = if (sc.conf.get(IO_ENCRYPTION_ENABLED)) { 
  12.       Some(CryptoStreamUtils.createKey(sc.conf)) 
  13.     } else { 
  14.       None 
  15.     } 
  16.     logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}"
  17.     var createSucess = false 
  18.     var count = 0 
  19.     val env = new AtomicReference[RpcEnv]() 
  20.     while (!createSucess && count < 10) { 
  21.       try { 
  22.         env.set(RpcEnv.create("PSDriverEndpoint", bindAddress, port, sc.conf, 
  23.           sc.env.securityManager, clientMode = !isDriver)) 
  24.         createSucess = true 
  25.       } catch { 
  26.         case e: Exception => 
  27.           logInfo("fail to create rpcenv", e) 
  28.           count += 1 
  29.           port += 1 
  30.       } 
  31.     } 
  32.     if (env.get() == null) { 
  33.       logError(s"fail to create rpcenv finally with attemp ${count} "
  34.     } 
  35.     env.get() 
  36.   } 
  37.  
  38.   def start() = { 
  39.     val env = createRpcEnv 
  40.     val pSDriverBackend = new PSDriverEndpoint(sc, env) 
  41.     psDriverRpcEndpointRef = env.setupEndpoint("ps-driver-endpoint", pSDriverBackend) 
  42.   } 
  43.  

這樣,你可以理解為在Driver端啟動(dòng)了一個(gè)PRC Server。要運(yùn)行這段代碼也非常簡(jiǎn)單,直接在主程序里運(yùn)行即可:

  1. // parameter server should be enabled by default 
  2.     if (!params.containsKey("streaming.ps.enable") || params.get("streaming.ps.enable").toString.toBoolean) { 
  3.       logger.info("ps enabled..."
  4.       if (ss.sparkContext.isLocal) { 
  5.         localSchedulerBackend = new LocalPSSchedulerBackend(ss.sparkContext) 
  6.         localSchedulerBackend.start() 
  7.       } else { 
  8.         logger.info("start PSDriverBackend"
  9.         psDriverBackend = new PSDriverBackend(ss.sparkContext) 
  10.         psDriverBackend.start() 
  11.       } 
  12.     } 

這里我們需要實(shí)現(xiàn)local模式和cluster模式兩種。

Driver啟動(dòng)了一個(gè)PRC Server,那么Executor端如何啟動(dòng)呢?Executor端似乎沒有任何一個(gè)地方可以讓我啟動(dòng)一個(gè)PRC Server? 其實(shí)有的,只是非常trick,我們知道Spark是允許自定義Metrics的,并且會(huì)調(diào)用用戶實(shí)現(xiàn)的metric特定的方法,我們只要開發(fā)一個(gè)metric Sink,在里面啟動(dòng)RPC Server,騙過Spark即可。具體時(shí)下如下:

  1. class PSServiceSink(val property: Properties, val registry: MetricRegistry, 
  2.                     securityMgr: SecurityManager) extends Sink with Logging { 
  3.   def env = SparkEnv.get 
  4.  
  5.   var psDriverUrl: String = null 
  6.   var psExecutorId: String = null 
  7.   var hostname: String = null 
  8.   var cores: Int = 0 
  9.   var appId: String = null 
  10.   val psDriverPort = 7777 
  11.   var psDriverHost: String = null 
  12.   var workerUrl: Option[String] = None 
  13.   val userClassPath = new mutable.ListBuffer[URL]() 
  14.  
  15.   def parseArgs = { 
  16.     //val runtimeMxBean = ManagementFactory.getRuntimeMXBean(); 
  17.     //var argv = runtimeMxBean.getInputArguments.toList 
  18.     var argv = System.getProperty("sun.java.command").split("\\s+").toList 
  19.  
  20.    ..... 
  21.     psDriverHost = host 
  22.     psDriverUrl = "spark://ps-driver-endpoint@" + psDriverHost + ":" + psDriverPort 
  23.   } 
  24.  
  25.   parseArgs 
  26.  
  27.   def createRpcEnv = { 
  28.     val isDriver = env.executorId == SparkContext.DRIVER_IDENTIFIER 
  29.     val bindAddress = hostname 
  30.     val advertiseAddress = "" 
  31.     val port = env.conf.getOption("spark.ps.executor.port").getOrElse("0").toInt 
  32.     val ioEncryptionKey = if (env.conf.get(IO_ENCRYPTION_ENABLED)) { 
  33.       Some(CryptoStreamUtils.createKey(env.conf)) 
  34.     } else { 
  35.       None 
  36.     } 
  37.     //logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}"
  38.     RpcEnv.create("PSExecutorBackend", bindAddress, port, env.conf, 
  39.       env.securityManager, clientMode = !isDriver) 
  40.   } 
  41.  
  42.   override def start(): Unit = { 
  43.  
  44.     new Thread(new Runnable { 
  45.       override def run(): Unit = { 
  46.         logInfo(s"delay PSExecutorBackend 3s"
  47.         Thread.sleep(3000) 
  48.         logInfo(s"start PSExecutor;env:${env}"
  49.         if (env.executorId != SparkContext.DRIVER_IDENTIFIER) { 
  50.           val rpcEnv = createRpcEnv 
  51.           val pSExecutorBackend = new PSExecutorBackend(env, rpcEnv, psDriverUrl, psExecutorId, hostname, cores) 
  52.           PSExecutorBackend.executorBackend = Some(pSExecutorBackend) 
  53.           rpcEnv.setupEndpoint("ps-executor-endpoint", pSExecutorBackend) 
  54.         } 
  55.       } 
  56.     }).start() 
  57.  
  58.   } 
  59. ... 

到這里,我們就能成功啟動(dòng)RPC Server,并且連接上Driver中的PRC Server?,F(xiàn)在,你就可以在不修改Spark 源碼的情況下,盡情的寫通訊相關(guān)的代碼了,讓你可以更好的控制Executor。

比如在PSExecutorBackend 實(shí)現(xiàn)如下代碼:

  1. override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 
  2.     case Message.TensorFlowModelClean(modelPath) => { 
  3.       logInfo("clean tensorflow model"
  4.       TFModelLoader.close(modelPath) 
  5.       context.reply(true
  6.     } 
  7.     case Message.CopyModelToLocal(modelPath, destPath) => { 
  8.       logInfo(s"copying model: ${modelPath} -> ${destPath}"
  9.       HDFSOperator.copyToLocalFile(destPath, modelPath, true
  10.       context.reply(true
  11.     } 
  12.   } 

接著你就可以在Spark里寫如下的代碼調(diào)用了:

  1. val psDriverBackend = runtime.asInstanceOf[SparkRuntime].psDriverBackend psDriverBackend.psDriverRpcEndpointRef.send(Message.TensorFlowModelClean("/tmp/ok")) 

是不是很酷。

修改閉包的序列化方式

Spark的任務(wù)調(diào)度開銷非常大。對(duì)于一個(gè)復(fù)雜的任務(wù),業(yè)務(wù)邏輯代碼執(zhí)行時(shí)間大約是3-7ms,但是整個(gè)spark運(yùn)行的開銷大概是1.3s左右。

經(jīng)過詳細(xì)dig發(fā)現(xiàn),sparkContext里RDD轉(zhuǎn)化時(shí),會(huì)對(duì)函數(shù)進(jìn)行clean操作,clean操作的過程中,默認(rèn)會(huì)檢查是不是能序列化(就是序列化一遍,沒拋出異常就算可以序列化)。而序列化成本相當(dāng)高(默認(rèn)使用的JavaSerializer并且對(duì)于函數(shù)和任務(wù)序列化,是不可更改的),單次序列化耗時(shí)就達(dá)到200ms左右,在local模式下對(duì)其進(jìn)行優(yōu)化,可以減少600ms左右的請(qǐng)求時(shí)間。

當(dāng)然,需要申明的是,這個(gè)是針對(duì)local模式進(jìn)行修改的。那具體怎么做的呢?

我們先看看Spark是怎么調(diào)用序列化函數(shù)的,首先在SparkContext里,clean函數(shù)是這樣的:

  1. private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { 
  2.     ClosureCleaner.clean(f, checkSerializable) 
  3.     f 
  4.   } 

調(diào)用的是ClosureCleaner.clean方法,該方法里是這么調(diào)用學(xué)序列化的:

  1. try { 
  2.       if (SparkEnv.get != null) { 
  3.         SparkEnv.get.closureSerializer.newInstance().serialize(func) 
  4.       } 
  5.     } catch { 
  6.       case ex: Exception => throw new SparkException("Task not serializable", ex) 
  7.     } 

SparkEnv是在SparkContext初始化的時(shí)候創(chuàng)建的,該對(duì)象里面包含了closureSerializer,該對(duì)象通過new JavaSerializer創(chuàng)建。既然序列化太慢,又因?yàn)槲覀兤鋵?shí)是在Local模式下,本身是可以不需要序列化的,所以我們這里想辦法把closureSerializer的實(shí)現(xiàn)替換掉。正如我們前面吐槽,因?yàn)樵赟park代碼里寫死了,沒有暴露任何自定義的可能性,所以我們又要魔改一下了。

首先,我們新建一個(gè)SparkEnv的子類:

  1. class WowSparkEnv( 
  2.                    ....) extends SparkEnv( 

接著實(shí)現(xiàn)一個(gè)自定義的Serializer:

  1. class LocalNonOpSerializerInstance(javaD: SerializerInstance) extends SerializerInstance { 
  2.  
  3.   private def isClosure(cls: Class[_]): Boolean = { 
  4.     cls.getName.contains("$anonfun$"
  5.   } 
  6.  
  7.   override def serialize[T: ClassTag](t: T): ByteBuffer = { 
  8.     if (isClosure(t.getClass)) { 
  9.       val uuid = UUID.randomUUID().toString 
  10.       LocalNonOpSerializerInstance.maps.put(uuid, t.asInstanceOf[AnyRef]) 
  11.       ByteBuffer.wrap(uuid.getBytes()) 
  12.     } else { 
  13.       javaD.serialize(t) 
  14.     } 
  15.  
  16.   } 
  17.  
  18.   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { 
  19.     val s = StandardCharsets.UTF_8.decode(bytes).toString() 
  20.     if (LocalNonOpSerializerInstance.maps.containsKey(s)) { 
  21.       LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T] 
  22.     } else { 
  23.       bytes.flip() 
  24.       javaD.deserialize(bytes) 
  25.     } 
  26.  
  27.   } 
  28.  
  29.   override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = { 
  30.     val s = StandardCharsets.UTF_8.decode(bytes).toString() 
  31.     if (LocalNonOpSerializerInstance.maps.containsKey(s)) { 
  32.       LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T] 
  33.     } else { 
  34.       bytes.flip() 
  35.       javaD.deserialize(bytes, loader) 
  36.     } 
  37.   } 
  38.  
  39.   override def serializeStream(s: OutputStream): SerializationStream = { 
  40.     javaD.serializeStream(s) 
  41.   } 
  42.  
  43.   override def deserializeStream(s: InputStream): DeserializationStream = { 
  44.     javaD.deserializeStream(s) 
  45.   } 

接著我們需要再封裝一個(gè)LocalNonOpSerializer,

  1. class LocalNonOpSerializer(conf: SparkConf) extends Serializer with Externalizable { 
  2.   val javaS = new JavaSerializer(conf) 
  3.  
  4.   override def newInstance(): SerializerInstance = { 
  5.     new LocalNonOpSerializerInstance(javaS.newInstance()) 
  6.   } 
  7.  
  8.   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { 
  9.     javaS.writeExternal(out
  10.   } 
  11.  
  12.   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { 
  13.     javaS.readExternal(in
  14.   } 

現(xiàn)在,萬事俱備,只欠東風(fēng)了,我們?cè)趺床拍馨堰@些代碼讓Spark運(yùn)行起來。具體做法非常魔幻,實(shí)現(xiàn)一個(gè)enhance類:

  1. def enhanceSparkEnvForAPIService(session: SparkSession) = { 
  2.       val env = SparkEnv.get 
  3.    //創(chuàng)建一個(gè)新的WowSparkEnv對(duì)象,然后將里面的Serializer替換成我們自己的LocalNonOpSerializer 
  4.     val wowEnv = new WowSparkEnv( 
  5.  ..... 
  6.       new LocalNonOpSerializer(env.conf): Serializer, 
  7.  ....) 
  8.     // 將SparkEnv object里的實(shí)例替換成我們的 
  9.     //WowSparkEnv 
  10.     SparkEnv.set(wowEnv) 
  11.   //但是很多地方在SparkContext啟動(dòng)后都已經(jīng)在使用之前就已經(jīng)生成的SparkEnv,我們需要做些調(diào)整 
  12. //我們先把之前已經(jīng)啟動(dòng)的LocalSchedulerBackend里的scheduer停掉 
  13.     val localScheduler = session.sparkContext.schedulerBackend.asInstanceOf[LocalSchedulerBackend] 
  14.  
  15.     val scheduler = ReflectHelper.field(localScheduler, "scheduler"
  16.  
  17.     val totalCores = localScheduler.totalCores 
  18.     localScheduler.stop() 
  19.  
  20.   //創(chuàng)建一個(gè)新的LocalSchedulerBackend 
  21.     val wowLocalSchedulerBackend = new WowLocalSchedulerBackend(session.sparkContext.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], totalCores) 
  22.     wowLocalSchedulerBackend.start() 
  23.  //把SparkContext里的_schedulerBackend替換成我們的實(shí)現(xiàn) 
  24.     ReflectHelper.field(session.sparkContext, "_schedulerBackend", wowLocalSchedulerBackend) 
  25.   } 

完工。

其實(shí)還有很多

比如在Spark里,Python Worker默認(rèn)一分鐘沒有被使用是會(huì)被殺死的,但是在StreamingPro里,這些python worker因?yàn)槎家虞d模型,所以啟動(dòng)成本是非常高的,殺了之后再啟動(dòng)就沒辦法忍受了,通過類似的方式進(jìn)行魔改,從而使得空閑時(shí)間是可配置的。如果大家感興趣,可以翻看StreamingPro相關(guān)代碼。

責(zé)任編輯:未麗燕 來源: 簡(jiǎn)書
相關(guān)推薦

2019-11-13 15:46:56

硬件CPU主板

2017-03-02 17:40:20

Linux移動(dòng)存儲(chǔ)設(shè)備

2022-09-23 13:57:11

xxl-job任務(wù)調(diào)度中間件

2018-10-31 15:36:02

CPU優(yōu)點(diǎn)缺點(diǎn)

2017-12-25 10:40:01

Python單例字典模塊

2021-06-06 19:03:25

SQL大數(shù)據(jù)Spark

2021-04-30 07:33:58

微軟Android系統(tǒng)Surface Duo

2017-06-21 08:39:20

SparkScalaHDFS

2016-11-07 16:06:43

大數(shù)據(jù)SparkImpala

2021-07-26 08:49:27

Windows 11操作系統(tǒng)微軟

2021-12-13 17:53:19

谷歌Transformer技術(shù)

2022-04-18 11:05:36

開源github代碼庫

2023-06-13 07:06:30

RTX顯存位公版卡

2022-01-26 20:01:24

管理工具knife4j

2024-04-15 07:50:00

AI架構(gòu)

2018-03-01 08:39:34

HadoopSpark加密貨幣

2022-01-17 09:19:12

Transformer數(shù)據(jù)人工智能

2022-01-10 06:03:51

Windows 11操作系統(tǒng)微軟

2024-06-03 10:56:53

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

国产a级片网站| 激情视频一区二区| 欧美三根一起进三p| 米奇精品关键词| 在线视频中文字幕一区二区| 在线综合视频网站| 熟妇高潮一区二区高潮| 男人的j进女人的j一区| 久久久久国产一区二区三区| 亚洲av无码成人精品国产| 精品69视频一区二区三区| 亚洲综合区在线| 日本一区美女| 亚洲欧美激情另类| 毛片一区二区三区| 97高清免费视频| 亚洲女人久久久| 天堂日韩电影| 日韩欧美亚洲一区二区| 日韩在线xxx| 污污网站在线看| 久久精品亚洲一区二区三区浴池| 5566中文字幕一区二区| 99久久久无码国产精品免费蜜柚| 中文字幕亚洲综合久久五月天色无吗''| 亚洲精品电影网站| 超碰91在线播放| 亚洲精品555| 精品国产乱码久久久久久虫虫漫画| 一区二区三区四区视频在线观看| 日本一卡二卡四卡精品| 高清成人免费视频| 91精品久久久久久久久中文字幕| 一级黄色免费网站| 自由日本语亚洲人高潮| 最新的欧美黄色| 欧洲女同同性吃奶| 欧美毛片免费观看| 精品国精品国产| 99中文字幕在线| 三上悠亚国产精品一区二区三区| 亚洲一区二区三区中文字幕在线| 亚洲一区二区精品在线| 高清国产福利在线观看| 337p粉嫩大胆色噜噜噜噜亚洲| 666精品在线| 国产模特av私拍大尺度| 久久精品久久99精品久久| 国产99在线|中文| 日韩久久中文字幕| 中文在线一区| 91成人福利在线| 日本在线视频免费| 伊人久久亚洲热| 久久久久久久久国产| 久久久久久久国产视频| 欧美日韩国产免费观看| 欧美精品情趣视频| 精品99久久久久成人网站免费 | 91精品婷婷国产综合久久性色 | av成人免费| 日韩欧美一区二区三区| 男女午夜激情视频| 国产成人免费9x9x人网站视频 | 欧美亚洲另类色图| 日韩av一卡| 欧美色播在线播放| 久久黄色免费看| 国产一区二区色噜噜| 欧美日韩亚洲丝袜制服| 久久久久久久久久一区二区| 精品国产乱码久久久久久樱花| 欧美一级片在线| 日本人妻一区二区三区| 免费看成人人体视频| 日韩精品在线观| 老头老太做爰xxx视频| 欧美日韩性在线观看| 久久久91精品| 国产在线观看免费视频今夜| 亚洲人成免费| 国产精品第2页| 国产精品亚洲欧美在线播放| 粉嫩久久99精品久久久久久夜| 国产精品一区视频| 欧美白人做受xxxx视频| 国产精品国产三级国产专播品爱网| 国产成年人在线观看| a级片免费在线观看| 色诱视频网站一区| www.五月天色| 欧美大片网址| 深夜福利国产精品| 久久精品免费在线| 日韩福利电影在线| 不卡一区二区三区视频| 亚洲av毛片成人精品| 国产精品午夜在线观看| 欧美又粗又长又爽做受| 岛国一区二区| 亚洲福利小视频| 成人三级视频在线观看| 99av国产精品欲麻豆| 国产精品影片在线观看| 亚洲xxxx天美| 国产精品天干天干在线综合| 青青青在线观看视频| 成人日韩精品| 精品国产乱码久久久久久久 | 亚洲色欲色欲www| 亚洲熟妇无码另类久久久| 国产精品66| 亚洲精品日韩欧美| 欧美黄色一区二区三区| 日韩在线卡一卡二| 粉嫩av四季av绯色av第一区| 午夜免费视频在线国产| 欧美日韩免费区域视频在线观看| 天天干天天色天天干| 国产aⅴ精品一区二区三区久久| 久久国产精品网站| 在线观看网站黄| 一级黄色片在线观看| 日本免费新一区视频| 精品一卡二卡三卡四卡日本乱码| 日本a在线播放| 日韩欧美在线播放| 又黄又爽的网站| 永久91嫩草亚洲精品人人| 国产精品久久久久久久久久久久久 | 国产在线观看黄| 亚洲一区二区三区不卡国产欧美 | 风间由美性色一区二区三区| 亚洲成人网上| 免费成人动漫| 亚洲精品99999| 国产第一页第二页| 国产成人自拍在线| 一区二区三区日韩视频| 国精品产品一区| 国产亚洲精品激情久久| 国产精品男女视频| 94色蜜桃网一区二区三区| 久久综合久久网| 日本高清精品| 九九久久国产精品| 国产成人精品毛片| 亚洲美女精品一区| 91丝袜超薄交口足| 国产精品3区| 久久嫩草精品久久久精品| 亚洲福利av在线| 欧美日韩精品一区二区三区视频| 亚洲午夜精品久久久久久性色| 成人毛片18女人毛片| 99久久综合色| 午夜肉伦伦影院| 啪啪亚洲精品| 国产精品日韩一区| 免费人成在线观看播放视频 | 91精品国产91久久久久| 三级在线观看网站| 懂色av中文一区二区三区天美| 一本色道综合久久欧美日韩精品 | 色呦呦免费观看| 香蕉加勒比综合久久| xxxwww国产| 亚洲尤物影院| 日韩欧美一区二区三区四区| 成人免费毛片嘿嘿连载视频…| 中国人与牲禽动交精品| 91影院在线播放| 一区二区理论电影在线观看| 亚洲图片欧美另类| 久久久久久久欧美精品| 一区二区三区av| 亚洲2区在线| 91精品国产91久久久久久不卡 | 国产亚洲短视频| 欧美日韩中文不卡| 欧美久久久久| 久久大片网站| 久久久精品一区二区毛片免费看| 久久久精品在线观看| 丰满人妻一区二区三区无码av| 精品福利在线观看| 超薄肉色丝袜一二三| 国产一区二区三区高清播放| 国产男女免费视频| 色135综合网| 国产精品一区二区三区不卡| 午夜精品成人av| 欧美成人精品在线| 日韩专区一区二区| 3d成人h动漫网站入口| 国产精品9191| 亚洲国产高清aⅴ视频| 国产乱国产乱老熟300部视频| 久久狠狠一本精品综合网| 性欧美18一19内谢| 九九精品在线| av一区和二区| 日韩美女在线| 欧美亚洲日本黄色| 中文字幕在线观看网站| 亚洲人成在线观看网站高清| 国产丝袜视频在线观看| 一本色道久久综合亚洲精品按摩| 杨钰莹一级淫片aaaaaa播放| 久久久亚洲欧洲日产国码αv| 夜夜爽久久精品91| av在线免费观看国产| 欧美少妇性xxxx| 久久精品国产理论片免费| 国产精品高清一区二区| 日本a级片电影一区二区| 色呦呦网站在线观看| 伊人伊成久久人综合网站| 欧美一级淫片aaaaaa| 777精品伊人久久久久大香线蕉| 成人免费a视频| 亚洲一区二三区| 亚洲精品卡一卡二| 欧美国产欧美综合| 中文字幕高清视频| av福利精品导航| 蜜桃色一区二区三区| 久久se精品一区二区| 免费观看成人网| 中国女人久久久| 亚洲色成人www永久在线观看| 91视频精品| 亚洲高清视频一区| 欧美丝袜激情| 日韩av电影免费播放| 小嫩嫩12欧美| 蜜桃999成人看片在线观看| av男人一区| 国产精品免费观看高清| 日本一区二区三区视频在线看| 国产啪精品视频| 精品美女一区| 国产主播喷水一区二区| 日本精品久久| 国产精品自产拍在线观看| 国产精品亚洲一区二区三区在线观看 | 永久免费观看片现看| 国产色综合久久| 手机看片福利视频| 日本一区二区免费在线| www亚洲色图| 国产精品高潮呻吟久久| 国产又黄又粗又猛又爽的| 中国av一区二区三区| 亚洲欧美综合7777色婷婷| 欧美激情一区二区三区全黄 | 青草国产精品| 亚洲国产精品综合| 国产高清欧美| 法国空姐在线观看免费| 欧美精品首页| 欧美色图色综合| 久久久久欧美精品| 爱情岛论坛vip永久入口| 久久99精品久久久久久久久久久久| av中文字幕网址| 粉嫩aⅴ一区二区三区四区| 最新版天堂资源在线| 91网址在线看| 99精品欧美一区二区| 国产精品二三区| 国产精品国产精品88| 亚洲一本大道在线| 五月婷婷中文字幕| 欧美色倩网站大全免费| av男人天堂网| 日韩精品极品视频| 亚洲成人三级| 欧美激情第99页| 最新中文字幕在线播放| 国产精品视频色| 欧美电影院免费观看| 九九九九九九精品| 成人在线免费小视频| 少妇一晚三次一区二区三区| 亚洲欧美bt| 亚洲一级片网站| www.欧美色图| 99国产精品无码| 午夜婷婷国产麻豆精品| 中文字幕在线观看高清| 日韩视频一区二区三区| 能在线看的av| 欧美成人亚洲成人日韩成人| 小视频免费在线观看| 91精品在线观| 网红女主播少妇精品视频| 欧美 日韩 国产 在线观看| 国产精品资源| 日韩不卡的av| 欧美激情资源网| 久草精品视频在线观看| 欧美日韩精品三区| 无码精品一区二区三区在线 | 欧美v亚洲v| 国产大片精品免费永久看nba| 亚洲三级av| 亚洲图片在线观看| 亚洲免费综合| 中国xxxx性xxxx产国| 自拍偷拍亚洲综合| 好吊色在线视频| 亚洲成成品网站| 国产秀色在线www免费观看| 国产91在线播放九色快色| 7m精品国产导航在线| 亚洲国产精品一区在线观看不卡| 中文一区在线| 男人的天堂影院| 亚洲欧美日韩综合aⅴ视频| 日韩xxx视频| 亚洲男人天堂2023| av影院在线免费观看| 91精品天堂| 91tv精品福利国产在线观看| 成人午夜激情av| 久久这里只有精品首页| 精品亚洲永久免费| 日韩欧美的一区| 国产激情小视频在线| 国产精品一区二区久久国产| 国产99精品| 18禁男女爽爽爽午夜网站免费| 粉嫩一区二区三区性色av| 免费在线观看亚洲| 日韩一区二区免费电影| 国产福利视频在线| 国产自摸综合网| 久久伦理在线| 日韩欧美国产片| 中文字幕精品一区二区精品绿巨人| 欧美成人一区二区三区四区| 国产视频久久久久久久| 午夜影院在线播放| 日本a级片久久久| 天堂蜜桃91精品| 亚洲精品色午夜无码专区日韩| 色综合欧美在线视频区| 黄色的视频在线免费观看| 热久久免费视频精品| 亚洲制服一区| 成人三级视频在线播放 | 免费在线观看日韩| 亚洲精品在线免费播放| h片在线观看下载| 久久99影院| 久久成人在线| 国产黄色大片免费看| 欧美日韩精品欧美日韩精品 | 天天色综合社区| 中文字幕中文字幕一区| aaaa一级片| 性金发美女69hd大尺寸| 亚洲日本三级| 久热精品在线播放| 亚洲女同ⅹxx女同tv| 亚洲精品一区二区口爆| 性欧美亚洲xxxx乳在线观看| 亚洲毛片免费看| 婷婷免费在线观看| 亚洲欧美电影院| 午夜黄色小视频| 国产精品电影网| 影视一区二区| 成人影视免费观看| 欧美三级乱人伦电影| 18av在线播放| 快播亚洲色图| 精品一区二区久久久| 九九视频在线观看| 亚洲免费影视第一页| 日韩护士脚交太爽了| 波多野结衣av一区二区全免费观看| 91视视频在线观看入口直接观看www | 国产区高清在线| 91精品入口蜜桃| 天堂va蜜桃一区二区三区 | 欧美日韩国产精品一区| 成人p站proumb入口| 亚洲自拍偷拍福利| 亚洲制服少妇| 亚洲天堂一级片| 亚洲精品资源美女情侣酒店| 久久亚洲人体| 尤物av无码色av无码| 国产精品美女视频| 日本人妻丰满熟妇久久久久久| 国产精品日韩在线观看| 亚洲狠狠婷婷| 搜索黄色一级片|