精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

為啥Spark 的Broadcast要用單例模式

大數據 Spark
很多用Spark Streaming 的朋友應該使用過broadcast,大多數情況下廣播變量都是以單例模式聲明的有沒有粉絲想過為什么?

很多用Spark Streaming 的朋友應該使用過broadcast,大多數情況下廣播變量都是以單例模式聲明的有沒有粉絲想過為什么?浪尖在這里幫大家分析一下,有以下幾個原因:

  1. 廣播變量大多數情況下是不會變更的,使用單例模式可以減少spark streaming每次job生成執行,重復生成廣播變量帶來的開銷。
  2. 單例模式也要做同步。這個對于很多新手來說可以不用考慮同步問題,原因很簡單因為新手不會調整spark 程序task的調度模式,而默認采用FIFO的調度模式,基本不會產生并發問題。1).假如你配置了Fair調度模式,同時修改了Spark Streaming運行的并行執行的job數,默認為1,那么就要加上同步代碼了。2).還有一個原因,在多輸出流的情況下共享broadcast,同時配置了Fair調度模式,也會產生并發問題。
  3. 注意。有些時候比如廣播配置文件,規則等需要變更broadcast,在使用fair的時候可以在foreachrdd里面使用局部變量作為廣播,避免相互干擾。

先看例子,后面逐步揭曉內部機制。

1.例子

