精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink執(zhí)行流程與源碼分析

大數(shù)據(jù)
整體的流程與架構(gòu)可能三兩張圖或者三言兩語就可以勾勒出畫面,但是背后源碼的實(shí)現(xiàn)是艱辛的。源碼的復(fù)雜度和當(dāng)初設(shè)計框架的抓狂感,我們只有想象。現(xiàn)在我們只是站在巨人的肩膀上去學(xué)習(xí)。

[[422512]]

本文轉(zhuǎn)載自微信公眾號「大數(shù)據(jù)左右手」,作者王了個博。轉(zhuǎn)載本文請聯(lián)系大數(shù)據(jù)左右手公眾號。

Flink主要組件

作業(yè)管理器(JobManager)

(1) 控制一個應(yīng)用程序執(zhí)行的主進(jìn)程,也就是說,每個應(yīng)用程序 都會被一個不同的Jobmanager所控制執(zhí)行

(2) Jobmanager會先接收到要執(zhí)行的應(yīng)用程序,這個應(yīng)用程序會包括:作業(yè)圖( Job Graph)、邏輯數(shù)據(jù)流圖( ogical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。

(3) Jobmanager會把 Jobgraph轉(zhuǎn)換成一個物理層面的 數(shù)據(jù)流圖,這個圖被叫做 “執(zhí)行圖”(Executiongraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)。Job Manager會向資源管理器( Resourcemanager)請求執(zhí)行任務(wù)必要的資源,也就是 任務(wù)管理器(Taskmanager)上的插槽slot。一旦它獲取到了足夠的資源,就會將執(zhí)行圖分發(fā)到真正運(yùn)行它們的 Taskmanager上。而在運(yùn)行過程中Jobmanagera會負(fù)責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說檢查點(diǎn)(checkpoints)的協(xié)調(diào)。

任務(wù)管理器(Taskmanager)

(1) Flink中的工作進(jìn)程。通常在 Flink中會有多個 Taskmanageria運(yùn)行, 每個 Taskmanageri都包含了一定數(shù)量的插槽( slots)。插槽的數(shù)量限制了Taskmanageri能夠執(zhí)行的任務(wù)數(shù)量。

(2) 啟動之后, Taskmanager會向資源管理器注冊它的插槽;收到資源管理器的指令后, Taskmanageri就會將一個或者多個插槽提供給Jobmanageri調(diào)用。Jobmanager就可以向插槽分配任務(wù)( tasks)來執(zhí)行了。

(3) 在執(zhí)行過程中, 一個 Taskmanagera可以跟其它運(yùn)行同一應(yīng)用程序的Taskmanager交換數(shù)據(jù)。

資源管理器(Resource Manager)

(1) 主要負(fù)責(zé)管理任務(wù)管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定義的處理資源單元。

(2) Flink 為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如YARNMesos、K8s,以及 standalone部署。

(3) 當(dāng) Jobmanager申請插槽資源時, Resourcemanager會將有空閑插槽的Taskmanager?分配給Jobmanager。如果 Resourcemanagery沒有足夠的插槽來滿足 Jobmanager的請求, 它還可以向資源提供平臺發(fā)起會話,以提供啟動 Taskmanager進(jìn)程的容器。

分發(fā)器(Dispatcher)

(1) 可以跨作業(yè)運(yùn)行,它為應(yīng)用提交提供了REST接口。

(2)當(dāng)一個應(yīng)用被提交執(zhí)行時,分發(fā)器就會啟動并將應(yīng)用移交給Jobmanage

(3) Dispatcher他會啟動一個 WebUi,用來方便地 展示和監(jiān)控作業(yè)執(zhí)行的信息。

任務(wù)提交流程

  1. 提交應(yīng)用
  2. 啟動并提交應(yīng)用
  3. 請求slots
  4. 任務(wù)啟動
  5. 注冊slots
  6. 發(fā)出提供slot的指令
  7. 提供slots
  8. 提交要在slots中執(zhí)行的任務(wù)
  9. 交換數(shù)據(jù)

任務(wù)提交流程(YARN)

a. Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置

b. 隨后向 Yarn ResourceManager提交任務(wù)ResourceManager分配 Container資源并通知對應(yīng)的NodeManager啟動

c. ApplicationMaster,ApplicationMaster 啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境

d. 然后啟動JobManager , 之后ApplicationMaster 向ResourceManager 申請資源啟動TaskManager

e. ResourceManager 分配 Container 資源后 , 由ApplicationMaster通知資源所在節(jié)點(diǎn)的NodeManager啟動TaskManager

f. NodeManager 加載 Flink 的 Jar 包和配置構(gòu)建環(huán)境并啟動 TaskManager

g. TaskManager 啟動后向 JobManager 發(fā)送心跳包,并等待 JobManager 向其分配任務(wù)。

源碼分析--集群啟動 JobManager 啟動分析

JobManager 的內(nèi)部包含非常重要的三大組件

  • WebMonitorEndpoint
  • ResourceManager
  • Dispatcher

入口,啟動主類:StandaloneSessionClusterEntrypoint

  1. // 入 口 
  2. StandaloneSessionClusterEntrypoint.main() ClusterEntrypoint.runClusterEntrypoint(entrypoint); 
  3. clusterEntrypoint.startCluster();  
  4. runCluster(configuration, pluginManager); 
  5.  
  6. // 第一步:初始化各種服務(wù) 
  7.  /** 
  8.   * 初始化了 主節(jié)點(diǎn)對外提供服務(wù)的時候所需要的 三大核心組件啟動時所需要的基礎(chǔ)服務(wù) 
  9.   *  初始化服務(wù),如 JobManager 的 Akka RPC 服務(wù),HA 服務(wù),心跳檢查服務(wù),metric service 
  10.   *  這些服務(wù)都是 Master 節(jié)點(diǎn)要使用到的一些服務(wù) 
  11.   *  1、commonRpcService:  基于 Akka 的 RpcService 實(shí)現(xiàn)。RPC 服務(wù)啟動 Akka 參與者來接收從 RpcGateway 調(diào)用 RPC 
  12.   *  2、haServices:    提供對高可用性所需的所有服務(wù)的訪問注冊,分布式計數(shù)器和領(lǐng)導(dǎo)人選舉 
  13.   *  3、blobServer:    負(fù)責(zé)偵聽傳入的請求生成線程來處理這些請求。它還負(fù)責(zé)創(chuàng)建要存儲的目錄結(jié)構(gòu) blob 或臨時緩存它們 
  14.   *  4、heartbeatServices:  提供心跳所需的所有服務(wù)。這包括創(chuàng)建心跳接收器和心跳發(fā)送者。 
  15.   *  5、metricRegistry:   跟蹤所有已注冊的 Metric,它作為連接 MetricGroup 和 MetricReporter 
  16.   *  6、archivedExecutionGraphStore:   存儲執(zhí)行圖ExecutionGraph的可序列化形式。 
  17. */ 
  18. initializeServices(configuration, pluginManager); 
  19.  
  20. // 創(chuàng)建 DispatcherResourceManagerComponentFactory, 初始化各種組件的 
  21. 工廠實(shí)例 
  22. // 其實(shí)內(nèi)部包含了三個重要的成員變量: 
  23. // 創(chuàng)建 ResourceManager 的工廠實(shí)例 
  24. // 創(chuàng)建 Dispatcher 的工廠實(shí)例 
  25. // 創(chuàng)建 WebMonitorEndpoint 的工廠實(shí)例 
  26. createDispatcherResourceManagerComponentFactory(configuration); 
  27.  
  28. // 創(chuàng)建 集群運(yùn)行需要的一些組件:Dispatcher, ResourceManager 等 
  29. // 創(chuàng) 建 ResourceManager 
  30. // 創(chuàng) 建 Dispatcher 
  31. // 創(chuàng) 建 WebMonitorEndpoint 
  32. clusterComponent = dispatcherResourceManagerComponentFactory.create(...) 

1. initializeServices():初始化各種服務(wù)

  1. // 初 始 化 和 啟 動 AkkaRpcService, 內(nèi) 部 其 實(shí) 包 裝 了 一 個 ActorSystem commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...) 
  2.  
  3. // 初始化一個負(fù)責(zé) IO 的線程池 
  4. ioExecutor = Executors.newFixedThreadPool(...) 
  5. // 初始化 HA 服務(wù)組件,負(fù)責(zé) HA 服務(wù)的是:ZooKeeperHaServices haServices = createHaServices(configuration, ioExecutor); 
  6.  
  7. // 初始化 BlobServer 服務(wù)端 
  8. blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); 
  9.  
  10. // 初始化心跳服務(wù)組件, heartbeatServices = HeartbeatServices heartbeatServices = createHeartbeatServices(configuration); 
  11.  
  12. // 初始化一個用來存儲 ExecutionGraph 的 Store, 實(shí)現(xiàn)是: 
  13. FileArchivedExecutionGraphStore 
  14. archivedExecutionGraphStore = createSerializableExecutionGraphStore(...) 

