精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

分布式計(jì)算引擎 Flink/Spark on k8s 的實(shí)現(xiàn)對(duì)比以及實(shí)踐

網(wǎng)絡(luò) 分布式 Spark
以 Flink 和 Spark 為代表的分布式流批計(jì)算框架的下層資源管理平臺(tái)逐漸從 Hadoop 生態(tài)的 YARN 轉(zhuǎn)向 Kubernetes 生態(tài)的 k8s 原生 scheduler 以及周邊資源調(diào)度器,比如 Volcano 和 Yunikorn 等。這篇文章簡(jiǎn)單比較一下兩種計(jì)算框架在 Native Kubernetes 的支持和實(shí)現(xiàn)上的異同,以及對(duì)于應(yīng)用到生產(chǎn)環(huán)境我們還需要做些什么。

以 Flink 和 Spark 為代表的分布式流批計(jì)算框架的下層資源管理平臺(tái)逐漸從 Hadoop 生態(tài)的 YARN 轉(zhuǎn)向 Kubernetes 生態(tài)的 k8s 原生 scheduler 以及周邊資源調(diào)度器,比如 Volcano 和 Yunikorn 等。這篇文章簡(jiǎn)單比較一下兩種計(jì)算框架在 Native Kubernetes 的支持和實(shí)現(xiàn)上的異同,以及對(duì)于應(yīng)用到生產(chǎn)環(huán)境我們還需要做些什么。

1. 什么是 Native

這里的 native 其實(shí)就是計(jì)算框架直接向 Kubernetes 申請(qǐng)資源。比如很多跑在 YARN 上面的計(jì)算框架,需要自己實(shí)現(xiàn)一個(gè) AppMaster 來(lái)想 YARN 的 ResourceManager 來(lái)申請(qǐng)資源。Native K8s 相當(dāng)于計(jì)算框架自己實(shí)現(xiàn)一個(gè)類(lèi)似 AppMaster 的角色向 k8s 去申請(qǐng)資源,當(dāng)然和 AppMaster 還是有差異的 (AppMaster 需要按 YARN 的標(biāo)準(zhǔn)進(jìn)行實(shí)現(xiàn))。

2. Spark on k8s 使用

提交作業(yè)

向 k8s 集群提交作業(yè)和往 YARN 上面提交很類(lèi)似,命令如下,主要區(qū)別包括:

--master 參數(shù)指定 k8s 集群的 ApiServer
需要通過(guò)參數(shù) spark.kubernetes.container.image 指定在 k8s 運(yùn)行作業(yè)的 image,
指定 main jar,需要 driver 進(jìn)程可訪(fǎng)問(wèn):如果 driver 運(yùn)行在 pod 中,jar 包需要包含在鏡像中;如果 driver 運(yùn)行在本地,那么 jar 需要在本地。
通過(guò) --name 或者 spark.app.name 指定 app 的名字,作業(yè)運(yùn)行起來(lái)之后的 driver 命名會(huì)以 app 名字為前綴。當(dāng)然也可以通過(guò)參數(shù) spark.kubernetes.driver.pod.name 直接指定 dirver 的名字

  1. $ ./bin/spark-submit \    --master k8s://https://: \    --deploy-mode cluster \    --name spark-pi \    --class org.apache.spark.examples.SparkPi \    --conf spark.executor.instances=5 \    --conf spark.kubernetes.container.image= \    local:///path/to/examples.jar 

提交完該命令之后,spark-submit 會(huì)創(chuàng)建一個(gè) driver pod 和一個(gè)對(duì)應(yīng)的 servcie,然后由 driver 創(chuàng)建 executor pod 并運(yùn)行作業(yè)。

deploy-mode

和在 YARN 上面使用 Spark 一樣,在 k8s 上面也支持 cluster 和 client 兩種模式:

cluster mode: driver 在 k8s 集群上面以 pod 形式運(yùn)行。
client mode: driver 運(yùn)行在提交作業(yè)的地方,然后 driver 在 k8s 集群上面創(chuàng)建 executor。為了保證 executor 能夠注冊(cè)到 driver 上面,還需要提交作業(yè)的機(jī)器可以和 k8s 集群內(nèi)部的 executor 網(wǎng)絡(luò)連通(executor 可以訪(fǎng)問(wèn)到 driver,需要注冊(cè))。
資源清理

這里的資源指的主要是作業(yè)的 driver 和 executor pod。spark 通過(guò) k8s 的 onwer reference 機(jī)制將作業(yè)的各種資源連接起來(lái),這樣當(dāng) driver pod 被刪除的時(shí)候,關(guān)聯(lián)的 executor pod 也會(huì)被連帶刪除。但是如果沒(méi)有 driver pod,也就是以 client 模式運(yùn)行作業(yè)的話(huà),如下兩種情況涉及到資源清理:

作業(yè)運(yùn)行完成,driver 進(jìn)程退出,executor pod 運(yùn)行完自動(dòng)退出
driver 進(jìn)程被殺掉,executor pod 連不上 driver 也會(huì)自行退出
可以參考:https://kubernetes.io/docs/concepts/architecture/garbage-collection/

依賴(lài)管理

前面說(shuō)到 main jar 包需要在 driver 進(jìn)程可以訪(fǎng)問(wèn)到的地方,如果是 cluster 模式就需要將 main jar 打包到 spark 鏡像中。但是在日常開(kāi)發(fā)和調(diào)試中,每次重新 build 一個(gè)鏡像的 effort 實(shí)在是太大了。spark 支持提交的時(shí)候使用本地的文件,然后使用 s3 等作為中轉(zhuǎn):先上傳上去,然后作業(yè)運(yùn)行的時(shí)候再?gòu)?s3 上面下載下來(lái)。下面是一個(gè)實(shí)例。

  1. ...--packages org.apache.hadoop:hadoop-aws:3.2.0--conf spark.kubernetes.file.upload.path=s3a:///path--conf spark.hadoop.fs.s3a.access.key=...--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem--conf spark.hadoop.fs.s3a.fast.upload=true--conf spark.hadoop.fs.s3a.secret.key=....--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmpfile:///full/path/to/app.jar 

Pod Template

k8s 的 controller (比如 Deployment,Job)創(chuàng)建 Pod 的時(shí)候根據(jù) spec 中的 pod template 來(lái)創(chuàng)建。下面是一個(gè) Job 的示例。

  1. apiVersion: batch/v1kind: Jobmetadata:  name: hellospec:  template:    # 下面的是一個(gè) pod template    spec:      containers:      - name: hello        image: busybox        command: ['sh', '-c', 'echo "Hello, Kubernetes!" && sleep 3600']      restartPolicy: OnFailure    # The pod template ends here 

由于我們通過(guò) spark-submit 提交 spark 作業(yè)的時(shí)候,最終的 k8s 資源(driver/executor pod)是由 spark 內(nèi)部邏輯構(gòu)建出來(lái)的。但是有的時(shí)候我們想要在 driver/executor pod 上做一些額外的工作,比如增加 sidecar 容器做一些日志收集的工作。這種場(chǎng)景下 PodTemplate 就是一個(gè)比較好的選擇,同時(shí) PodTemplate 也將 spark 和底層基礎(chǔ)設(shè)施(k8s)解耦開(kāi)。比如 k8s 發(fā)布新版本支持一些新的特性,那么我們只要修改我們的 PodTemplate 即可,而不涉及到 spark 的內(nèi)部改動(dòng)。