下面是一個雙重檢查式的broadcast變量的聲明方式。

  1. object WordBlacklist { 
  2.  
  3.   @volatile private var instance: Broadcast[Seq[String]] = null 
  4.  
  5.   def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { 
  6.     if (instance == null) { 
  7.       synchronized { 
  8.         if (instance == null) { 
  9.           val wordBlacklist = Seq("a""b""c"
  10.           instance = sc.broadcast(wordBlacklist) 
  11.         } 
  12.       } 
  13.     } 
  14.     instance 
  15.   } 

廣播變量的使用方法如下:

  1. val lines = ssc.socketTextStream(ip, port) 
  2.     val words = lines.flatMap(_.split(" ")) 
  3.     val wordCounts = words.map((_, 1)).reduceByKey(_ + _) 
  4.     wordCounts.foreachRDD { (rdd: RDD[(String, Int)], timeTime) => 
  5.       // Get or register the blacklist Broadcast 
  6.       val blacklist = WordBlacklist.getInstance(rdd.sparkContext) 
  7.       // Get or register the droppedWordsCounter Accumulator 
  8.       val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) 
  9.       // Use blacklist to drop words and use droppedWordsCounter to count them 
  10.       val counts = rdd.filter { case (word, count) => 
  11.         if (blacklist.value.contains(word)) { 
  12.           droppedWordsCounter.add(count
  13.           false 
  14.         } else { 
  15.           true 
  16.         } 
  17.       }.collect().mkString("["", ""]"
  18.       val output = s"Counts at time $time $counts" 
  19.       println(output
  20.       println(s"Dropped ${droppedWordsCounter.value} word(s) totally"
  21.       println(s"Appending to ${outputFile.getAbsolutePath}"
  22.       Files.append(output + "\n", outputFile, Charset.defaultCharset()) 
  23.     } 

2.概念補充

為啥Spark 的Broadcast要用單例模式

首先,一個基本概念就是Spark應用程序從開始提交到task執行分了很多層。

  1. 應用調度器。主要是資源管理器,比如standalone,yarn等負責Spark整個應用的調度和集群資源的管理。
  2. job調度器。spark 的算子分為主要兩大類,transform和action,其中每一個action都會產生一個job。這個job需要在executor提供的資源池里調度執行,當然并不少直接調度執行job。
  3. stage劃分及調度。job具體會劃分為若干stage,這個就有一個基本的概念就是寬依賴和窄依賴,寬依賴就會劃分stage。stage也需要調度執行,從后往前劃分,從前往后調度執行。
  4. task切割及調度。stage往下繼續細化就是會根據不太的并行度劃分出task集合,這個就是在executor上調度執行的基本單元,目前的調度默認是一個task一個cpu。
  5. Spark Streaming 的job生成是周期性的。當前job的執行時間超過生成周期就會產生job 累加。累加一定數目的job后有可能會導致應用程序失敗。這個主要原因是由于FIFO的調度模式和Spark Streaming的默認單線程的job執行機制

3.Spark Streaming job生成

這個源碼主要入口是StreamingContext#JobScheduler#JobGenerator對象,內部有個RecurringTimer,主要負責按照批處理時間周期產生GenrateJobs事件,當然在存在windows的情況下,該周期有可能不會生成job,要取決于滑動間隔,有興趣自己去揭秘,浪尖星球里分享的視頻教程里講到了。具體代碼塊如下

  1. private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, 
  2.    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator"

我們直接看其實現代碼塊:

  1. eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { 
  2.       override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) 
  3.  
  4.       override protected def onError(e: Throwable): Unit = { 
  5.         jobScheduler.reportError("Error in job generator", e) 
  6.       } 
  7.     } 
  8.     eventLoop.start() 

event處理函數是processEvent方法

  1. /** Processes all events */ 
  2.   private def processEvent(event: JobGeneratorEvent) { 
  3.     logDebug("Got event " + event) 
  4.     event match { 
  5.       case GenerateJobs(time) => generateJobs(time
  6.       case ClearMetadata(time) => clearMetadata(time
  7.       case DoCheckpoint(time, clearCheckpointDataLater) => 
  8.         doCheckpoint(time, clearCheckpointDataLater) 
  9.       case ClearCheckpointData(time) => clearCheckpointData(time
  10.     } 
  11.   } 

在接受到GenerateJob事件的時候,會執行generateJobs代碼,就是在該代碼內部產生和調度job的。

  1. /** Generate jobs and perform checkpointing for the given `time`.  */ 
  2.   private def generateJobs(timeTime) { 
  3.     // Checkpoint all RDDs marked for checkpointing to ensure their lineages are 
  4.     // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). 
  5.     ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true"
  6.     Try { 
  7.       jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch 
  8.       graph.generateJobs(time) // generate jobs using allocated block 
  9.     } match { 
  10.       case Success(jobs) => 
  11.         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time
  12.         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
  13.       case Failure(e) => 
  14.         jobScheduler.reportError("Error generating jobs for time " + time, e) 
  15.         PythonDStream.stopStreamingContextIfPythonProcessIsDead(e) 
  16.     } 
  17.     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 
  18.   } 

可以看到代碼里首先會執行job生成代碼

  1. graph.generateJobs(time
  2.  
  3. 具體代碼塊兒 
  4.  
  5. def generateJobs(timeTime): Seq[Job] = { 
  6.     logDebug("Generating jobs for time " + time
  7.     val jobs = this.synchronized { 
  8.       outputStreams.flatMap { outputStream => 
  9.         val jobOption = outputStream.generateJob(time
  10.         jobOption.foreach(_.setCallSite(outputStream.creationSite)) 
  11.         jobOption 
  12.       } 
  13.     } 
  14.     logDebug("Generated " + jobs.length + " jobs for time " + time
  15.     jobs 
  16.   } 

每個輸出流都會生成一個job,輸出流就類似于foreachrdd,print這些。其實內部都是ForEachDStream。所以生成的是一個job集合。

然后就會將job集合提交到線程池里去執行,這些都是在driver端完成的哦。

  1. jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
  2.  
  3. 具體h函數內容 
  4. def submitJobSet(jobSet: JobSet) { 
  5.     if (jobSet.jobs.isEmpty) { 
  6.       logInfo("No jobs added for time " + jobSet.time
  7.     } else { 
  8.       listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) 
  9.       jobSets.put(jobSet.time, jobSet) 
  10.       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) 
  11.       logInfo("Added jobs for time " + jobSet.time
  12.     } 
  13.   } 

其實就是遍歷生成的job集合,然后提交到線程池jobExecutor內部執行。這個也是在driver端的哦。

jobExecutor就是一個固定線程數的線程池,默認是1個線程。

  1. private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) 
  2.   private val jobExecutor = 
  3.     ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor"

需要的話可以配置spark.streaming.concurrentJobs來同時提交執行多個job。

那么這種情況下,job就可以并行執行了嗎?

顯然不是的!

還要修改一下調度模式為Fair,詳細的配置可以參考:

http://spark.apache.org/docs/2.3.3/job-scheduling.html#scheduling-within-an-application

簡單的均分的話只需要

  1. conf.set("spark.scheduler.mode""FAIR"

然后,同時運行的job就會均分所有executor提供的資源。

這就是整個job生成的整個過程了哦。

因為Spark Streaming的任務存在Fair模式下并發的情況,所以需要在使用單例模式生成broadcast的時候要注意聲明同步。

責任編輯:未麗燕 來源: Spark學習技巧
相關推薦

2021-09-07 10:44:35

異步單例模式

2021-02-01 10:01:58

設計模式 Java單例模式

2021-03-02 08:50:31

設計單例模式

2022-02-06 22:30:36

前端設計模式

2022-09-29 08:39:37

架構

2016-03-28 10:23:11

Android設計單例

2013-11-26 16:20:26

Android設計模式

2011-03-16 10:13:31

java單例模式

2021-02-07 23:58:10

單例模式對象

2022-06-07 08:55:04

Golang單例模式語言

2015-09-06 11:07:52

C++設計模式單例模式

2021-08-11 17:22:11

設計模式單例

2024-02-04 12:04:17

2024-03-06 13:19:19

工廠模式Python函數

2023-11-21 21:39:38

單例模式音頻管理器

2011-06-28 15:18:45

Qt 單例模式

2016-10-09 09:37:49

javascript單例模式

2013-03-26 10:35:47

Objective-C單例實現

2022-08-10 11:02:56

Python單例模式

2021-05-29 10:22:49

單例模式版本
點贊
收藏

51CTO技術棧公眾號

亚洲中文字幕久久精品无码喷水| 国产成人精品免费久久久久| www.污网站| 欧美bbbxxxxx| 久久久亚洲精品一区二区三区 | 91国内外精品自在线播放| 国产精品日日摸夜夜摸av| 91成人在线看| 一级片免费在线播放| 天天做天天爱天天综合网2021| 精品成人免费观看| 男人添女人下面免费视频| 日本中文字幕中出在线| 欧美国产视频在线| 久久久久五月天| 麻豆视频免费在线播放| 久久夜色电影| 日韩一区二区三区视频| 亚洲国产精品毛片av不卡在线| 亚洲第一图区| 欧美激情在线免费观看| 国产亚洲二区| 精品毛片一区二区三区| 日本视频在线一区| 91国内免费在线视频| 国产av 一区二区三区| 免费成人高清在线视频theav| 欧美一区午夜视频在线观看| 粗暴91大变态调教| 成人黄色在线免费观看| 天天操天天射天天爽| 91精品国产91久久久久久黑人| 日韩电影视频免费| 久久久无码人妻精品无码| 日韩成人免费av| 色婷婷国产精品| 成年人午夜视频在线观看 | 欧美国产日韩精品| 日韩亚洲欧美中文字幕| 欧美裸体在线版观看完整版| 亚洲精品久久久久久下一站| 一级黄色大片免费看| www.久久爱.com| 欧美日韩成人高清| 成人亚洲精品777777大片| 日韩脚交footjobhd| 香蕉加勒比综合久久| 欧美日韩福利在线| 性国产高清在线观看| 亚洲欧美日本韩国| 强伦女教师2:伦理在线观看| av午夜在线| 中文字幕av在线一区二区三区| 欧美一区二区视频17c| 全色精品综合影院| 久久麻豆一区二区| 欧美日韩三区四区| 精品视频二区| 国产欧美日产一区| 西游记1978| 色的视频在线免费看| 亚洲国产精品二十页| 亚洲看片网站| 青青影院在线观看| 综合在线观看色| 国风产精品一区二区| 亚洲www色| 精品国产乱码久久久久久婷婷| 3d动漫一区二区三区| 欧美日韩在线观看首页| 欧洲精品视频在线观看| 国产成人黄色网址| 国产精品美女久久久久人| 欧美一二三四区在线| 亚洲美女高潮久久久| 网红女主播少妇精品视频| 亚洲网站在线观看| 成人自拍小视频| 国产一区日韩欧美| 亲爱的老师9免费观看全集电视剧| 亚洲高清毛片一区二区| 日韩电影在线免费| 91沈先生在线观看| 人人妻人人澡人人爽久久av| 久久综合视频网| 正在播放亚洲| 国内激情视频在线观看| 在线亚洲一区二区| 少妇性l交大片7724com| 欧美一区二区三区红桃小说| 中文字幕欧美视频在线| 久久免费少妇高潮99精品| 免费亚洲网站| 91久久精品国产91久久性色| 天天舔天天干天天操| 日本一区二区久久| 国产欧美久久久久| 新片速递亚洲合集欧美合集| 欧美一级艳片视频免费观看| 精品久久久久久中文字幕人妻最新| 成人vr资源| 欧美精品国产精品日韩精品| 亚洲欧美日韩一区二区三区四区| 国产米奇在线777精品观看| 久久久com| 国内外激情在线| 日本道在线观看一区二区| 99国产精品免费视频| 美女av免费观看| 国产鲁鲁视频在线观看特色| 欧美日韩国产色| 97人人模人人爽人人澡| 国产欧美高清视频在线| 欧美贵妇videos办公室| 一级片免费观看视频| 91视频在线观看免费| 裸体裸乳免费看| 外国电影一区二区| 亚洲精品理论电影| 青娱乐国产在线视频| 免费视频一区二区| 看欧美日韩国产| 欧美hdxxxxx| 制服视频三区第一页精品| 亚洲av综合一区二区| 伊人久久成人| 99超碰麻豆| а√天堂资源地址在线下载| 精品视频999| 精品人伦一区二区三电影| 好吊视频一区二区三区四区| 成人午夜黄色影院| 91官网在线| 在线免费观看日本欧美| 欧美深性狂猛ⅹxxx深喉 | 日韩日本欧美亚洲| www.com亚洲| 久久久综合精品| 日韩欧美一区二| 麻豆成人入口| 亚洲欧美国产三级| 理论片在线不卡免费观看| 日韩在线播放中文字幕| 91天堂素人约啪| 乱妇乱女熟妇熟女网站| 久久成人福利| 98视频在线噜噜噜国产| 日本黄色大片视频| 亚洲a一区二区| 手机在线成人av| 激情亚洲网站| 国产中文一区二区| 91www在线| 日韩av在线直播| 国产成人精品片| 久久综合九色综合欧美98| 男人天堂1024| 亚洲制服欧美另类| 日韩av色综合| 国产美女性感在线观看懂色av| 欧美性猛交xxxx黑人猛交| 成人免费av片| 青草av.久久免费一区| 午夜精品一区二区三区四区| 美女视频一区| 美日韩精品免费观看视频| 国产女人爽到高潮a毛片| 亚洲精品国产精品乱码不99| 黄色av电影网站| 亚洲一区亚洲| 亚洲v日韩v欧美v综合| 国产精久久久| 国模精品视频一区二区| 青青草娱乐在线| 欧美日韩另类一区| 久久久一二三四| 国产熟女一区二区三区四区| 亚洲激情图片一区| 亚洲av成人片无码| 久久久久久一区二区| 亚洲三区四区| 波多野结衣在线一区二区 | 欧亚一区二区| 久久国产精品影视| 天天躁日日躁狠狠躁喷水| 欧美系列日韩一区| 久草国产在线观看| 久久综合九色综合欧美亚洲| 日本美女视频一区| 国产精品videossex久久发布| 鲁鲁狠狠狠7777一区二区| 国产精品伦一区二区| 欧美国产一区二区三区| 国产视频网站在线| 91麻豆精品国产91久久久资源速度| 久久久久久国产精品视频| 久久久久久久久久久久久女国产乱| 色啦啦av综合| 久久精品三级| 亚洲精品国产suv一区88| 欧美码中文字幕在线| 国产一区在线免费观看| 日韩护士脚交太爽了| 国产69精品久久久久久| 成年视频在线观看| 国产一区二区三区丝袜| 色婷婷av一区二区三区之红樱桃| 欧美日韩免费一区二区三区视频| 欧美视频在线第一页| 亚洲精品久久久蜜桃动漫| 亚洲精品欧美综合四区| 欧美偷拍一区二区三区| 成人精品一区二区三区中文字幕| 亚洲欧美自拍另类日韩| 亚洲欧美日本日韩| 亚洲精品国产suv一区88| 日韩成人免费| 免费h精品视频在线播放| 日韩三级久久| 91色视频在线导航| 日韩国产91| 国产精品视频在线观看| 色综合亚洲图丝熟| 欧美高清一级大片| gogo在线观看| 色七七影院综合| 成人在线观看黄色| 亚洲美腿欧美激情另类| 熟妇人妻系列aⅴ无码专区友真希| 欧美一区二区三区免费观看视频| 国产伦视频一区二区三区| 国产精品美女久久久久av爽| 中文字幕在线观看一区二区| 欧美特级黄色录像| 久久蜜臀精品av| 国产三级国产精品| 99re这里都是精品| 中文字幕精品久久久| 成人综合婷婷国产精品久久 | 欧美 国产 精品| 小说区亚洲自拍另类图片专区| 亚洲精品乱码视频| 久久精品国产www456c0m| 亚洲精品国产精品国自产观看| 精品99在线| 色视频一区二区三区| 成人综合久久| 正在播放精油久久| 国产精品久久观看| 国产91视频一区| 亚洲视频中文| 日韩激情免费视频| 久久久久久久高潮| 中文字幕在线观看第三页| 久久激情中文| 亚洲国产日韩欧美在线观看| 精品一区二区三区免费播放| www.污网站| 大陆成人av片| 欧美 日本 国产| 中文字幕免费在线观看视频一区| jizzjizz日本少妇| 亚洲精品成人精品456| 国产性生活网站| 午夜精品久久久久久久99樱桃| 男人的天堂一区二区| 日韩欧美在线网址| 在线播放一级片| 日韩午夜激情av| 午夜一区在线观看| 国产亚洲福利一区| 超碰个人在线| 69视频在线播放| 久久人体av| 99精品国产高清在线观看| 神马香蕉久久| 亚洲精品视频一二三| 欧美在线三区| av免费播放网址| 久草中文综合在线| 欧美日韩人妻精品一区在线| 久久精品一二三| 全程偷拍露脸中年夫妇| 日韩欧美一区二区在线| 一本色道久久综合亚洲| 精品sm在线观看| 在线观看免费版| 国内精品久久久久| av免费在线一区| 国产高清精品一区二区| 久草精品在线| 国产精品videossex国产高清| 亚洲日产国产精品| 日本高清久久久| yourporn久久国产精品| 男人av资源站| 色综合久久中文字幕综合网 | 亚洲精品国产精品国自产观看浪潮| 成年网站在线| 性欧美激情精品| 粉嫩av国产一区二区三区| 精品蜜桃一区二区三区| 午夜欧美在线| 又色又爽又高潮免费视频国产| 国产成人8x视频一区二区| 日韩欧美黄色网址| 天天色天天爱天天射综合| 国产精品天天操| 国产性猛交xxxx免费看久久| 9765激情中文在线| 91啪国产在线| 色88久久久久高潮综合影院| 国产成人无码精品久久久性色| 国产真实乱对白精彩久久| 国产中年熟女高潮大集合| 亚洲国产成人高清精品| 国产丝袜视频在线观看 | 日本理论片午伦夜理片在线观看| 国产精品jvid在线观看蜜臀 | 日韩精品视频免费在线观看| 在线中文免费视频| 成人高清视频观看www| blacked蜜桃精品一区| jizzjizzxxxx| 91亚洲精华国产精华精华液| 1024手机在线视频| 91精品中文字幕一区二区三区| av在线1区2区| 国产精品视频一区二区高潮| 亚洲三级性片| 女人和拘做爰正片视频| 成人av网址在线| 日本亚洲欧美在线| 欧美变态tickle挠乳网站| 中文国产字幕在线观看| 亚洲aⅴ男人的天堂在线观看| 青青草原综合久久大伊人精品 | 在线播放日韩精品| 日日夜夜天天综合| 日韩wuma| 日韩精品国产欧美| 老熟妇一区二区| 91福利资源站| 在线日本中文字幕| 国产噜噜噜噜久久久久久久久| 色777狠狠狠综合伊人| 在线观看国产一级片| 亚洲色图制服诱惑| 99久久国产热无码精品免费| 欧美另类69精品久久久久9999| 视频成人永久免费视频| 国产性生活免费视频| 成人听书哪个软件好| 日韩免费一级片| 亚洲美女www午夜| 四虎影视4hu4虎成人| 亚洲精品国产一区| 国产麻豆视频精品| 欧美日韩中文视频| 日韩黄在线观看| 国产精品久久久久久吹潮| 一区二区视频在线免费| 国产乱码精品一区二区三区忘忧草| 久久久久性色av无码一区二区| 欧美精品一区二| 韩国成人漫画| 亚洲成人精品电影在线观看| 国产精品99久久久久久似苏梦涵| 久久久久香蕉视频| 亚洲人成人99网站| 亚洲成a人片777777久久| 国产高清不卡无码视频| 91在线视频官网| 一区二区小视频| 欧美福利视频在线观看| 日韩在线你懂的| www.涩涩涩| 一区二区在线免费| 欧美成人免费| 91香蕉电影院| 亚洲在线网站| av成人免费网站| 日韩精品久久久久| 亚洲欧洲日韩精品在线| 日韩免费视频播放| ...xxx性欧美| 深夜福利免费在线观看| 成人h片在线播放免费网站| 亚洲国产精品一区| 国产一二三四区在线| 精品免费视频.| 午夜av成人| av在线观看地址| 国产精品区一区二区三| 欧美视频久久久| 成人黄色在线观看| 亚久久调教视频| 久久老司机精品视频| 神马久久桃色视频| 欧美影院天天5g天天爽|