2. createDispatcherResourceManagerComponentFactory(configuration)初始化了多組件的工廠實(shí)例

  1. 1、DispatcherRunnerFactory,默認(rèn)實(shí)現(xiàn):DefaultDispatcherRunnerFactory  
  2.  
  3. 2、ResourceManagerFactory,默認(rèn)實(shí)現(xiàn):StandaloneResourceManagerFactory  
  4.  
  5. 3、RestEndpointFactory,默認(rèn)實(shí)現(xiàn):SessionRestEndpointFactory 
  6.  
  7. clusterComponent = dispatcherResourceManagerComponentFactory 
  8.     .create(configuration, ioExecutor, commonRpcService, haServices, 
  9.      blobServer, heartbeatServices, metricRegistry, 
  10.      archivedExecutionGraphStore, 
  11.      new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), 
  12.      this); 

3. 創(chuàng)建 WebMonitorEndpoint

  1. /************************************************* 
  2.   *  創(chuàng)建 WebMonitorEndpoint 實(shí)例, 在 Standalone 模式下:DispatcherRestEndpoint 
  3.   *  1、restEndpointFactory = SessionRestEndpointFactory 
  4.   *  2、webMonitorEndpoint = DispatcherRestEndpoint 
  5.   *  3、highAvailabilityServices.getClusterRestEndpointLeaderElectionService() = ZooKeeperLeaderElectionService 
  6.   *  當(dāng)前這個 DispatcherRestEndpoint 的作用是: 
  7.   *  1、初始化的過程中,會一大堆的 Handler 
  8.   *  2、啟動一個 Netty 的服務(wù)端,綁定了這些 Handler 
  9.   *  3、當(dāng) client 通過 flink 命令執(zhí)行了某些操作(發(fā)起 restful 請求), 服務(wù)端由 webMonitorEndpoint 來執(zhí)行處理 
  10.   *  4、舉個例子: 如果通過 flink run 提交一個 Job,那么最后是由 webMonitorEndpoint 中的 JobSubmitHandler 來執(zhí)行處理 
  11.   *  5、補(bǔ)充一個:job 由 JobSubmitHandler 執(zhí)行完畢之后,轉(zhuǎn)交給 Dispatcher 去調(diào)度執(zhí)行 
  12.   */ 
  13.  webMonitorEndpoint = restEndpointFactory.createRestEndpoint( 
  14.   configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, 
  15.   blobServer, executor, metricFetcher, 
  16.   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), 
  17.   fatalErrorHandler 
  18.  ); 