RBAC

RBAC 全稱(chēng)是 Role-based access control,是 k8s 中的一套權(quán)限控制機(jī)制。通俗來(lái)說(shuō):

RBAC 中包含了一系列的權(quán)限設(shè)置,比如 create/delete/watch/list pod 等,這些權(quán)限集合的實(shí)體叫 Role 或者 ClusterRole
同時(shí) RBAC 還包含了角色綁定關(guān)系(Role Binding),用于將 Role/ClusterRole 賦予一個(gè)或者一組用戶(hù),比如 Service Account 或者 UserAccount
為了將 Spark 作業(yè)在 k8s 集群中運(yùn)行起來(lái),我們還需要一套 RBAC 資源:

指定 namespace 下的 serviceaccount
定義了權(quán)限規(guī)則的 Role 或者 ClusterRole,我們可以使用常見(jiàn)的 ClusterRole "edit"(對(duì)幾乎所有資源具有操作權(quán)限,比如 create/delete/watch 等)
綁定關(guān)系
下面命令在 spark namespace 下為 serviceaccount spark 賦予了操作同 namespace 下其他資源的權(quán)限,那么只要 spark 的 driver pod 掛載了該 serviceaccount,它就可以創(chuàng)建 executor pod 了。

  1. $ kubectl create serviceaccount spark$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark 

下面做一個(gè)簡(jiǎn)單的演示:

通過(guò)如下命令提交作業(yè) SparkPiSleep 到 k8s 集群中。

  1. $ spark-submit --master k8s://https://: --deploy-mode cluster --class org.apache.spark.examples.SparkPiSleep --conf spark.executor.memory=2g --conf spark.driver.memory=2g --conf spark.driver.core=1 --conf spark.app.name=test12 --conf spark.kubernetes.submission.waitAppCompletion=false --conf spark.executor.core=1 --conf spark.kubernetes.container.image= --conf spark.eventLog.enabled=false --conf spark.shuffle.service.enabled=false --conf spark.executor.instances=1 --conf spark.dynamicAllocation.enabled=false --conf sparkspark.kubernetes.namespace=spark --conf sparkspark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.executor.core=1  local:///path/to/main/jar 

查看 k8s 集群中的資源

  1. $ kubectl get po -n sparkNAME                               READY   STATUS              RESTARTS   AGEspark-pi-5b88a27b576050dd-exec-1   0/1     ContainerCreating   0          2stest12-9fd3c27b576039ae-driver     1/1     Running             0          8s 

其中第一個(gè)就是 executor pod,第二個(gè)是 driver 的 pod。除此之外還創(chuàng)建了一個(gè) service,可以通過(guò)該 service 訪(fǎng)問(wèn)到 driver pod,比如 Spark UI 都可以這樣訪(fǎng)問(wèn)到。

  1. $ kubectl get svc -n sparkNAME                                 TYPE           CLUSTER-IP     EXTERNAL-IP     PORT(S)                                       AGEtest12-9fd3c27b576039ae-driver-svc   ClusterIP      None                     7078/TCP,7079/TCP,4040/TCP                    110s 

下面再看一下 service owner reference,executor pod 也是類(lèi)似的。

  1. $ kubectl get svc test12-9fd3c27b576039ae-driver-svc -n spark -oyamlapiVersion: v1kind: Servicemetadata:  creationTimestamp: "2021-08-18T03:48:50Z"  name: test12-9fd3c27b576039ae-driver-svc  namespace: spark  # service 的 ownerReference 指向了 driver pod,只要 driver pod 被刪除,該 service 也會(huì)被刪除  ownerReferences:  - apiVersion: v1    controller: true    kind: Pod    name: test12-9fd3c27b576039ae-driver    uid: 56a50a66-68b5-42a0-b2f6-9a9443665d95  resourceVersion: "9975441"  uid: 06c1349f-be52-4133-80d9-07af34419b1f 

3. Flink on k8s 使用

Flink on k8s native 的實(shí)現(xiàn)支持兩種模式:

application mode:在遠(yuǎn)程 k8s 集群中啟動(dòng)一個(gè) flink 集群(jm 和 tm),driver 運(yùn)行在 jm 中,也就是只支持 detached 模式,不支持 attached 模式。
session mode:在遠(yuǎn)程 k8s 集群?jiǎn)?dòng)一個(gè)常駐的 flink 集群(只有 jm),然后向上面提交作業(yè),根據(jù)實(shí)際情況決定啟動(dòng)多少個(gè) tm。
在生產(chǎn)上面使用一般不太建議使用 session mode,所以下面主要討論的是 application mode。

Flink 的 native k8s 模式是不需要指定 tm 個(gè)數(shù)的,jm 會(huì)根據(jù)用戶(hù)的代碼計(jì)算需要多少 tm。

提交作業(yè)

下面是一個(gè)簡(jiǎn)單的提交命令,需要包含:

參數(shù) run-application 指定是 application 模式
參數(shù) --target 指定運(yùn)行在 k8s 上
參數(shù) kubernetes.container.image 指定作業(yè)運(yùn)行使用的 flink 鏡像
最后需要指定 main jar,路徑是鏡像中的路徑

  1. $ ./bin/flink run-application \    --target kubernetes-application \    -Dkubernetes.cluster-id=my-first-application-cluster \    -Dkubernetes.container.image=custom-image-name \    local:///opt/flink/usrlib/my-flink-job.jar 

資源清理

Flink 的 native 模式會(huì)先創(chuàng)建一個(gè) JobManager 的 deployment,并將其托管給 k8s。同一個(gè)作業(yè)所有的相關(guān)資源的 owner reference 都指向該 Deployment,也就是說(shuō)刪除了該 deployment,所有相關(guān)的資源都會(huì)被清理掉。下面根據(jù)作業(yè)的運(yùn)行情況討論一下資源如何清理。

作業(yè)運(yùn)行到終態(tài)(SUCCESS,F(xiàn)AILED,CANCELED 等)之后,F(xiàn)link 會(huì)清理掉所有作業(yè)
JobManager 進(jìn)程啟動(dòng)失敗(pod 中的 jm 容器啟動(dòng)失敗),由于控制器是 Deployment,所以會(huì)一直重復(fù)拉起
運(yùn)行過(guò)程中,如果 JobManager 的 pod 被刪除,Deployment 會(huì)重新拉起
運(yùn)行過(guò)程中,如果 JobManager 的 Deployment 被刪除,那么關(guān)聯(lián)的所有 k8s 資源都會(huì)被刪除

Pod Template

Flink native 模式也支持 Pod Template,類(lèi)似 Spark。

RBAC

類(lèi)似 Spark。

依賴(lài)文件管理

Flink 暫時(shí)只支持 main jar 以及依賴(lài)文件在鏡像中。也就是說(shuō)用戶(hù)要提交作業(yè)需要自己定制化鏡像,體驗(yàn)不是很好。一種 workaroud 的方式是結(jié)合 PodTemplate:

