精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spark Streaming 數據清理機制

大數據 Spark
大家剛開始用Spark Streaming時,心里肯定嘀咕,對于一個7*24小時運行的數據,cache住的RDD,broadcast 系統會幫忙自己清理掉么?還a是說必須自己做清理?如果系統幫忙清理的話,機制是啥?

前言

為啥要了解機制呢?這就好比JVM的垃圾回收,雖然JVM的垃圾回收已經巨牛了,但是依然會遇到很多和它相關的case導致系統運行不正常。

這個內容我記得自己剛接觸Spark Streaming的時候,老板也問過我,運行期間會保留多少個RDD? 當時沒回答出來。后面在群里也有人問到了,所以就整理了下。文中如有謬誤之處,還望指出。

DStream 和 RDD

我們知道Spark Streaming 計算還是基于Spark Core的,Spark Core 的核心又是RDD. 所以Spark Streaming 肯定也要和RDD扯上關系。然而Spark Streaming 并沒有直接讓用戶使用RDD而是自己抽象了一套DStream的概念。 DStream 和 RDD 是包含的關系,你可以理解為Java里的裝飾模式,也就是DStream 是對RDD的增強,但是行為表現和RDD是基本上差不多的。都具備幾個條件:

具有類似的tranformation動作,比如map,reduceByKey等,也有一些自己獨有的,比如Window,mapWithStated等

都具有Action動作,比如foreachRDD,count等

從編程模型上看是一致的。

所以很可能你寫的那堆Spark Streaming代碼看起來好像和Spark 一致的,然而并不能直接復用,因為一個是DStream的變換,一個是RDD的變化。

Spark Streaming中 DStream 介紹

DStream 下面包含幾個類:

  • 數據源類,比如InputDStream,具體如DirectKafkaInputStream等
  • 轉換類,典型比如MappedDStream,ShuffledDStream
  • 輸出類,典型比如ForEachDStream

從上面來看,數據從開始(輸入)到結束(輸出)都是DStream體系來完成的,也就意味著用戶正常情況是無法直接去產生和操作RDD的,這也就是說,DStream有機會和義務去負責RDD的生命周期。

這就回答了前言中的問題了。Spark Streaming具備自動清理功能。

RDD 在Spark Stream中產生的流程

在Spark Streaming中RDD的生命流程大體如下:

  • 在InputDStream會將接受到的數據轉化成RDD,比如DirectKafkaInputStream 產生的就是 KafkaRDD
  • 接著通過MappedDStream等進行數據轉換,這個時候是直接調用RDD對應的map方法進行轉換的
  • 在進行輸出類操作時,才暴露出RDD,可以讓用戶執行相應的存儲,其他計算等操作。

我們這里就以下面的代碼來進行更詳細的解釋:

  1. val source  =   KafkaUtils.createDirectInputStream(....) 
  2. source.map(....).foreachRDD{rdd=> 
  3.     rdd.saveTextFile(....) 

foreachRDD 產生ForEachDStream,因為foreachRDD是個Action,所以會觸發任務的執行,會被調用generateJob方法。

  1. override def generateJob(time: Time): Option[Job] = { 
  2.    parent.getOrCompute(time) match { 
  3.      case Some(rdd) => 
  4.        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) { 
  5.          foreachFunc(rdd, time) 
  6.        } 
  7.        Some(new Job(time, jobFunc)) 
  8.      case None => None 
  9.    } 
  10.  } 