4. 創(chuàng)建 resourceManager

  1. /************************************************* 
  2.  *  創(chuàng)建 StandaloneResourceManager 實(shí)例對象 
  3.  *  1、resourceManager = StandaloneResourceManager 
  4.  *  2、resourceManagerFactory = StandaloneResourceManagerFactory 
  5. */ 
  6. resourceManager = resourceManagerFactory.createResourceManager( 
  7.  configuration, ResourceID.generate(), 
  8.  rpcService, highAvailabilityServices, heartbeatServices, 
  9.  fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), 
  10.  webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname 
  11. ); 
  1. protected ResourceManager<ResourceID> createResourceManager( 
  2.   Configuration configuration, 
  3.   ResourceID resourceId, 
  4.   RpcService rpcService, 
  5.   HighAvailabilityServices highAvailabilityServices, 
  6.   HeartbeatServices heartbeatServices, 
  7.   FatalErrorHandler fatalErrorHandler, 
  8.   ClusterInformation clusterInformation, 
  9.   @Nullable String webInterfaceUrl, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, 
  11.   ResourceManagerRuntimeServices resourceManagerRuntimeServices) { 
  12.  
  13.  final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration); 
  14.  
  15.  /************************************************* 
  16.   *  注釋: 得到一個 StandaloneResourceManager 實(shí)例對象 
  17.   */ 
  18.  return new StandaloneResourceManager( 
  19.   rpcService, 
  20.   resourceId, 
  21.   highAvailabilityServices, 
  22.   heartbeatServices, 
  23.   resourceManagerRuntimeServices.getSlotManager(), 
  24.   ResourceManagerPartitionTrackerImpl::new, 
  25.   resourceManagerRuntimeServices.getJobLeaderIdService(), 
  26.   clusterInformation, 
  27.   fatalErrorHandler, 
  28.   resourceManagerMetricGroup, 
  29.   standaloneClusterStartupPeriodTime, 
  30.   AkkaUtils.getTimeoutAsTime(configuration) 
  31.  ); 
  32.  
  33.  } 
  34.   
  1. /** 
  2. requestSlot():接受 solt請求 
  3. sendSlotReport(..): 將solt請求發(fā)送TaskManager 
  4. registerJobManager(...): 注冊job管理者。 該job指的是 提交給flink的應(yīng)用程序 
  5. registerTaskExecutor(...): 注冊task執(zhí)行者。 
  6. **/ 
  7. public ResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices, 
  8.   HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory, 
  9.   JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, Time rpcTimeout) { 
  11.  
  12.  /************************************************* 
  13.   *  注釋: 當(dāng)執(zhí)行完畢這個構(gòu)造方法的時候,會觸發(fā)調(diào)用 onStart() 方法執(zhí)行 
  14.   */ 
  15.  super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null); 
  1. protected RpcEndpoint(final RpcService rpcService, final String endpointId) { 
  2.  this.rpcService = checkNotNull(rpcService, "rpcService"); 
  3.  this.endpointId = checkNotNull(endpointId, "endpointId"); 
  4.  
  5.  /************************************************* 
  6.   *  注釋:ResourceManager 或者 TaskExecutor 中的 RpcServer 實(shí)現(xiàn) 
  7.   *  以 ResourceManager 為例說明: 
  8.   *  啟動 ResourceManager 的 RPCServer 服務(wù) 
  9.   *  這里啟動的是 ResourceManager 的 Rpc 服務(wù)端。 
  10.   *  接收 TaskManager 啟動好了而之后, 進(jìn)行注冊和心跳,來匯報 Taskmanagaer 的資源情況 
  11.   *  通過動態(tài)代理的形式構(gòu)建了一個Server 
  12.   */ 
  13.  this.rpcServer = rpcService.startServer(this); 

5. 在創(chuàng)建resourceManager同級:啟動任務(wù)接收器Starting Dispatcher

  1. /************************************************* 
  2.  
  3.  *  創(chuàng)建 并啟動 Dispatcher 
  4.  *  1、dispatcherRunner = DispatcherRunnerLeaderElectionLifecycleManager 
  5.  *  2、dispatcherRunnerFactory = DefaultDispatcherRunnerFactory 
  6.  *  第一個參數(shù):ZooKeeperLeaderElectionService 
  7.  *  - 
  8.  *  老版本: 這個地方是直接創(chuàng)建一個 Dispatcher 對象然后調(diào)用 dispatcher.start() 來啟動 
  9.  *  新版本: 直接創(chuàng)建一個 DispatcherRunner, 內(nèi)部就是要創(chuàng)建和啟動 Dispatcher 
  10.  *  - 
  11.  *  DispatcherRunner 是對 Dispatcher 的封裝。 
  12.  *  DispatcherRunner被創(chuàng)建的代碼的內(nèi)部,會創(chuàng)建 Dispatcher并啟動 
  13.  */ 
  14. log.debug("Starting Dispatcher."); 
  15. dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( 
  16.  highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, 
  17.  // TODO_ZYM 注釋: 注意第三個參數(shù) 
  18.  new HaServicesJobGraphStoreFactory(highAvailabilityServices), 
  19.  ioExecutor, rpcService, partialDispatcherServices 
  20. ); 

Dispatcher 啟動后,將會等待任務(wù)提交,如果有任務(wù)提交,則會經(jīng)過submitJob(...)函數(shù)進(jìn)入后續(xù)處理。

提交(一個Flink應(yīng)用的提交必須經(jīng)過三個graph的轉(zhuǎn)換)

首先看下一些名詞

StreamGraph

是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓?fù)浣Y(jié)構(gòu)。可以用一個 DAG 來表示),DAG 的頂點(diǎn)是 StreamNode,邊是 StreamEdge,邊包含了由哪個 StreamNode 依賴哪個 StreamNode。

  • StreamNode:用來代表 operator 的類,并具有所有相關(guān)的屬性,如并發(fā)度、入邊和出邊等。
  • StreamEdge:表示連接兩個StreamNode的邊。

DataStream 上常見的 transformation 有 map、flatmap、filter等(見DataStream Transformation了解更多)。這些transformation會構(gòu)造出一棵 StreamTransformation 樹,通過這棵樹轉(zhuǎn)換成 StreamGraph