如果依賴(lài)是本地文件,需要 upload 到一個(gè) remote 存儲(chǔ)做中轉(zhuǎn),比如各大云廠商的對(duì)象存儲(chǔ)。
如果依賴(lài)是遠(yuǎn)端文件,不需要 upload。
運(yùn)行時(shí)在 template 中使用 initContainer 將用戶(hù)的 jar 以及依賴(lài)文件下載到 Flink 容器中,并加到 classpath 下運(yùn)行。
Flink 的作業(yè) demo 就不在演示了。

4. Spark on Kubernetes 實(shí)現(xiàn)

Spark on Kubernetes 的實(shí)現(xiàn)比較簡(jiǎn)單:

Spark Client 創(chuàng)建一個(gè) k8s pod 運(yùn)行 driver
driver 創(chuàng)建 executor pod,然后開(kāi)始運(yùn)行作業(yè)
作業(yè)運(yùn)行結(jié)束之后 driver pod 進(jìn)入到 Completed 狀態(tài),executor pod 會(huì)被清理掉。作業(yè)結(jié)束之后通過(guò) driver pod 我們還是可以查看 driver pod 的。

代碼實(shí)現(xiàn)

Spark 的 native k8s 實(shí)現(xiàn)代碼在 resource-managers/kubernetes module 中。我們可以從 SparkSubmit 的代碼開(kāi)始分析。我們主要看一下 deploy-mode 為 cluster 模式的代碼邏輯。

  1. // Set the cluster manager    val clusterManager: Int = args.master match {      case "yarn" => YARN      case m if m.startsWith("spark") => STANDALONE      case m if m.startsWith("mesos") => MESOS      case m if m.startsWith("k8s") => KUBERNETES      case m if m.startsWith("local") => LOCAL      case _ =>        error("Master must either be yarn or start with spark, mesos, k8s, or local")        -1    } 

首先根據(jù) spark.master 配置中 scheme 來(lái)判斷是不是 on k8s。我們上面也看到這個(gè)配置的形式為 --master k8s://https://: 。如果是 on k8s 的 cluster 模式,則去加載 Class org.apache.spark.deploy.k8s.submit.KubernetesClientApplication,并運(yùn)行其中的 start 方法。childArgs 方法的核心邏輯簡(jiǎn)單來(lái)說(shuō)就是根據(jù) spark-submit 提交的參數(shù)構(gòu)造出 driver pod 提交到 k8s 運(yùn)行。

  1. private[spark] class KubernetesClientApplication extends SparkApplication {  override def start(args: Array[String], conf: SparkConf): Unit = {    val parsedArguments = ClientArguments.fromCommandLineArgs(args)    run(parsedArguments, conf)  }  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {    // For constructing the app ID, we can't use the Spark application name, as the app ID is going    // to be added as a label to group resources belonging to the same application. Label values are    // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate    // a unique app ID (captured by spark.app.id) in the format below.    val kubernetesAppId = KubernetesConf.getKubernetesAppId()    val kubernetesConf = KubernetesConf.createDriverConf(      sparkConf,      kubernetesAppId,      clientArguments.mainAppResource,      clientArguments.mainClass,      clientArguments.driverArgs,      clientArguments.proxyUser)    // The master URL has been checked for validity already in SparkSubmit.    // We just need to get rid of the "k8s://" prefix here.    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))    val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)    Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(      master,      Some(kubernetesConf.namespace),      KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,      SparkKubernetesClientFactory.ClientType.Submission,      sparkConf,      None,      None)) { kubernetesClient =>        val client = new Client(          kubernetesConf,          new KubernetesDriverBuilder(),          kubernetesClient,          watcher)        client.run()    }  }} 

上面的代碼的核心就是最后創(chuàng)建 Client 并運(yùn)行。這個(gè) Client 是 Spark 封裝出來(lái)的 Client,內(nèi)置了 k8s client。

  1. private[spark] class Client(    conf: KubernetesDriverConf,    builder: KubernetesDriverBuilder,    kubernetesClient: KubernetesClient,    watcher: LoggingPodStatusWatcher) extends Logging {  def run(): Unit = {    // 構(gòu)造 Driver 的 Pod    val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)    val configMapName = KubernetesClientUtils.configMapNameDriver    val confFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(configMapName,      conf.sparkConf, resolvedDriverSpec.systemProperties)    val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap)    // 修改 Pod 的 container spec:增加 SPARK_CONF_DIR    val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)      .addNewEnv()        .withName(ENV_SPARK_CONF_DIR)        .withValue(SPARK_CONF_DIR_INTERNAL)        .endEnv()      .addNewVolumeMount()        .withName(SPARK_CONF_VOLUME_DRIVER)        .withMountPath(SPARK_CONF_DIR_INTERNAL)        .endVolumeMount()      .build()    val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)      .editSpec()        .addToContainers(resolvedDriverContainer)        .addNewVolume()          .withName(SPARK_CONF_VOLUME_DRIVER)          .withNewConfigMap()            .withItems(KubernetesClientUtils.buildKeyToPathObjects(confFilesMap).asJava)            .withName(configMapName)            .endConfigMap()          .endVolume()        .endSpec()      .build()    val driverPodName = resolvedDriverPod.getMetadata.getName    var watch: Watch = null    var createdDriverPod: Pod = null    try {      // 通過(guò) k8s client 創(chuàng)建 Driver Pod      createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)    } catch {      case NonFatal(e) =>        logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")        throw e    }    try {      // 創(chuàng)建其他資源,修改 owner reference 等      val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)      addOwnerReference(createdDriverPod, otherKubernetesResources)      kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()    } catch {      case NonFatal(e) =>        kubernetesClient.pods().delete(createdDriverPod)        throw e    }    val sId = Seq(conf.namespace, driverPodName).mkString(":")    // watch pod    breakable {      while (true) {        val podWithName = kubernetesClient          .pods()          .withName(driverPodName)        // Reset resource to old before we start the watch, this is important for race conditions        watcher.reset()        watch = podWithName.watch(watcher)        // Send the latest pod state we know to the watcher to make sure we didn't miss anything        watcher.eventReceived(Action.MODIFIED, podWithName.get())        // Break the while loop if the pod is completed or we don't want to wait        // 根據(jù)參數(shù) "spark.kubernetes.submission.waitAppCompletion" 判斷是否需要退出        if(watcher.watchOrStop(sId)) {          watch.close()          break        }      }    }  } 

下面再簡(jiǎn)單介紹一下 Driver 如何管理 Executor 的流程。當(dāng) Spark Driver 運(yùn)行 main 函數(shù)時(shí),會(huì)創(chuàng)建一個(gè) SparkSession,SparkSession 中包含了 SparkContext,SparkContext 需要?jiǎng)?chuàng)建一個(gè) SchedulerBackend 會(huì)管理 Executor 的生命周期。對(duì)應(yīng)到 k8s 上的 SchedulerBackend 其實(shí)就是 KubernetesClusterSchedulerBackend,下面主要看一下這個(gè) backend 是如何創(chuàng)建出來(lái)的。大膽猜想一下,大概率也是根據(jù) spark.master 的 url 的 scheme "k8s" 創(chuàng)建的。

下面是 SparkContext 創(chuàng)建 SchedulerBackend 的核心代碼邏輯。

  1. private def createTaskScheduler(...) = {  case masterUrl =>    // 創(chuàng)建出 KubernetesClusterManager    val cm = getClusterManager(masterUrl) match {      case Some(clusterMgr) => clusterMgr      case None => throw new SparkException("Could not parse Master URL: '" + master + "'")    }    try {      val scheduler = cm.createTaskScheduler(sc, masterUrl)      // 上面創(chuàng)建出來(lái)的 KubernetesClusterManager 這里會(huì)創(chuàng)建出 KubernetesClusterSchedulerBackend      val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)      cm.initialize(scheduler, backend)      (backend, scheduler)    } catch {      case se: SparkException => throw se      case NonFatal(e) =>        throw new SparkException("External scheduler cannot be instantiated", e)    }}// 方法 getClsuterManager 會(huì)通過(guò) ServiceLoader 加載所有實(shí)現(xiàn) ExternalClusterManager 的 ClusterManager (KubernetesClusterManager 和 YarnClusterManager),然后通過(guò) master url 進(jìn)行 filter,選出 KubernetesClusterManagerprivate def getClusterManager(url: String): Option[ExternalClusterManager] = {  val loader = Utils.getContextOrSparkClassLoader  val serviceLoaders =    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))  if (serviceLoaders.size > 1) {    throw new SparkException(      s"Multiple external cluster managers registered for the url $url: $serviceLoaders")  }  serviceLoaders.headOption} 

后面就是 KubernetesClusterSchedulerBackend 管理 Executor 的邏輯了。

可以簡(jiǎn)單看一下創(chuàng)建 Executor 的代碼邏輯。

  1. private def requestNewExecutors(      expected: Int,      running: Int,      applicationId: String,      resourceProfileId: Int,      pvcsInUse: Seq[String]): Unit = {    val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)    logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +      s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")    // Check reusable PVCs for this executor allocation batch    val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)    for ( _ <- 0 until numExecutorsToAllocate) {      val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()      val executorConf = KubernetesConf.createExecutorConf(        conf,        newExecutorId.toString,        applicationId,        driverPod,        resourceProfileId)      // 構(gòu)造 Executor 的 Pod Spec      val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,        kubernetesClient, rpIdToResourceProfile(resourceProfileId))      val executorPod = resolvedExecutorSpec.pod      val podWithAttachedContainer = new PodBuilder(executorPod.pod)        .editOrNewSpec()        .addToContainers(executorPod.container)        .endSpec()        .build()      val resources = replacePVCsIfNeeded(        podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)      // 創(chuàng)建 Executor Pod      val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)      try {        // 增加 owner reference        addOwnerReference(createdExecutorPod, resources)        resources          .filter(_.getKind == "PersistentVolumeClaim")          .foreach { resource =>            if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {              addOwnerReference(driverPod.get, Seq(resource))            }            val pvc = resource.asInstanceOf[PersistentVolumeClaim]            logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +              s"StorageClass ${pvc.getSpec.getStorageClassName}")            kubernetesClient.persistentVolumeClaims().create(pvc)          }        newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())        logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")      } catch {        case NonFatal(e) =>          kubernetesClient.pods().delete(createdExecutorPod)          throw e      }    }  } 