對應的parent是MappedDStream,也就是說調用MappedDStream.getOrCompute.該方法在DStream中,首先會在MappedDStream對象中的generatedRDDs 變量中查找是否已經有RDD,如果沒有則觸發計算,并且將產生的RDD放到generatedRDDs

  1. @transientprivate[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () 
  2.  
  3. private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = { 
  4.     // If RDD was already generated, then retrieve it from HashMap, 
  5.     // or else compute the RDD 
  6.     generatedRDDs.get(time).orElse { 
  7. .... 
  8. generatedRDDs.put(time, newRDD) 
  9. .... 

計算RDD是調用的compute方法,MappedDStream 的compute方法很簡單,直接調用的父類也就是DirectKafkaInputStream的getOrCompute方法:

  1. override def compute(validTime: Time): Option[RDD[U]] = { 
  2.     parent.getOrCompute(validTime).map(_.map[U](mapFunc)) 
  3.   } 

在上面的例子中,MappedDStream 的parent是DirectKafkaInputStream中,這是個數據源,所以他的compute方法會直接new出一個RDD.

從上面可以得出幾個結論:

  • 數據源以及轉換類DStream都會維護一個generatedRDDs,可以按batchTime 進行獲取
  • 內部本質還是進行的RDD的轉換
  • 如果我們調用了cache會發生什么

這里又會有兩種情況,一種是調用DStream.cache,第二種是RDD.cache。事實上他們是完全一樣的。

  1. DStream的cache 動作只是將DStream的變量storageLevel 設置為MEMORY_ONLY_SER,然后在產生(或者獲取)RDD的時候,調用RDD的persit方法進行設置。所以DStream.cache 產生的效果等價于RDD.cache(也就是你自己調用foreachRDD 將RDD 都設置一遍)
  2. 進入正題,我們是怎么釋放Cache住的RDD的

其實無所謂Cache不Cache住,RDD最終都是要釋放的,否則運行久了,光RDD對象也能承包了你的內存。我們知道,在Spark Streaming中,周期性產生事件驅動Spark Streaming 的類其實是:

  1. org.apache.spark.streaming.scheduler.JobGenerator 

他內部有個永動機(定時器),定時發布一個產生任務的事件:

  1. private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator"

然后通過processEvent進行事件處理:

  1. /** Processes all events */ 
  2.  private def processEvent(event: JobGeneratorEvent) { 
  3.    logDebug("Got event " + event) 
  4.    event match { 
  5.      case GenerateJobs(time) => generateJobs(time) 
  6.      case ClearMetadata(time) => clearMetadata(time) 
  7.      case DoCheckpoint(time, clearCheckpointDataLater) => 
  8.        doCheckpoint(time, clearCheckpointDataLater) 
  9.      case ClearCheckpointData(time) => clearCheckpointData(time) 
  10.    } 
  11.  } 

目前我們只關注ClearMetadata 事件。對應的方法為:

  1. private def clearMetadata(time: Time) { 
  2.     ssc.graph.clearMetadata(time) 
  3.  
  4.     // If checkpointing is enabled, then checkpoint, 
  5.     // else mark batch to be fully processed 
  6.     if (shouldCheckpoint) { 
  7.       eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true)) 
  8.     } else { 
  9.       // If checkpointing is not enabled, then delete metadata information about 
  10.       // received blocks (block data not saved in any case). Otherwise, wait for 
  11.       // checkpointing of this batch to complete. 
  12.       val maxRememberDuration = graph.getMaxInputStreamRememberDuration() 
  13.       jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration) 
  14.       jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration) 
  15.       markBatchFullyProcessed(time) 
  16.     } 
  17.   } 

首先是清理輸出DStream(比如ForeachDStream),接著是清理輸入類(基于Receiver模式)的數據。

ForeachDStream 其實調用的也是DStream的方法。該方法大體如下:

  1. private[streaming] def clearMetadata(time: Time) { 
  2.     val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist"true
  3.     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) 
  4.     logDebug("Clearing references to old RDDs: [" + 
  5.       oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]"
  6.     generatedRDDs --= oldRDDs.keys 
  7.     if (unpersistData) { 
  8.       logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", ")) 
  9.       oldRDDs.values.foreach { rdd => 
  10.         rdd.unpersist(false
  11.         // Explicitly remove blocks of BlockRDD 
  12.         rdd match { 
  13.           case b: BlockRDD[_] => 
  14.             logInfo("Removing blocks of RDD " + b + " of time " + time) 
  15.             b.removeBlocks() 
  16.           case _ => 
  17.         } 
  18.       } 
  19.     } 
  20.     logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + 
  21.       (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) 
  22.     dependencies.foreach(_.clearMetadata(time)) 
  23.   } 