以map方法為例,看看源碼

  1. public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) { 
  2.   // 通過java reflection抽出mapper的返回值類型 
  3.   TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), 
  4.       Utils.getCallLocationName(), true); 
  5.  
  6.   // 返回一個新的DataStream,SteramMap 為 StreamOperator 的實(shí)現(xiàn)類 
  7.   return transform("Map", outType, new StreamMap<>(clean(mapper))); 
  8.  
  9. public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { 
  10.   // read the output type of the input Transform to coax out errors about MissingTypeInfo 
  11.   transformation.getOutputType(); 
  12.  
  13.   // 新的transformation會連接上當(dāng)前DataStream中的transformation,從而構(gòu)建成一棵樹 
  14.   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( 
  15.       this.transformation, 
  16.       operatorName, 
  17.       operator, 
  18.       outTypeInfo, 
  19.       environment.getParallelism()); 
  20.  
  21.   @SuppressWarnings({ "unchecked""rawtypes" }) 
  22.   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); 
  23.  
  24.   // 所有的transformation都會存到 env 中,調(diào)用execute時遍歷該list生成StreamGraph 
  25.   getExecutionEnvironment().addOperator(resultTransform); 
  26.  
  27.   return returnStream; 

map轉(zhuǎn)換將用戶自定義的函數(shù)MapFunction包裝到StreamMap這個Operator中,再將StreamMap包裝到OneInputTransformation,最后該transformation存到env中,當(dāng)調(diào)用env.execute時,遍歷其中的transformation集合構(gòu)造出StreamGraph

JobGraph

(1) StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個符合條件的節(jié)點(diǎn) chain 在一起作為一個節(jié)點(diǎn)。

  • 將并不涉及到 shuffle 的算子進(jìn)行合并。
  • 對于同一個 operator chain 里面的多個算子,會在同一個 task 中執(zhí)行。
  • 對于不在同一個 operator chain 里的算子,會在不同的 task 中執(zhí)行。

(2) JobGraph 用來由 JobClient 提交給 JobManager,是由頂點(diǎn)(JobVertex)、中間結(jié)果(IntermediateDataSet)和邊(JobEdge)組成的 DAG 圖。

(3) JobGraph 定義作業(yè)級別的配置,而每個頂點(diǎn)和中間結(jié)果定義具體操作和中間數(shù)據(jù)的設(shè)置。

JobVertex

JobVertex 相當(dāng)于是 JobGraph 的頂點(diǎn)。經(jīng)過優(yōu)化后符合條件的多個StreamNode可能會chain在一起生成一個JobVertex,即一個JobVertex包含一個或多個operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。

IntermediateDataSet

JobVertex的輸出,即經(jīng)過operator處理產(chǎn)生的數(shù)據(jù)集。

JobEdge

job graph中的一條數(shù)據(jù)傳輸通道。source 是IntermediateDataSet,sink 是 JobVertex。即數(shù)據(jù)通過JobEdge由IntermediateDataSet傳遞給目標(biāo)JobVertex。

(1) 首先是通過API會生成transformations,通過transformations會生成StreamGraph。

(2)將StreamGraph的某些StreamNode Chain在一起生成JobGraph,前兩步轉(zhuǎn)換都是在客戶端完成。

(3)最后會將JobGraph轉(zhuǎn)換為ExecutionGraph,相比JobGraph會增加并行度的概念,這一步是在Jobmanager里完成。

ExecutionJobVertex

ExecutionJobVertex一一對應(yīng)JobGraph中的JobVertex

ExecutionVertex

一個ExecutionJobVertex對應(yīng)n個ExecutionVertex,其中n就是算子的并行度。ExecutionVertex就是并行任務(wù)的一個子任務(wù)

Execution

Execution 是對 ExecutionVertex 的一次執(zhí)行,通過 ExecutionAttemptId 來唯一標(biāo)識。

IntermediateResult

在 JobGraph 中用 IntermediateDataSet 表示 JobVertex 的對外輸出,一個 JobGraph 可能有 n(n >=0) 個輸出。在 ExecutionGraph 中,與此對應(yīng)的就是 IntermediateResult。每一個 IntermediateResult 就有 numParallelProducers(并行度) 個生產(chǎn)者,每個生產(chǎn)者的在相應(yīng)的 IntermediateResult 上的輸出對應(yīng)一個 IntermediateResultPartition。IntermediateResultPartition 表示的是 ExecutionVertex 的一個輸出分區(qū)

ExecutionEdge

ExecutionEdge 表示 ExecutionVertex 的輸入,通過 ExecutionEdge 將 ExecutionVertex 和 IntermediateResultPartition 連接起來,進(jìn)而在不同的 ExecutionVertex 之間建立聯(lián)系。

ExecutionGraph的構(gòu)建

  1. 構(gòu)建JobInformation
  2. 構(gòu)建ExecutionGraph
  3. 將JobGraph進(jìn)行拓?fù)渑判?獲取sortedTopology頂點(diǎn)集合
  1. // ExecutionGraphBuilder 
  2.  public static ExecutionGraph buildGraph( 
  3.   @Nullable ExecutionGraph prior
  4.   JobGraph jobGraph, 
  5.   ...) throws JobExecutionException, JobException { 
  6.   // 構(gòu)建JobInformation 
  7.    
  8.   // 構(gòu)建ExecutionGraph 
  9.    
  10.   // 將JobGraph進(jìn)行拓?fù)渑判?獲取sortedTopology頂點(diǎn)集合 
  11.   List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); 
  12.    
  13.   executionGraph.attachJobGraph(sortedTopology); 
  14.  
  15.   return executionGraph; 
  16.  } 