5. Flink on Kubernetes 實(shí)現(xiàn)

Flink 的 Native K8s 實(shí)現(xiàn):

Flink Client 創(chuàng)建 JobManager 的 Deployment,然后將 Deployment 托管給 k8s
k8s 的 Deployment Controller 創(chuàng)建 JobManager 的 Pod
JobManager 內(nèi)的 ResourceManager 負(fù)責(zé)先 Kubernetes Scheduler 請(qǐng)求資源并創(chuàng)建 TaskManager 等相關(guān)資源并創(chuàng)建相關(guān)的 TaskManager Pod 并開(kāi)始運(yùn)行作業(yè)
當(dāng)作業(yè)運(yùn)行到終態(tài)之后所有相關(guān)的 k8s 資源都被清理掉
代碼(基于分支 release-1.13)實(shí)現(xiàn)主要如下:

CliFrontend 作為 Flink Client 的入口根據(jù)命令行參數(shù) run-application 判斷通過(guò)方法 runApplication 去創(chuàng)建 ApplicationCluster
KubernetesClusterDescriptor 通過(guò)方法 deployApplicationCluster 創(chuàng)建 JobManager 相關(guān)的 Deployment 和一些必要的資源
JobManager 的實(shí)現(xiàn)類(lèi) JobMaster 通過(guò) ResourceManager 調(diào)用類(lèi) KubernetesResourceManagerDriver 中的方法 requestResource 創(chuàng)建 TaskManager 等資源
其中 KubernetesClusterDescriptor 實(shí)現(xiàn)自 interface ClusterDescriptor ,用來(lái)描述對(duì) Flink 集群的操作。根據(jù)底層的資源使用不同, ClusterDescriptor 有不同的實(shí)現(xiàn),包括 KubernetesClusterDescriptor、YarnClusterDescriptor、StandaloneClusterDescriptor。

  1. public interface ClusterDescriptor<T> extends AutoCloseable {    /* Returns a String containing details about the cluster (NodeManagers, available memory, ...). */    String getClusterDescription();    /* 查詢(xún)已存在的 Flink 集群. */    ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;    /** 創(chuàng)建 Flink Session 集群 */    ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)            throws ClusterDeploymentException;    /** 創(chuàng)建 Flink Application 集群 **/    ClusterClientProvider<T> deployApplicationCluster(            final ClusterSpecification clusterSpecification,            final ApplicationConfiguration applicationConfiguration)            throws ClusterDeploymentException;    /** 創(chuàng)建 Per-job 集群 **/    ClusterClientProvider<T> deployJobCluster(            final ClusterSpecification clusterSpecification,            final JobGraph jobGraph,            final boolean detached)            throws ClusterDeploymentException;    /** 刪除集群 **/    void killCluster(T clusterId) throws FlinkException;    @Override    void close();} 