大體執行動作如下描述:

  1. 根據記憶周期得到應該剔除的RDD
  2. 根據是否要清理cache數據,進行unpersit 操作,并且顯示的移除block
  3. 根據依賴調用其他的DStream進行動作清理

這里我們還可以看到,通過參數spark.streaming.unpersist 你是可以決定是否手工控制是否需要對cache住的數據進行清理。

這里你會有兩個疑問:

  1. dependencies 是什么?
  2. rememberDuration 是怎么來的?

dependencies 你可以簡單理解為父DStream,通過dependencies 我們可以獲得已完整DStream鏈。

rememberDuration 的設置略微復雜些,大體是 slideDuration,如果設置了checkpointDuration 則是2*checkpointDuration 或者通過DStreamGraph.rememberDuration(如果設置了的話,譬如通過StreamingContext.remember方法,不過通過該方法設置的值要大于計算得到的值會生效)

另外值得一提的就是后面的DStream 會調整前面的DStream的rememberDuration,譬如如果你用了window* 相關的操作,則在此之前的DStream 的rememberDuration 都需要加上windowDuration。

然后根據Spark Streaming的定時性,每個周期只要完成了,都會觸發清理動作,這個就是清理動作發生的時機。代碼如下:

  1. def onBatchCompletion(time: Time) {      
  2.     eventLoop.post(ClearMetadata(time)) 

總結下

Spark Streaming 會在每個Batch任務結束時進行一次清理動作。每個DStream 都會被掃描,不同的DStream根據情況不同,保留的RDD數量也是不一致的,但都是根據rememberDuration變量決定,而該變量會被下游的DStream所影響,所以不同的DStream的rememberDuration取值是不一樣的。

 

 

責任編輯:Ophira 來源: 簡書
相關推薦

2025-07-16 09:16:36

2017-08-14 10:30:13

SparkSpark Strea擴容

2017-06-06 08:31:10

Spark Strea計算模型監控

2016-12-19 14:35:32

Spark Strea原理剖析數據

2025-09-16 08:49:13

2017-10-13 10:36:33

SparkSpark-Strea關系

2018-04-09 12:25:11

2016-01-28 10:11:30

Spark StreaSpark大數據平臺

2017-10-11 11:10:02

Spark Strea大數據流式處理

2022-05-30 08:21:17

Kafka數據傳遞

2018-10-14 15:52:46

MySQL數據清理數據庫

2019-10-17 09:25:56

Spark StreaPVUV

2023-10-24 20:32:40

大數據

2017-09-26 09:35:22

2021-08-20 16:37:42

SparkSpark Strea

2019-12-13 08:25:26

FlinkSpark Strea流數據

2021-07-09 10:27:12

SparkStreaming系統

2017-06-27 15:08:05

大數據Apache SparKafka Strea

2025-04-02 08:17:42

2016-03-03 15:11:42

Spark Strea工作流調度器
點贊
收藏

51CTO技術棧公眾號

久久久这里只有精品视频| 欧美日韩三级视频| 久久综合九色综合久99| 波多野结衣视频观看| 成人一区不卡| 91精品在线麻豆| 国精产品一区一区三区视频| 韩国三级av在线免费观看| 美女性感视频久久| 91国产美女在线观看| 亚洲无人区码一码二码三码的含义| 欧洲亚洲精品久久久久| 五月天一区二区| 亚洲一区三区视频在线观看| 六月丁香综合网| 美女www一区二区| 欧美激情久久久| 五月婷婷六月香| 久久影院资源站| 欧美一区午夜精品| 在线黄色免费观看| 国产高清自产拍av在线| 亚洲精品视频在线看| 欧美午夜精品久久久久免费视| 99热这里只有精品3| 久久精品女人| 欧美激情亚洲精品| 在线观看黄网址| 九九热精品视频在线观看| 欧美大片在线观看一区| 中文字幕在线综合| 在线一区av| 亚洲一二三专区| 六月婷婷激情网| 97电影在线看视频| 国产亚洲精品7777| 久久精品国产精品青草色艺 | 国产在线三区| 99精品视频在线免费观看| 亚洲自拍偷拍网址| 国产又粗又猛又爽又黄91| 老司机久久99久久精品播放免费| 欧美黑人极品猛少妇色xxxxx| 美女福利视频网| 日韩极品一区| 国产一区av在线| 免费人成又黄又爽又色| 一区二区三区日本久久久| 亚洲国产精品国自产拍av秋霞| 伊人成人免费视频| 亚洲欧洲二区| 欧美日韩国产美| 国内国产精品天干天干| 小说区图片区亚洲| 7777精品久久久大香线蕉| 亚洲男人天堂色| 99久久精品一区二区成人| 91高清视频免费看| 男女视频在线看| 国产成人精品一区二区三区视频| 午夜精品久久久久久不卡8050| 久久99久久99精品| 岛国av在线网站| 欧美视频中文在线看| 国产真实乱子伦| 精品123区| 欧美日韩视频在线第一区 | 国产精品一区二区三区四| 亚洲高清成人| 欧美在线观看网址综合| 久久久久久久久黄色| 奇米亚洲午夜久久精品| 国产精品中文字幕在线观看| 国产乱子伦精品无码码专区| 国产一区日韩二区欧美三区| 国产成人精品福利一区二区三区| 日本免费网站在线观看| 久久无码av三级| 亚洲人体一区| 亚洲综合图区| 狠狠躁夜夜躁久久躁别揉| 天天摸天天碰天天添| 成人午夜一级| 精品美女一区二区| 97伦伦午夜电影理伦片| 国产精品国产一区| 97人人模人人爽人人喊中文字| 少妇高潮av久久久久久| 极品少妇xxxx精品少妇| 国产亚洲欧美一区二区三区| 狠狠色伊人亚洲综合网站l| 亚洲欧美自拍偷拍色图| 好吊妞无缓冲视频观看| 国产成人精品一区二区三区在线| 精品卡一卡二卡三卡四在线| 37p粉嫩大胆色噜噜噜| 91视频精品| 欧美一级淫片丝袜脚交| 国产一区二区三区视频免费观看| av一区二区不卡| 亚洲日本精品国产第一区| 国产白丝在线观看| 欧美日韩高清一区二区三区| 在线精品视频播放| 98精品久久久久久久| 午夜精品www| 国产一区二区麻豆| 久久久久久99久久久精品网站| 日本道在线视频| 欧美成人精品三级网站| 欧美精品一区二区在线观看| www成人啪啪18软件| 国产欧美日韩一级| 91精品国产高清久久久久久91裸体| 水莓100在线视频| 亚洲精品亚洲人成人网在线播放| 男人操女人免费| 国产精品中文字幕制服诱惑| 日韩中文字幕在线播放| 好吊色在线视频| av在线播放成人| 久久99久久久久久| 日本一区二区乱| 精品国产依人香蕉在线精品| 中文字幕免费视频观看| 久久综合狠狠综合久久综合88 | 日本在线不卡一区| 久久精品国产精品国产精品污| 香蕉久久aⅴ一区二区三区| 欧美日本一区二区三区| 国产99在线 | 亚洲| 久久青草久久| 欧美大香线蕉线伊人久久国产精品| 性国产高清在线观看| 91精品久久久久久久久99蜜臂| 高清国产在线观看| 首页亚洲欧美制服丝腿| 麻豆传媒一区| 成人短视频app| 亚洲男人天堂视频| 日本高清不卡码| 91丨porny丨国产| av天堂永久资源网| 日韩精品丝袜美腿| 欧美在线xxx| 国产综合在线观看| 在线免费精品视频| 欧美波霸videosex极品| 日韩精品国产精品| 丝袜足脚交91精品| 亚洲成人a级片| 不卡av在线播放| 成人午夜视频一区二区播放| 亚洲网友自拍偷拍| av鲁丝一区鲁丝二区鲁丝三区| 亚洲免费成人| 欧美国产综合视频| 四虎4545www精品视频| 中文字幕精品久久| 国产精品乱码一区二区| 亚洲欧美一区二区不卡| a级大片免费看| 亚洲第一精品影视| 欧美久久综合性欧美| 日韩欧美精品电影| 久久久av网站| 免费观看黄色av| 色屁屁一区二区| 欧美老女人性生活视频| 狠狠色丁香久久婷婷综| 日韩精品在线中文字幕| 欧美一级色片| 国产女精品视频网站免费| 黄色在线免费| 亚洲国产免费av| 欧美日韩综合在线观看| 久久精品欧美日韩精品| 九九九九九九九九| 在线亚洲欧美| 亚洲无玛一区| 日韩高清一区| 欧美一级淫片播放口| 麻豆网站在线观看| 欧美精品一区二区不卡 | 欧美另类高清zo欧美| 久久久久久蜜桃| 久久精品视频免费观看| 91丝袜超薄交口足| 一本一本久久| 欧美爱爱视频网站| 欧美三级电影在线| 国产在线久久久| 九色porny丨首页入口在线| 中文字幕欧美精品日韩中文字幕| 亚洲精品一区二区口爆| 在线观看国产日韩| 免费日韩在线视频| 欧美韩日一区二区三区四区| 成年人性生活视频| 老妇喷水一区二区三区| 国产91在线亚洲| 欧美日韩水蜜桃| 久久青青草原| 日韩中文字幕无砖| 国产日本欧美视频| 中文在线а√在线8| 久久6精品影院| h视频在线播放| 日韩电视剧免费观看网站| 91中文字幕在线视频| 欧美性猛交xxxx乱大交蜜桃| 永久免费看黄网站| 中文字幕精品—区二区四季| 毛茸茸free性熟hd| 国产精品12区| 青青草原国产在线视频| 免播放器亚洲| www.日本在线播放| 综合精品一区| 亚洲免费av网| 成人激情视频| 日本电影一区二区三区| 日韩aaa久久蜜桃av| 成人av片网址| 日本超碰一区二区| 成人h视频在线观看播放| 99精品国自产在线| 国产成人精品免高潮在线观看 | 国产精品男人爽免费视频1| av日韩中文| 欧美激情免费观看| 在线观看h网| 久久国产天堂福利天堂| 免费人成在线观看播放视频| 中文字幕精品视频| 国产高清免费av在线| 亚洲毛片在线观看| 视频一区二区免费| 亚洲а∨天堂久久精品喷水| 亚洲精品18在线观看| 日韩一区二区三免费高清| 国产精品亚洲欧美在线播放| 欧美日韩性生活| 一区二区三区亚洲视频| 欧美精品粉嫩高潮一区二区| 亚洲一区中文字幕在线| 欧美视频一区二区三区四区| 中文字幕福利视频| 欧美日韩高清一区二区不卡| 91一区二区视频| 日韩欧美www| 国产成人三级在线观看视频| 欧美精品一区二区高清在线观看| 四季av日韩精品一区| 精品亚洲国产视频| 麻豆av电影在线观看| 亚洲性猛交xxxxwww| 在线观看完整版免费| 久久久av一区| 欧美黑人猛交| 欧美一区二区三区精品电影| 一区二区电影免费观看| 国产精品入口免费视频一| 亚洲综合资源| 成人黄色片视频网站| 精品欠久久久中文字幕加勒比 | 水蜜桃久久夜色精品一区| 婷婷视频在线播放| 国内综合精品午夜久久资源| 国产深夜男女无套内射| 免费成人在线观看视频| 色黄视频免费看| 波多野结衣中文字幕一区| 性欧美13一14内谢| 国产精品国产三级国产aⅴ中文| 极品久久久久久| 欧美性高潮床叫视频| 中文字幕一二区| 日韩精品一区二区三区三区免费| 亚洲欧美日韩免费| 中文字幕日韩高清| 久久一卡二卡| 国产成人在线一区| 日韩成人久久| 欧美一级日本a级v片| 一区二区三区在线| 欧美色图另类小说| 国产在线精品一区二区| 野花社区视频在线观看| ●精品国产综合乱码久久久久| 精品肉丝脚一区二区三区| 91久久精品一区二区二区| 国产黄色高清视频| 一个人看的www久久| 秋霞在线午夜| 国产精品丝袜高跟| 韩国女主播一区二区三区 | 亚洲一级影院| 中文字幕视频在线免费观看| 成人在线综合网| 影音先锋男人在线| 欧美日韩国内自拍| 国产黄色av网站| 中文字幕日韩高清| 一区二区三区短视频| 成人av影视在线| 99热精品久久| 日日碰狠狠躁久久躁婷婷| 国产精品1区2区| 成人免费视频入口| 色综合色狠狠天天综合色| 亚洲精品久久久狠狠狠爱| 最近2019中文字幕大全第二页| 精品人人视频| 99re在线视频观看| 亚洲精品成人影院| 亚洲精品一二三四五区| 99re视频这里只有精品| 久久久久免费看| 欧美一区二区免费观在线| av福利在线播放| 清纯唯美亚洲综合| 欧美成人基地| 日韩人妻无码精品久久久不卡| 久久成人18免费观看| 手机看片福利视频| 欧美性极品xxxx做受| 天堂av资源在线| 色综合视频网站| 深夜福利一区| 欧洲精品视频在线| 国产资源精品在线观看| 成年人视频软件| 欧美日韩夫妻久久| 日本在线免费| 国产日韩av在线| 色综合色综合| 国产福利在线免费| 国产精品久久三| 国产有码在线观看| 日韩中文字幕国产精品| 欧美91在线|欧美| 中文字幕日韩精品久久| 激情综合网最新| 色婷婷在线视频观看| 日韩美女视频一区二区在线观看| 最新av在线播放| 北条麻妃高清一区| 亚洲日本免费| 在线免费观看黄色小视频| 色老汉一区二区三区| 国产视频网站在线| 国产精品在线看| 综合久久精品| www.17c.com喷水少妇| 精品免费在线观看| 邻家有女韩剧在线观看国语| 国产精品成人一区二区三区吃奶| 精品国产一区二区三区四区 | 成人国产激情| 超碰在线免费观看97| 国产iv一区二区三区| 国产一级精品视频| 一区二区三区回区在观看免费视频| 成人做爰视频www| 看全色黄大色大片| 成人99免费视频| 91丨九色丨海角社区| xxav国产精品美女主播| 伊人久久影院| 欧美三级午夜理伦三级| 中文字幕第一区二区| 国产白浆在线观看| 亚州国产精品久久久| 欧美日韩黑人| 国产黄色一区二区三区| 精品久久久在线观看| aaa在线免费观看| 99在线观看| 日韩有码一区二区三区| 欧美肥妇bbwbbw| 日韩不卡在线观看| 精品久久99| 欧日韩免费视频| 国产精品美日韩| 国产刺激高潮av| 国产精品一区二区女厕厕| 午夜性色一区二区三区免费视频| 538国产视频| 91精品免费在线| gogo亚洲高清大胆美女人体| 91看片淫黄大片91| 久久精品亚洲国产奇米99| 国产高清不卡视频| 国产精品99导航| 精品二区久久| 波多野结衣久久久久| 精品丝袜一区二区三区| 日韩一区二区三区色 | av亚洲一区|