構(gòu)建ExecutionJobVertex,連接IntermediateResultPartition和ExecutionVertex

  1. //ExecutionGraph 
  2.  public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException { 
  3.   for (JobVertex jobVertex : topologiallySorted) { 
  4.    // 構(gòu)建ExecutionJobVertex 
  5.    ExecutionJobVertex ejv = new ExecutionJobVertex( 
  6.      this, 
  7.      jobVertex, 
  8.      1, 
  9.      maxPriorAttemptsHistoryLength, 
  10.      rpcTimeout, 
  11.      globalModVersion, 
  12.      createTimestamp); 
  13.    // 連接IntermediateResultPartition和ExecutionVertex 
  14.    ev.connectToPredecessors(this.intermediateResults); 
  15.  } 
  16.    
  17.    
  18.   // ExecutionJobVertex 
  19.  public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException { 
  20.   List<JobEdge> inputs = jobVertex.getInputs(); 
  21.    
  22.   for (int num = 0; num < inputs.size(); num++) { 
  23.    JobEdge edge = inputs.get(num); 
  24.    IntermediateResult ires = intermediateDataSets.get(edge.getSourceId()); 
  25.    this.inputs.add(ires); 
  26.    int consumerIndex = ires.registerConsumer(); 
  27.     
  28.    for (int i = 0; i < parallelism; i++) { 
  29.     ExecutionVertex ev = taskVertices[i]; 
  30.     ev.connectSource(num, ires, edge, consumerIndex); 
  31.    } 
  32.   } 
  33.  } 

拆分計劃(可執(zhí)行能力)

  1. // ExecutionVertex 
  2.  public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) { 
  3.  
  4.   final DistributionPattern pattern = edge.getDistributionPattern(); 
  5.   final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); 
  6.  
  7.   ExecutionEdge[] edges; 
  8.  
  9.   switch (pattern) { 
  10.    // 下游 JobVertex 的輸入 partition 算法,如果是 forward 或 rescale 的話為 POINTWISE 
  11.    case POINTWISE: 
  12.     edges = connectPointwise(sourcePartitions, inputNumber); 
  13.     break; 
  14.    // 每一個并行的ExecutionVertex節(jié)點(diǎn)都會鏈接到源節(jié)點(diǎn)產(chǎn)生的所有中間結(jié)果IntermediateResultPartition 
  15.    case ALL_TO_ALL: 
  16.     edges = connectAllToAll(sourcePartitions, inputNumber); 
  17.     break; 
  18.  
  19.    default
  20.     throw new RuntimeException("Unrecognized distribution pattern."); 
  21.  
  22.   } 
  23.  
  24.   inputEdges[inputNumber] = edges; 
  25.   for (ExecutionEdge ee : edges) { 
  26.    ee.getSource().addConsumer(ee, consumerNumber); 
  27.   } 
  28.  } 
  29.  
  30.  
  31.  private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  32.   final int numSources = sourcePartitions.length; 
  33.   final int parallelism = getTotalNumberOfParallelSubtasks(); 
  34.  
  35.   // 如果并發(fā)數(shù)等于partition數(shù),則一對一進(jìn)行連接 
  36.   if (numSources == parallelism) { 
  37.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) }; 
  38.   } 
  39.   //  如果并發(fā)數(shù)大于partition數(shù),則一對多進(jìn)行連接 
  40.   else if (numSources < parallelism) { 
  41.  
  42.    int sourcePartition; 
  43.  
  44.    if (parallelism % numSources == 0) { 
  45.     int factor = parallelism / numSources; 
  46.     sourcePartition = subTaskIndex / factor; 
  47.    } 
  48.    else { 
  49.     float factor = ((float) parallelism) / numSources; 
  50.     sourcePartition = (int) (subTaskIndex / factor); 
  51.    } 
  52.  
  53.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) }; 
  54.   } 
  55.   // 果并發(fā)數(shù)小于partition數(shù),則多對一進(jìn)行連接 
  56.   else { 
  57.    if (numSources % parallelism == 0) { 
  58.     int factor = numSources / parallelism; 
  59.     int startIndex = subTaskIndex * factor; 
  60.  
  61.     ExecutionEdge[] edges = new ExecutionEdge[factor]; 
  62.     for (int i = 0; i < factor; i++) { 
  63.      edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber); 
  64.     } 
  65.     return edges; 
  66.    } 
  67.    else { 
  68.     float factor = ((float) numSources) / parallelism; 
  69.  
  70.     int start = (int) (subTaskIndex * factor); 
  71.     int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ? 
  72.       sourcePartitions.length : 
  73.       (int) ((subTaskIndex + 1) * factor); 
  74.  
  75.     ExecutionEdge[] edges = new ExecutionEdge[end - start]; 
  76.     for (int i = 0; i < edges.length; i++) { 
  77.      edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber); 
  78.     } 
  79.  
  80.     return edges; 
  81.    } 
  82.   } 
  83.  } 
  84.  
  85.  
  86.  private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  87.   ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length]; 
  88.  
  89.   for (int i = 0; i < sourcePartitions.length; i++) { 
  90.    IntermediateResultPartition irp = sourcePartitions[i]; 
  91.    edges[i] = new ExecutionEdge(irp, this, inputNumber); 
  92.   } 
  93.  
  94.   return edges; 
  95.  } 

返回ExecutionGraph

TaskManager

TaskManager啟動

  1. public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception { 
  2.         //主要初始化一堆的service,并新建一個org.apache.flink.runtime.taskexecutor.TaskExecutor 
  3.   final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration,resourceId); 
  4.   //調(diào)用TaskExecutor的start()方法 
  5.         taskManagerRunner.start(); 

TaskExecutor :submitTask()