下面簡(jiǎn)單看一下 KubernetesClusterDescriptor 的核心邏輯:創(chuàng)建 Application 集群。

  1. public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {    private final Configuration flinkConfig;      // 內(nèi)置 k8s client    private final FlinkKubeClient client;    private final String clusterId;      @Override    public ClusterClientProvider<String> deployApplicationCluster(            final ClusterSpecification clusterSpecification,            final ApplicationConfiguration applicationConfiguration)            throws ClusterDeploymentException {        // 查詢(xún) flink 集群在 k8s 中是否存在        if (client.getRestService(clusterId).isPresent()) {            throw new ClusterDeploymentException(                    "The Flink cluster " + clusterId + " already exists.");        }        final KubernetesDeploymentTarget deploymentTarget =                KubernetesDeploymentTarget.fromConfig(flinkConfig);        if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) {            throw new ClusterDeploymentException(                    "Couldn't deploy Kubernetes Application Cluster."                            + " Expected deployment.target="                            + KubernetesDeploymentTarget.APPLICATION.getName()                            + " but actual one was \""                            + deploymentTarget                            + "\"");        }                // 設(shè)置 application 參數(shù):$internal.application.program-args 和 $internal.application.main        applicationConfiguration.applyToConfiguration(flinkConfig);              // 創(chuàng)建集群        final ClusterClientProvider<String> clusterClientProvider =                deployClusterInternal(                        KubernetesApplicationClusterEntrypoint.class.getName(),                        clusterSpecification,                        false);        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {            LOG.info(                    "Create flink application cluster {} successfully, JobManager Web Interface: {}",                    clusterId,                    clusterClient.getWebInterfaceURL());        }        return clusterClientProvider;    }      // 創(chuàng)建集群邏輯    private ClusterClientProvider<String> deployClusterInternal(            String entryPoint, ClusterSpecification clusterSpecification, boolean detached)            throws ClusterDeploymentException {        final ClusterEntrypoint.ExecutionMode executionMode =                detached                        ? ClusterEntrypoint.ExecutionMode.DETACHED                        : ClusterEntrypoint.ExecutionMode.NORMAL;        flinkConfig.setString(                ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());        flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);        // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.        // 將端口指定為固定值,方便 k8s 的資源構(gòu)建。因?yàn)?nbsp;pod 的隔離性,所以沒(méi)有端口沖突        KubernetesUtils.checkAndUpdatePortConfigOption(                flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);        KubernetesUtils.checkAndUpdatePortConfigOption(                flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);        KubernetesUtils.checkAndUpdatePortConfigOption(                flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);        // HA 配置        if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {            flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);            KubernetesUtils.checkAndUpdatePortConfigOption(                    flinkConfig,                    HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,                    flinkConfig.get(JobManagerOptions.PORT));        }        try {            final KubernetesJobManagerParameters kubernetesJobManagerParameters =                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);            // 補(bǔ)充 PodTemplate 邏輯            final FlinkPod podTemplate =                    kubernetesJobManagerParameters                            .getPodTemplateFilePath()                            .map(                                    file ->                                            KubernetesUtils.loadPodFromTemplateFile(                                                    client, file, Constants.MAIN_CONTAINER_NAME))                            .orElse(new FlinkPod.Builder().build());            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(                            podTemplate, kubernetesJobManagerParameters);                      // 核心邏輯:在 k8s 中創(chuàng)建包括 JobManager Deployment 在內(nèi) k8s 資源,比如 Service 和 ConfigMap            client.createJobManagerComponent(kubernetesJobManagerSpec);            return createClusterClientProvider(clusterId);        } catch (Exception e) {            //...        }    }} 

上面代碼中需要說(shuō)的在構(gòu)建 JobManager 的時(shí)候補(bǔ)充 PodTemplate。簡(jiǎn)單來(lái)說(shuō) PodTemplate 就是一個(gè) Pod 文件。

第三步的 TaskManager 創(chuàng)建就不再贅述了。

7. 生態(tài)

這里生態(tài)這個(gè)詞可能也不太合適,這里主要指的的如果要在生產(chǎn)上面使用該功能還有哪些可以做的。下面主要討論在生產(chǎn)環(huán)境上面用來(lái)做 trouble-shooting 的兩個(gè)功能:日志和監(jiān)控。

日志

日志收集對(duì)于線(xiàn)上系統(tǒng)是非常重要的一環(huán),毫不夸張地說(shuō),80% 的故障都可以通過(guò)日志查到原因。但是前面也說(shuō)過(guò),F(xiàn)link 作業(yè)在作業(yè)運(yùn)行到終態(tài)之后會(huì)清理掉所有資源,Spark 作業(yè)運(yùn)行完只會(huì)保留 Driver Pod 的日志,那么我們?nèi)绾问占酵暾淖鳂I(yè)日志呢?

有幾種方案可供選擇:

DaemonSet。每個(gè) k8s 的 node 上面以 DaemonSet 形式部署日志收集 agent,對(duì) node 上面運(yùn)行的所有容器日志進(jìn)行統(tǒng)一收集,并存儲(chǔ)到類(lèi)似 ElasticSearch 的統(tǒng)一日志搜索平臺(tái)。
SideCar。使用 Flink/Spark 提供的 PodTemplate 功能在主容器側(cè)配置一個(gè) SideCar 容器用來(lái)進(jìn)行日志收集,最后存儲(chǔ)到統(tǒng)一的日志服務(wù)里面。
這兩種方式都有一個(gè)前提是有其他的日志服務(wù)提供存儲(chǔ)、甚至搜索的功能,比如 ELK,或者各大云廠商的日志服務(wù)。

除此之外還有一種簡(jiǎn)易的方式可以考慮:利用 log4j 的擴(kuò)展機(jī)制,自定義 log appender,在 appender 中定制化 append 邏輯,將日志直接收集并存儲(chǔ)到 remote storage,比如 hdfs,對(duì)象存儲(chǔ)等。這種方案需要將自定義的 log appender 的 jar 包放到運(yùn)行作業(yè)的 ClassPath 下,而且這種方式有可能會(huì)影響作業(yè)主流程的運(yùn)行效率,對(duì)性能比較敏感的作業(yè)并不太建議使用這種方式。

監(jiān)控

目前 Prometheus 已經(jīng)成為 k8s 生態(tài)的監(jiān)控事實(shí)標(biāo)準(zhǔn),下面我們的討論也是討論如何將 Flink/Spark 的作業(yè)的指標(biāo)對(duì)接到 Prometheus。下面先看一下 Prometheus 的架構(gòu)。

其中的核心在于 Prometheus Servier 收集指標(biāo)的方式是 pull 還是 push:

對(duì)于常駐的進(jìn)程,比如在線(xiàn)服務(wù),一般由 Prometheus Server 主動(dòng)去進(jìn)程暴露出來(lái)的 api pull 指標(biāo)。
對(duì)于會(huì)結(jié)束的進(jìn)程指標(biāo)收集,比如 batch 作業(yè),一般使用進(jìn)程主動(dòng) push 的方式。詳細(xì)流程是進(jìn)程將指標(biāo) push 到常駐的 PushGateway,然后 Prometheus Server 去 PushGateway pull 指標(biāo)。
上面兩種使用方式也是 Prometheus 官方建議的使用方式,但是看完描述不難發(fā)現(xiàn)其實(shí)第一種場(chǎng)景也可以使用第二種處理方式。只不過(guò)第二種方式由于 PushGateway 是常駐的,對(duì)其穩(wěn)定性要求會(huì)比較高。

Flink

Flink 同時(shí)提供了 PrometheusReporter (將指標(biāo)通過(guò) api 暴露,由 Prometheus Server 來(lái)主動(dòng) pull 數(shù)據(jù)) 和 PrometheusPushGatewayReporter (將指標(biāo)主動(dòng) push 給 PushGateway,Prometheus Server 不需要感知 Flink 作業(yè))。

這兩種方式中 PrometheusPushGatewayReporter 會(huì)更簡(jiǎn)單一點(diǎn),但是 PushGateway 可能會(huì)成為瓶頸。如果使用 PrometheusReporter 的方式,需要引入服務(wù)發(fā)現(xiàn)機(jī)制幫助 Prometheus Server 自動(dòng)發(fā)現(xiàn)運(yùn)行的 Flink 作業(yè)的 Endpoint。Prometheus 目前支持的主流的服務(wù)發(fā)現(xiàn)機(jī)制主要有:

基于 Consul。Consul 是基于 etcd 的一套完整的服務(wù)注冊(cè)與發(fā)現(xiàn)解決方案,要使用這種方式,我們需要 Flink 對(duì)接 Consul。比如我們?cè)谔峤蛔鳂I(yè)的時(shí)候,將作業(yè)對(duì)應(yīng)的 Service 進(jìn)行捕獲并寫(xiě)入 Consul。
基于文件。文件也就是 Prometheus 的配置文件,里面配置需要拉取 target 的 endpoint。文件這種方式本來(lái)是比較雞肋的,因?yàn)樗枰?Prometheus Server 和 Flink 作業(yè)同時(shí)都可以訪(fǎng)問(wèn),但是需要文件是 local 的。但是在 k8s 環(huán)境中,基于文件反而變的比較簡(jiǎn)單,我們可以將 ConfigMap 掛載到 Prometheus Server 的 Pod 上面,F(xiàn)link 作業(yè)修改 ConfigMap 就可以了。
基于 Kubernetes 的服務(wù)發(fā)現(xiàn)機(jī)制。Kubernetes 的服務(wù)發(fā)現(xiàn)機(jī)制簡(jiǎn)單來(lái)說(shuō)就是 label select。可以參考

  1. https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config 

關(guān)于 Prometheus 支持的更多服務(wù)發(fā)現(xiàn)機(jī)制,可以參考:https://prometheus.io/docs/prometheus/latest/configuration/configuration/ ,簡(jiǎn)單羅列包括:

azure
consul
digitalocean
docker
dockerswarm
dns
ec2
eureka
file
gce
hetzner
http
kubernetes
...
Spark

以批計(jì)算為代表的 Spark 使用 PushGateway 的方式來(lái)對(duì)接 Prometheus 是比較好的方式,但是 Spark 官方并沒(méi)有提供對(duì) PushGateway 的支持,只支持了 Prometheus 的 Exporter,需要 Prometheus Server 主動(dòng)去 pull 數(shù)據(jù)。

這里推薦使用基于 Kubernetes 的服務(wù)發(fā)現(xiàn)機(jī)制。

需要注意的是 Prometheus Server 拉取指標(biāo)是按固定時(shí)間間隔進(jìn)行拉取的,對(duì)于持續(xù)時(shí)間比較短的批作業(yè),有可能存在還沒(méi)有拉取指標(biāo),作業(yè)就結(jié)束的情況。

8. 缺陷

雖然 Spark 和 Flink 都實(shí)現(xiàn)了 native k8s 的模式,具體實(shí)現(xiàn)略有差異。但是在實(shí)際使用上發(fā)現(xiàn)兩者的實(shí)現(xiàn)在某些場(chǎng)景下還是略有缺陷的。

Spark

pod 不具有容錯(cuò)性 spark-submit 會(huì)先構(gòu)建一個(gè) k8s 的 driver pod,然后由 driver pod 啟動(dòng) executor 的 pod。但是在 k8s 環(huán)境中并不太建議直接構(gòu)建 pod 資源,因?yàn)?pod 不具有容錯(cuò)性,pod 所在節(jié)點(diǎn)掛了之后 pod 就掛了。熟悉 k8s scheduler 的同學(xué)應(yīng)該知道 pod 有一個(gè)字段叫 podName,scheduler 的核心是為 pod 填充這個(gè)字段,也就是為 pod 選擇一個(gè)合適的 node。一旦調(diào)度完成之后 pod 的該字段就固定下來(lái)了。這也是 pod 不具有 node 容錯(cuò)的原因。

Flink

Deployment 語(yǔ)義。 Deployment 可以認(rèn)為是 ReplicaSet 的增強(qiáng)版,而 ReplicaSet 的官方定義如下。

A ReplicaSet's purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.

簡(jiǎn)單來(lái)說(shuō),ReplicaSet 的目的是保證幾個(gè)相同的 Pod 副本可以不間斷的運(yùn)行,說(shuō)是為了線(xiàn)上服務(wù)量身定制的也不為過(guò)(線(xiàn)上服務(wù)最好是無(wú)狀態(tài)且支持原地重啟,比如 WebService)。但是盡管 Flink 以流式作業(yè)為主,但是我們并不能簡(jiǎn)單地將流式作業(yè)等同于無(wú)狀態(tài)的 WebService。比如 Flink 作業(yè)的 Main Jar 如果寫(xiě)的有問(wèn)題,會(huì)導(dǎo)致 JobManager 的 Pod 一直啟動(dòng)失敗,但是由于是 Deployment 語(yǔ)義的問(wèn)題會(huì)不斷被重啟。這個(gè)可能是 ByDesign 的,但是感覺(jué)并不太好。

Batch 作業(yè)處理。 由于 Flink 作業(yè)運(yùn)行完所有資源包括 Deployment 都會(huì)被清理掉,拿不到最終的作業(yè)狀態(tài),不知道成功有否(流作業(yè)的話(huà)停止就可以認(rèn)為是失敗了)。對(duì)于這個(gè)問(wèn)題可以利用 Flink 本身的歸檔功能,將結(jié)果歸檔到外部的文件系統(tǒng)(兼容 s3 協(xié)議,比如阿里云對(duì)象存儲(chǔ) oss)中。涉及到的配置如下:

s3.access-key
s3.secret-key
s3.region
s3.endpoint
jobmanager.archive.fs.dir
如果不想引入外部系統(tǒng)的話(huà),需要改造 Flink 代碼在作業(yè)運(yùn)行完成之后將數(shù)據(jù)寫(xiě)到 k8s 的 api object 中,比如 ConfigMap 或者 Secret。

作業(yè)日志。 Spark 作業(yè)運(yùn)行結(jié)束之后 Executor Pod 被清理掉,Driver Pod 被保留,我們可以通過(guò)它查看到 Driver 的日志。Flink 作業(yè)結(jié)束之后就什么日志都查看不到了。

9. 總結(jié)

本文從使用方式、源碼實(shí)現(xiàn)以及在生產(chǎn)系統(tǒng)上面如何補(bǔ)足周邊系統(tǒng)地介紹了 Spark 和 Flink 在 k8s 生態(tài)上的實(shí)現(xiàn)、實(shí)踐以及對(duì)比。但是限于篇幅,很多內(nèi)容來(lái)不及討論了,比如 shuffle 如何處理。如果你們公司也在做這方面的工作,相信還是有很多參考價(jià)值的,也歡迎留言交流。

另外,YARN 的時(shí)代已經(jīng)過(guò)去了,以后 on k8s scheduler 將成為大數(shù)據(jù)計(jì)算以及 AI 框架的標(biāo)配。但是 k8s scheduler 這種天生為在線(xiàn)服務(wù)設(shè)計(jì)的調(diào)度器在吞吐上面有很大的不足,并不是很契合大數(shù)據(jù)作業(yè)。k8s 社區(qū)的批調(diào)度器 kube-batch,以及基于 kube-batch 衍生出來(lái)的 Volcano 調(diào)度器,基于 YARN 的調(diào)度算法實(shí)現(xiàn)的 k8s 生態(tài)調(diào)度器 Yunikorn 也逐漸在大數(shù)據(jù) on k8s 場(chǎng)景下嶄露頭角,不過(guò)這些都是后話(huà)了,后面有時(shí)間再專(zhuān)門(mén)寫(xiě)文章進(jìn)行分析對(duì)比。