接著的重要函數(shù)是shumitTask()函數(shù),該函數(shù)會通過AKKA機(jī)制,向TaskManager發(fā)出一個submitTask的消息請求,TaskManager收到消息請求后,會執(zhí)行submitTask()方法。(省略了部分代碼)。

  1. public CompletableFuture<Acknowledge> submitTask( 
  2.    TaskDeploymentDescriptor tdd, 
  3.    JobMasterId jobMasterId, 
  4.    Time timeout) { 
  5.  
  6.     jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); 
  7.     taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); 
  8.     
  9.    TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(xxx); 
  10.  
  11.    InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(xxx); 
  12.  
  13.    TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions(); 
  14.    CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder(); 
  15.  
  16.    LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager(); 
  17.    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); 
  18.    PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); 
  19.  
  20.    final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask( 
  21.     jobId, 
  22.     tdd.getAllocationId(), 
  23.     taskInformation.getJobVertexId(), 
  24.     tdd.getSubtaskIndex()); 
  25.  
  26.    final JobManagerTaskRestore taskRestore = tdd.getTaskRestore(); 
  27.  
  28.    final TaskStateManager taskStateManager = new TaskStateManagerImpl( 
  29.     jobId, 
  30.     tdd.getExecutionAttemptId(), 
  31.     localStateStore, 
  32.     taskRestore, 
  33.     checkpointResponder); 
  34.             //新建一個Task 
  35.    Task task = new Task(xxxx); 
  36.  
  37.    log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); 
  38.  
  39.    boolean taskAdded; 
  40.  
  41.    try { 
  42.     taskAdded = taskSlotTable.addTask(task); 
  43.    } catch (SlotNotFoundException | SlotNotActiveException e) { 
  44.     throw new TaskSubmissionException("Could not submit task.", e); 
  45.    } 
  46.  
  47.    if (taskAdded) { 
  48.        //啟動任務(wù) 
  49.     task.startTaskThread(); 
  50.  
  51.     return CompletableFuture.completedFuture(Acknowledge.get()); 
  52.    }  

最后創(chuàng)建執(zhí)行Task的線程,然后調(diào)用startTaskThread()來啟動具體的執(zhí)行線程,Task線程內(nèi)部的run()方法承載了被執(zhí)行的核心邏輯。

Task是執(zhí)行在TaskExecutor進(jìn)程里的一個線程,下面來看看其run方法

(1) 檢測當(dāng)前狀態(tài),正常情況為CREATED,如果是FAILED或CANCELING直接返回,其余狀態(tài)將拋異常。

(2) 讀取DistributedCache文件。

(3) 啟動ResultPartitionWriter和InputGate。

(4) 向taskEventDispatcher注冊partitionWriter。

(5) 根據(jù)nameOfInvokableClass加載對應(yīng)的類并實(shí)例化。

(6) 將狀態(tài)置為RUNNING并執(zhí)行invoke方法。

  1. public void run() { 
  2.         while (true) { 
  3.             ExecutionState current = this.executionState; 
  4.             invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); 
  5.             network.registerTask(this); 
  6.             Environment env = new RuntimeEnvironment(. . . . ); 
  7.             invokable.setEnvironment(env); 
  8.             //  actual task core work 
  9.             if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { 
  10.             } 
  11.             // notify everyone that we switched to running 
  12.             notifyObservers(ExecutionState.RUNNING, null); 
  13.             executingThread.setContextClassLoader(userCodeClassLoader); 
  14.             // run the invokable 
  15.             invokable.invoke(); 
  16.  
  17.             if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { 
  18.                 notifyObservers(ExecutionState.FINISHED, null); 
  19.             } 
  20.             Finally{ 
  21.                 // free the network resources 
  22.                 network.unregisterTask(this); 
  23.                 // free memory resources 
  24.                 if (invokable != null) { 
  25.                     memoryManager.releaseAll(invokable); 
  26.                 } 
  27.                 libraryCache.unregisterTask(jobId, executionId); 
  28.                 removeCachedFiles(distributedCacheEntries, fileCache); 

總結(jié)

整體的流程與架構(gòu)可能三兩張圖或者三言兩語就可以勾勒出畫面,但是背后源碼的實(shí)現(xiàn)是艱辛的。源碼的復(fù)雜度和當(dāng)初設(shè)計框架的抓狂感,我們只有想象。現(xiàn)在我們只是站在巨人的肩膀上去學(xué)習(xí)。

本篇的主題是"Flink架構(gòu)與執(zhí)行流程",做下小結(jié),F(xiàn)link on Yarn的提交執(zhí)行流程:

1 Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置。

2 向Yarn ResourceManager提交任務(wù)。

3 ResourceManager分配Container資源并通知對應(yīng)的NodeManager啟動ApplicationMaster。

4 ApplicationMaster啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境。

5 啟動JobManager之后ApplicationMaster向ResourceManager申請資源啟動TaskManager。

6 ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節(jié)點(diǎn)的NodeManager啟動TaskManager。

7 NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動TaskManager。

8 TaskManager啟動后向JobManager發(fā)送心跳包,并等待JobManager向其分配任務(wù)。

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)左右手
相關(guān)推薦

2022-04-05 12:59:07

源碼線程onEvent

2022-08-27 08:02:09

SQL函數(shù)語法

2016-10-21 13:03:18

androidhandlerlooper

2012-08-30 09:48:02

Struts2Java

2024-07-15 09:58:03

OpenRestyNginx日志

2016-11-25 13:26:50

Flume架構(gòu)源碼

2016-11-29 09:38:06

Flume架構(gòu)核心組件

2016-11-25 13:14:50

Flume架構(gòu)源碼

2020-07-13 09:09:23

Sentinel源碼Bucket

2025-05-26 09:05:00

2022-06-07 10:33:29

Camera組件鴻蒙

2015-01-14 13:22:36

OpenStack創(chuàng)建快照glance api

2009-12-22 13:36:39

Linux Sysfs

2017-08-22 13:45:27

2009-07-08 10:30:57

WebWork

2022-07-15 08:52:03

Linux優(yōu)化

2024-10-21 10:45:52

2017-04-19 15:32:46

ReactRouter構(gòu)建源碼

2016-11-29 16:59:46

Flume架構(gòu)源碼

2011-03-15 11:33:18

iptables
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

www..com国产| 午夜剧场免费看| 国产原创精品视频| 国产成人精品综合在线观看| 国产+人+亚洲| 欧美 日韩 国产 成人 在线观看 | 精品二区在线观看| 国产日韩欧美高清免费| 日韩在线国产精品| 在线免费看黄色片| 久久久加勒比| 欧美日韩午夜剧场| 97超碰人人爱| 大地资源中文在线观看免费版| 国产一区二区成人久久免费影院| 91干在线观看| 欧美日韩精品亚洲精品| 国产一区网站| 亚洲成人激情在线| 欧美精品 - 色网| 在线精品亚洲欧美日韩国产| 亚洲精品欧美在线| 天堂av一区二区| 亚洲欧美色视频| 久久国产麻豆精品| 欧美有码在线观看视频| 久久久久久福利| 欧美高清视频手机在在线| 国产视频久久久久| 制服丝袜在线第一页| 色8久久久久| 在线观看免费成人| 日日碰狠狠添天天爽超碰97| 18videosex性欧美麻豆| 中文字幕免费一区| 农村寡妇一区二区三区| 亚洲国产999| 国产伦精品一区二区三区免费迷| 国产精品aaa| 特级西西444www大精品视频免费看| 亚洲一区色图| 最新国产精品亚洲| 人妻少妇无码精品视频区| 人妖一区二区三区| 亚洲第一中文字幕| av av在线| 国内精品国产成人国产三级粉色| 日韩欧美一区在线| 杨幂一区二区国产精品| 激情不卡一区二区三区视频在线| 欧美日韩国产免费| 手机在线国产视频| **日韩最新| 欧美精品 日韩| 午夜视频在线网站| 亚洲成人a级片| 欧美日韩国产a| 亚洲天堂av一区二区| 不卡亚洲精品| 欧美日韩国产高清一区| 国产又大又黄又粗又爽| 欧美久久久网站| 欧美日本韩国一区二区三区视频 | 欧美日韩中文视频| 99视频一区| 欧洲亚洲女同hd| 精品一区二区无码| 久久综合综合久久综合| 国产一区二区色| 国产叼嘿视频在线观看| 国产成人h网站| 国产精品一区二区三区不卡| 天天综合网天天综合| 99久久精品免费| 欧洲精品久久| jyzzz在线观看视频| 国产精品国产a级| 青青草影院在线观看| 久草在线资源站资源站| 午夜精品爽啪视频| www.xxx亚洲| 欧美影院视频| 日韩精品中文字幕久久臀| 天天躁日日躁aaaa视频| 99久久婷婷| 性欧美办公室18xxxxhd| 夜夜爽妓女8888视频免费观看| 久久99国产精品免费| 国产精成人品localhost| 你懂的在线播放| 国产精品国产三级国产aⅴ中文| 亚洲小视频在线播放| 免费的av网站| 国产1区2区3区在线| 国产精品天天摸av网| 日韩中文字幕在线不卡| 欧美日韩国产v| 欧美一区二区三级| 亚洲午夜福利在线观看| 亚洲免费二区| 欧美亚洲日本黄色| 99热这里只有精品在线观看| 91免费观看视频| 9999在线观看| 伊伊综合在线| 日韩欧美的一区| 尤物视频最新网址| 好吊一区二区三区| 国产精品久久久av久久久| 精品免费久久久| 国产精品五月天| aa在线观看视频| 精品视频一二| 一本色道久久88综合亚洲精品ⅰ| 青娱乐av在线| 久久超级碰视频| 欧美日韩国产综合在线| 牛牛精品在线视频| 777a∨成人精品桃花网| 中文字幕一区二区人妻在线不卡| 欧美freesex交免费视频| 国产精品免费久久久久久| 色婷婷av一区二区三区之e本道| 国产精品久久久久久久岛一牛影视 | 久热免费在线观看| 99re91这里只有精品| 日韩中文在线不卡| 最近日韩免费视频| 久久无码av三级| 无罩大乳的熟妇正在播放| 日韩中文字幕无砖| 久久天天躁狠狠躁夜夜av| 波多野结衣毛片| 久久久亚洲精品一区二区三区| 一级性生活视频| 欧美日韩破处视频| 最近2019中文免费高清视频观看www99 | 日韩成人午夜电影| 欧美日韩一区二区视频在线| 男人天堂视频在线观看| 精品国产一区二区三区四区四| 艳妇荡乳欲伦69影片| 麻豆9191精品国产| 久久天堂国产精品| 欧美a级在线观看| 精品第一国产综合精品aⅴ| 欧美日韩成人免费观看| 国产一区二区三区在线观看精品| 一区二区三区视频在线播放| 亚洲男女网站| 久久中国妇女中文字幕| 99久久一区二区| 一区二区三区久久久| 日本成人在线免费| 亚洲视频碰碰| 精品久久蜜桃| 成人性教育av免费网址| 亚洲天堂男人的天堂| 无码人妻丰满熟妇区bbbbxxxx| 91美女精品福利| 四季av一区二区| 99久久九九| 国产精品久久7| 欧美亚洲日本精品| 中文字幕亚洲欧美日韩2019| 一级淫片免费看| 亚洲精品国久久99热| 99免费观看视频| 国产精品一国产精品k频道56| 欧美精品中文字幕一区二区| xxxxx.日韩| 欧美日本国产在线| 天天射,天天干| 欧洲视频一区二区| 国产精品 欧美激情| 成人丝袜高跟foot| 日韩毛片在线免费看| 天天插综合网| 国产精品一区在线播放| 456亚洲精品成人影院| www.午夜精品| 深爱五月激情五月| 欧美日韩专区在线| 久久久久99精品成人片毛片| 久久久国产精品麻豆| 91插插插影院| 久久av在线| 中文字幕色一区二区| 久久综合另类图片小说| 国产精品丝袜白浆摸在线| 午夜dj在线观看高清视频完整版| 日韩国产精品亚洲а∨天堂免| 中文字幕在线网站| 亚洲成a天堂v人片| 久久久精品成人| 成人精品高清在线| 污色网站在线观看| 日韩午夜av在线| 在线一区高清| 影视先锋久久| 成人黄色片视频网站| 日本精品另类| 性色av一区二区三区免费 | 欧美大片第1页| 国产精品四虎| 亚洲国产美女久久久久| 国产一区二区三区成人| 欧美午夜精品久久久久久人妖 | eeuss影院在线播放| 亚洲精品一区二区三区福利| 一区二区三区免费在线| 欧美性xxxxx极品娇小| 激情综合五月网| 国产精品久久久久久久蜜臀| 日本黄色特级片| 丰满亚洲少妇av| 一区二区久久精品| 日韩—二三区免费观看av| av在线免费观看国产| 国产精品国产三级国产在线观看| 久久精品一区二区三区不卡免费视频| 国产精品美女久久久久| 国产精品久久久久不卡| 这里有精品可以观看| 久久噜噜噜精品国产亚洲综合| 免费在线观看av片| 在线观看欧美www| 理论在线观看| 精品在线小视频| 三级av在线播放| 亚洲国产精品va| 理论片中文字幕| 欧美α欧美αv大片| 国产黄a三级三级看三级| 欧美美女黄视频| 91精品人妻一区二区三区果冻| 91久久国产最好的精华液| 国产精品午夜影院| 欧美日韩中文字幕| 国产九色在线播放九色| 午夜伦理一区二区| 日本熟妇成熟毛茸茸| 亚洲综合色网站| 久热这里只有精品在线| 夜夜揉揉日日人人青青一国产精品| 九九精品视频免费| 亚洲乱码中文字幕| 任我爽在线视频| 亚洲少妇中出一区| 私库av在线播放| 亚洲已满18点击进入久久| 欧美成人片在线观看| 亚洲影视资源网| 黄网在线观看视频| 99精品在免费线偷拍| 亚洲精品无码久久久| 欧美一级国产精品| 亚洲第一色网站| 亚洲国产日韩精品在线| 蜜桃视频久久一区免费观看入口| 日韩精品一区二区三区中文不卡| 亚洲高清视频在线播放| 日韩av最新在线| 国产网站在线播放| 久久精品91久久久久久再现| 麻豆网在线观看| 欧美激情第一页xxx| 第一福利在线视频| 欧美资源在线观看| 成人国产在线| 91精品国自产在线观看| 99久久人爽人人添人人澡| 精品免费一区二区三区蜜桃| 久久不见久久见国语| 自拍偷拍99| 国内精品久久久久久久影视蜜臀| 欧美一区二区中文字幕| 日韩电影免费一区| 久久久久亚洲av片无码v| 成人高清在线视频| 熟女少妇内射日韩亚洲| 亚洲欧美激情视频在线观看一区二区三区| 青青草原免费观看| 色综合久久综合网| 国产乱淫a∨片免费观看| 精品国产亚洲在线| 懂色一区二区三区| 欧美激情图片区| 亚洲高清黄色| 丁香婷婷久久久综合精品国产| 色婷婷久久久| 黑人巨大国产9丨视频| 一区二区久久| 天美一区二区三区| 久久亚洲一区二区三区明星换脸 | 国模无码一区二区三区| 国产亚洲精品美女久久久| 91精品久久| 国产不卡在线观看| 亚洲精品视频一二三区| 欧洲精品亚洲精品| 亚洲乱码视频| 五月天中文字幕在线| 99re视频精品| 欧美成人综合色| 欧美三级欧美一级| 五月婷婷综合久久| 免费不卡在线观看av| 色成人免费网站| 久久久久久久久久久一区| 欧美韩日精品| 色乱码一区二区三区在线| 91日韩精品一区| 国产亚洲精品成人| 91麻豆精品国产无毒不卡在线观看| 日韩在线免费播放| 久久久亚洲国产| 精品国模一区二区三区欧美| 神马一区二区影院| 翔田千里一区二区| 欧美夫妇交换xxx| 一区二区三区av电影| 国产一区二区麻豆| 中文字幕亚洲一区二区三区| 成人美女视频| 精品婷婷色一区二区三区蜜桃| 欧美在线资源| 999热精品视频| 国产精品视频在线看| 无码人妻精品一区二区| 亚洲欧美成人网| 国产精品原创| 国产一级二级三级精品| 精品福利电影| 佐佐木明希电影| 亚洲一区二区美女| 亚洲精品无遮挡| 国内精品久久久久久| 国产精品x8x8一区二区| www.九色.com| 成人黄页在线观看| 日韩av在线天堂| 亚洲国产福利在线| 自拍网站在线观看| 久热国产精品视频一区二区三区| 136国产福利精品导航网址| 中文字幕乱视频| 精品久久久久久久大神国产| 开心激情综合网| 欧美在线免费观看| 久久不见久久见免费视频7| 午夜精品在线免费观看| 一区二区中文字幕在线| 国产精品羞羞答答在线| 另类美女黄大片| 中文无码日韩欧| 国产 福利 在线| 久久精品一二三| 中文字幕永久免费视频| 久久精品在线播放| 99ri日韩精品视频| 99热在线这里只有精品| 欧美激情一二三区| 国产乱淫a∨片免费视频| 欧美国产视频一区二区| 欧美日韩一本| 久久久久久久片| 国产精品进线69影院| 精品国产免费无码久久久| 97精品国产91久久久久久| 国产精品网站在线看| 欧美视频免费播放| 国产精品国产三级国产aⅴ原创| 国内精品久久久久久久久久| 97国产精品视频人人做人人爱| 国产成人影院| 一级 黄 色 片一| 午夜视频一区在线观看| av网在线观看| 99电影在线观看| 久久一区二区三区超碰国产精品| 四虎影视一区二区| 亚洲第一福利网站| 99九九久久| 美女扒开大腿让男人桶| 国产女人18毛片水真多成人如厕| 99久久精品日本一区二区免费| 韩国福利视频一区| 日韩久久精品| 中文字幕人妻一区二区三区| 欧美写真视频网站| 岛国片av在线| 一区二区三区四区视频在线| 波多野结衣亚洲一区| 一本色道久久综合熟妇| 97精品在线观看| 91精品天堂福利在线观看| 中文字幕在线看高清电影| 日韩久久久精品| 九色成人搞黄网站|