責(zé)任編輯:梁菲 來(lái)源: 阿里云云棲號(hào)
相關(guān)推薦

2021-11-29 08:48:00

K8S KubernetesAirflow

2022-04-02 09:57:51

技術(shù)京東實(shí)踐

2023-12-25 07:35:40

數(shù)據(jù)集成FlinkK8s

2022-08-15 14:56:30

搜索引擎分布式

2022-08-21 07:25:09

Flink云原生K8S

2019-02-26 09:51:52

分布式鎖RedisZookeeper

2019-06-27 09:12:43

FlinkStorm框架

2024-03-01 09:53:34

2017-09-01 05:35:58

分布式計(jì)算存儲(chǔ)

2021-07-05 09:28:11

Flink分布式程序

2013-09-11 16:02:00

Spark分布式計(jì)算系統(tǒng)

2022-10-10 12:54:00

Flink運(yùn)維

2010-06-03 19:46:44

Hadoop

2020-06-02 14:45:48

PostgreSQL架構(gòu)分布式

2009-06-19 14:23:41

RMIJava分布式計(jì)算

2021-10-30 19:30:23

分布式Celery隊(duì)列

2023-02-28 07:01:11

分布式緩存平臺(tái)

2018-10-16 14:26:22

分布式塊存儲(chǔ)引擎

2024-09-27 09:19:30

2022-03-21 19:44:30

CitusPostgreSQ執(zhí)行器
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

国产精品乱码久久久久久 | 精品国产免费久久久久久尖叫| 久热这里有精品| 美女一区二区在线观看| 在线观看成人小视频| 一本色道久久综合亚洲精品婷婷 | 久久99国产精品久久99| 欧美福利在线观看| 在线观看国产精品一区| 美女精品久久| 在线看国产一区| 成人av在线播放观看| 国产乱视频在线观看| 国产精品影视在线| 国产精品久久久久av| 久久97人妻无码一区二区三区| 香蕉久久夜色精品国产使用方法| 欧美日韩久久不卡| 一二三四视频社区在线| 二区三区在线观看| 91视频com| 91九色蝌蚪嫩草| 日韩av免费播放| 一二三区精品| 欧美成人亚洲成人| 精品一区二区三区蜜桃在线| 波多野结衣欧美| 欧美性色综合网| 一女被多男玩喷潮视频| 怡红院在线播放| 国产精品麻豆网站| 日本一区免费| 五月婷婷综合久久| 国产成人精品一区二区三区网站观看| 国产精品成人在线| www.com国产| 亚洲福利国产| 久久久久久久久久久av| 无码黑人精品一区二区| 成人在线国产| 国产一区二区黑人欧美xxxx| 黄色av网址在线观看| 亚洲欧洲国产精品一区| 91精品免费在线观看| 三级a三级三级三级a十八发禁止| 亚洲天堂一区二区| 色综合天天性综合| 欧美色图另类小说| 中文在线中文资源| 一本色道久久综合亚洲91| 久久精品视频16| 超碰在线网站| 五月婷婷久久综合| 性欧美大战久久久久久久| 视频在线这里都是精品| 亚洲麻豆国产自偷在线| 日本三级中文字幕在线观看| 超碰公开在线| 亚洲激情在线播放| 免费高清一区二区三区| 91九色美女在线视频| 一区二区在线观看视频| www.成年人视频| 91探花在线观看| 欧美性xxxxxx| 亚洲色图38p| 日韩成人精品一区二区三区| 欧美精选一区二区| 在线观看视频在线观看| 亚洲一级大片| 亚洲福利视频网站| 麻豆国产精品一区| 天海翼亚洲一区二区三区| 日韩精品在线第一页| 国产伦精品一区二区三区妓女| 日韩大片在线免费观看| 亚洲欧美日韩天堂| 国产又粗又长免费视频| 亚洲男女av一区二区| 欧美日韩国产成人在线观看| 国产一级片免费观看| 国产精品日本| 国产精品成人品| 国产区精品在线| 成人免费视频一区| 欧美日韩精品一区| 伦xxxx在线| 亚洲国产精品一区二区久久恐怖片 | 日本黄色三级视频| 久久综合999| 在线视频精品一区| 欧美xxxx少妇| 日本韩国一区二区三区| 亚洲欧美天堂在线| 国产精品jk白丝蜜臀av小说| 亚洲免费视频观看| 欧美手机在线观看| 国产一区二区高清| 国产精品日韩欧美综合| 亚洲精品国产一区二| 91视视频在线观看入口直接观看www | 亚洲国产日产av| 任你操这里只有精品| 亚洲一区二区小说| 国产视频精品在线| 日本青青草视频| 老**午夜毛片一区二区三区| 亚洲一区二区三区久久| 日本私人网站在线观看| 亚洲男同性恋视频| 狠狠热免费视频| 欧美18xxxx| 久久亚洲影音av资源网| 亚洲va在线观看| 国产成人精品一区二区三区四区 | 成人国产一区二区| 国产一区电影| 亚洲1区2区3区4区| 国产成人美女视频| 精品99久久| 午夜精品一区二区三区在线播放| 96亚洲精品久久久蜜桃| 久久亚洲一区二区三区四区| 少妇久久久久久被弄到高潮| yiren22亚洲综合| 欧美精品一区二区蜜臀亚洲| 亚洲一级二级片| 轻轻草成人在线| 久久精品日产第一区二区三区精品版| 精品孕妇一区二区三区| 色婷婷综合五月| 日本黄色免费观看| 欧美亚洲不卡| 91精品免费视频| 成人免费在线电影| 色婷婷久久久久swag精品| 午夜视频在线观看国产| 欧美三级第一页| 成人精品福利视频| 在线观看h片| 欧美曰成人黄网| 一级性生活毛片| 亚洲综合不卡| 国内精品一区二区| gogo高清在线播放免费| 精品国产伦一区二区三区观看方式 | 午夜免费久久看| 一起操在线视频| 日韩久久综合| 国产欧美日韩精品专区| 91porn在线观看| 色哟哟欧美精品| 日韩毛片无码永久免费看| 日本成人中文字幕在线视频| 欧美一区二区三区在线免费观看| 蜜桃视频在线观看免费视频| 日韩av在线高清| 久久久国产高清| 91社区在线播放| 日本久久精品一区二区| 久久国产影院| 96pao国产成视频永久免费| av网站大全在线| 精品免费99久久| 日韩精品一区二区三| 26uuu国产日韩综合| 亚洲人成无码www久久久| 精品国产一区二区三区噜噜噜| 国产精品视频精品| 成人在线视频亚洲| 亚洲高清免费观看高清完整版| 你懂的国产视频| 久久精品视频在线免费观看| 天天视频天天爽| 欧美一区国产在线| 精品一区二区三区日本| 秋霞国产精品| 欧美精品一区三区| 亚洲 精品 综合 精品 自拍| 日本丶国产丶欧美色综合| 男人天堂资源网| 成人免费视频免费观看| 国产二级片在线观看| 色琪琪久久se色| 国产成人精品福利一区二区三区| 在线观看特色大片免费视频| 伊人久久五月天| 丰满人妻一区二区三区免费| 日韩欧美国产高清91| av在线免费播放网址| 懂色av一区二区在线播放| 久久婷婷五月综合色国产香蕉| 成人激情在线| 国产一区二区三区av在线| 成人自拍视频网| 欧美极品少妇xxxxⅹ裸体艺术 | 色噜噜狠狠狠综合曰曰曰88av| av网站免费大全| 日韩欧美国产视频| 永久久久久久久| 久久久精品免费免费| 中文字幕1234区| 模特精品在线| 特大黑人娇小亚洲女mp4| 一区二区三区日本久久久| 91色在线视频| 日韩和的一区二在线| 欧美激情日韩图片| 天天影视久久综合| 亚洲毛茸茸少妇高潮呻吟| 国产高清视频免费| 欧美日韩一区二区欧美激情| 天天操天天爽天天干| 一区二区三区日韩精品视频| 国产日产在线观看| 久久先锋资源网| 秘密基地免费观看完整版中文| 美女国产一区二区| 国产成人无码av在线播放dvd| 欧美日韩影院| 黄黄视频在线观看| 日韩理论电影| 日韩电影大全在线观看| 日韩电影不卡一区| 国产精品综合久久久久久| va天堂va亚洲va影视| 国产精品电影久久久久电影网| 第一中文字幕在线| 欧美另类高清videos| 男人天堂手机在线| 色777狠狠综合秋免鲁丝| 酒色婷婷桃色成人免费av网| 亚洲精品久久久久久下一站 | 亚洲美女视频网站| 全部免费毛片在线播放一个| 欧美一级黄色大片| av一区二区三| 欧美精三区欧美精三区| 在线观看国产精品视频| 在线视频一区二区免费| 亚洲 欧美 日韩 在线| 色综合视频一区二区三区高清| 日韩免费在线视频观看| 亚洲高清视频在线| 日本中文字幕免费| 午夜电影网一区| 免费黄色网址在线| 欧美午夜电影在线| 亚洲国产精品无码久久久| 一本到三区不卡视频| 无码人妻丰满熟妇精品区| 91福利精品视频| 中文字幕一区二区人妻| 欧美日韩国产高清一区二区三区| 中文字幕av在线免费观看| 欧美日韩高清在线播放| 国产乱淫a∨片免费观看| 欧美丰满少妇xxxbbb| 国产视频在线免费观看| 精品久久久网站| 五月婷婷在线观看视频| 亚洲视频自拍偷拍| 亚洲xxxxxx| 欧美精品在线观看91| 国产精品69xx| 51午夜精品视频| 欧美aaa大片视频一二区| 国产日本欧美在线观看| 国产日韩欧美中文在线| 国产激情一区二区三区在线观看 | 亚洲欧美国产不卡| 天天av综合| 久久亚洲国产成人精品无码区| 亚洲国产免费| www.超碰com| 国产高清久久久| 制服丝袜第二页| 国产精品日韩成人| 久久久精品国产sm调教| 精品久久在线播放| 国产又粗又猛视频| 亚洲国产精彩中文乱码av在线播放| 欧洲伦理片一区 二区 三区| 中日韩美女免费视频网站在线观看| 国产婷婷视频在线| 欧美一级淫片aaaaaaa视频| 黄色日韩网站| 国产在线精品一区二区三区》 | av影院午夜一区| 九九九视频在线观看| 亚洲午夜在线电影| 天天干天天插天天射| 日韩一区二区三免费高清| 亚洲aaaaaaa| 久久视频在线播放| 亚洲第一av| 5g国产欧美日韩视频| 亚洲激情77| av 日韩 人妻 黑人 综合 无码| 国产精品一国产精品k频道56| 在线观看岛国av| 91美女精品福利| 久久精品黄色片| 欧美中文字幕一区二区三区亚洲| 性生活免费网站| 在线成人激情视频| 国产精品一区二区日韩| 91精品中文在线| 国产麻豆一区二区三区精品视频| 波多野结衣与黑人| 日本aⅴ免费视频一区二区三区 | 久久久久久久999精品视频| 秋霞国产精品| 久久国产日韩欧美| 韩国在线一区| 日日夜夜精品视频免费观看| 久久久99精品久久| 一级免费在线观看| 精品人在线二区三区| 老司机福利在线视频| 国产国语刺激对白av不卡| 男人的天堂久久| 99国产精品白浆在线观看免费| 久久精品免费观看| www.中文字幕av| 精品久久久久久久久久久久久| wwwxxxx国产| 久久综合国产精品台湾中文娱乐网| 婷婷六月国产精品久久不卡| 精品国产乱码一区二区三区四区| 欧美a级在线| 亚洲在线观看网站| 国产精品成人免费在线| 亚洲精品久久久久久久蜜桃| 精品偷拍一区二区三区在线看| 91九色porn在线资源| 成人综合电影| 欧美午夜影院| 国产原创剧情av| 亚洲成av人片一区二区三区| 亚洲黄色小说网址| 欧美激情二区三区| 成人三级av在线| 日本网站免费在线观看| 波多野结衣在线aⅴ中文字幕不卡| 久久激情免费视频| 精品国产一区二区亚洲人成毛片| 欧美色图天堂| 国产一级特黄a大片99| 国产欧美日韩亚洲一区二区三区| 日本少妇xxxx| 黑人巨大精品欧美一区二区免费| 四虎在线免费看| 日韩av第一页| 欧美一级精品| 亚洲精品第三页| 一区二区成人在线| 视频一区 中文字幕| 91精品成人久久| 视频精品在线观看| 少妇一级淫免费放| 亚洲免费观看高清完整版在线观看| hs视频在线观看| 国内伊人久久久久久网站视频 | 日韩字幕在线观看| 国产午夜精品久久久| 欧美黄色三级| 熟女视频一区二区三区| 成人精品在线视频观看| 免费看日批视频| 日韩小视频在线| 福利电影一区 | 国产一区不卡精品| 久久精品视频久久| 亚洲精品在线91| 亚洲成人毛片| 成年女人18级毛片毛片免费| 久久先锋影音av鲁色资源 | 亚洲高清久久久久久| 欧美日韩不卡| 亚洲色婷婷久久精品av蜜桃| 99久久99久久免费精品蜜臀| 波多野结衣一区二区三区在线 | 日韩色淫视频| 无码毛片aaa在线| 99国产欧美久久久精品| 日本欧美www| 久久久久久美女| 欧美日韩国产一区二区三区不卡 | 国产精品国产三级国产三级人妇 | 国产一区二区在线视频播放| 中文字幕欧美日本乱码一线二线| 国产男女无套免费网站| 性色av一区二区三区免费| 欧洲乱码伦视频免费| 天天躁日日躁狠狠躁av| 欧美日韩另类一区| 末成年女av片一区二区下载| 日本黄色播放器| 久久精品亚洲国产奇米99 |