精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spark Shuffle過程分析:Map階段處理流程

大數(shù)據(jù) Spark
默認配置情況下,Spark在Shuffle過程中會使用SortShuffleManager來管理Shuffle過程中需要的基本組件,以及對RDD各個Partition數(shù)據(jù)的計算。我們可以在Driver和Executor對應的SparkEnv對象創(chuàng)建過程中看到對應的配置。

默認配置情況下,Spark在Shuffle過程中會使用SortShuffleManager來管理Shuffle過程中需要的基本組件,以及對RDD各個Partition數(shù)據(jù)的計算。我們可以在Driver和Executor對應的SparkEnv對象創(chuàng)建過程中看到對應的配置,如下代碼所示:

 

  1. // Let the user specify short names for shuffle managers 
  2.     val shortShuffleMgrNames = Map( 
  3.       "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, 
  4.       "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) 
  5.     val shuffleMgrName = conf.get("spark.shuffle.manager""sort"
  6.     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) 
  7.     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) 

如果需要修改ShuffleManager實現(xiàn),則只需要修改配置項spark.shuffle.manager即可,默認支持sort和 tungsten-sort,可以指定自己實現(xiàn)的ShuffleManager類。

因為Shuffle過程中需要將Map結(jié)果數(shù)據(jù)輸出到文件,所以需要通過注冊一個ShuffleHandle來獲取到一個ShuffleWriter對象,通過它來控制Map階段記錄數(shù)據(jù)輸出的行為。其中,ShuffleHandle包含了如下基本信息:

  • shuffleId:標識Shuffle過程的唯一ID
  • numMaps:RDD對應的Partitioner指定的Partition的個數(shù),也就是ShuffleMapTask輸出的Partition個數(shù)
  • dependency:RDD對應的依賴ShuffleDependency

下面我們看下,在SortShuffleManager中是如何注冊Shuffle的,代碼如下所示:

 

  1. override def registerShuffle[K, V, C]( 
  2.       shuffleId: Int
  3.       numMaps: Int
  4.       dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { 
  5.     if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { 
  6.       new BypassMergeSortShuffleHandle[K, V]( 
  7.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 
  8.     } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { 
  9.       new SerializedShuffleHandle[K, V]( 
  10.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 
  11.     } else { 
  12.       new BaseShuffleHandle(shuffleId, numMaps, dependency) 
  13.     } 
  14.   } 

上面代碼中,對應如下3種ShuffleHandle可以選擇,說明如下:

  • BypassMergeSortShuffleHandle

如果dependency不需要進行Map Side Combine,并且RDD對應的ShuffleDependency中的Partitioner設置的Partition的數(shù)量(這個不要和parent RDD的Partition個數(shù)混淆,Partitioner指定了map處理結(jié)果的Partition個數(shù),每個Partition數(shù)據(jù)會在Shuffle過程中全部被拉取而拷貝到下游的某個Executor端)小于等于配置參數(shù)spark.shuffle.sort.bypassMergeThreshold的值,則會注冊BypassMergeSortShuffleHandle。默認情況下,spark.shuffle.sort.bypassMergeThreshold的取值是200,這種情況下會直接將對RDD的 map處理結(jié)果的各個Partition數(shù)據(jù)寫入文件,并***做一個合并處理。

  • SerializedShuffleHandle

如果ShuffleDependency中的Serializer,允許對將要輸出數(shù)據(jù)對象進行排序后,再執(zhí)行序列化寫入到文件,則會選擇創(chuàng)建一個SerializedShuffleHandle。

  • BaseShuffleHandle

除了上面兩種ShuffleHandle以后,其他情況都會創(chuàng)建一個BaseShuffleHandle對象,它會以反序列化的格式處理Shuffle輸出數(shù)據(jù)。

Map階段處理流程分析

Map階段RDD的計算,對應ShuffleMapTask這個實現(xiàn)類,它最終會在每個Executor上啟動運行,每個ShuffleMapTask處理RDD的一個Partition的數(shù)據(jù)。這個過程的核心處理邏輯,代碼如下所示:

 

  1. val manager = SparkEnv.get.shuffleManager 
  2.       writer = manager.getWriter[AnyAny](dep.shuffleHandle, partitionId, context) 
  3.       writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[AnyAny]]]) 

上面代碼中,在調(diào)用rdd的iterator()方法時,會根據(jù)RDD實現(xiàn)類的compute方法指定的處理邏輯對數(shù)據(jù)進行處理,當然,如果該Partition對應的數(shù)據(jù)已經(jīng)處理過并存儲在MemoryStore或DiskStore,直接通過BlockManager獲取到對應的Block數(shù)據(jù),而無需每次需要時重新計算。然后,write()方法會將已經(jīng)處理過的Partition數(shù)據(jù)輸出到磁盤文件。

在Spark Shuffle過程中,每個ShuffleMapTask會通過配置的ShuffleManager實現(xiàn)類對應的ShuffleManager對象(實際上是在SparkEnv中創(chuàng)建),根據(jù)已經(jīng)注冊的ShuffleHandle,獲取到對應的ShuffleWriter對象,然后通過ShuffleWriter對象將Partition數(shù)據(jù)寫入內(nèi)存或文件。所以,接下來我們可能關心每一種ShuffleHandle對應的ShuffleWriter的行為,可以看到SortShuffleManager中獲取到ShuffleWriter的實現(xiàn)代碼,如下所示:

 

  1. /** Get a writer for a given partition. Called on executors by map tasks. */ 
  2.   override def getWriter[K, V]( 
  3.       handle: ShuffleHandle, 
  4.       mapId: Int
  5.       context: TaskContext): ShuffleWriter[K, V] = { 
  6.     numMapsForShuffle.putIfAbsent( 
  7.       handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) 
  8.     val env = SparkEnv.get 
  9.     handle match { 
  10.       case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => 
  11.         new UnsafeShuffleWriter( 
  12.           env.blockManager, 
  13.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 
  14.           context.taskMemoryManager(), 
  15.           unsafeShuffleHandle, 
  16.           mapId, 
  17.           context, 
  18.           env.conf) 
  19.       case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => 
  20.         new BypassMergeSortShuffleWriter( 
  21.           env.blockManager, 
  22.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 
  23.           bypassMergeSortHandle, 
  24.           mapId, 
  25.           context, 
  26.           env.conf) 
  27.       case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => 
  28.         new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) 
  29.     } 
  30.   } 

我們以最簡單的SortShuffleWriter為例進行分析,在SortShuffleManager可以通過getWriter()方法創(chuàng)建一個SortShuffleWriter對象,然后在ShuffleMapTask中調(diào)用SortShuffleWriter對象的write()方法處理Map輸出的記錄數(shù)據(jù),write()方法的處理代碼,如下所示:

 

  1. /** Write a bunch of records to this task's output */ 
  2.   override def write(records: Iterator[Product2[K, V]]): Unit = { 
  3.     sorter = if (dep.mapSideCombine) { 
  4.       require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!"
  5.       new ExternalSorter[K, V, C]( 
  6.         context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) 
  7.     } else { 
  8.       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 
  9.       // care whether the keys get sorted in each partition; that will be done on the reduce side 
  10.       // if the operation being run is sortByKey. 
  11.       new ExternalSorter[K, V, V]( 
  12.         context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 
  13.     } 
  14.     sorter.insertAll(records) 
  15.  
  16.     // Don't bother including the time to open the merged output file in the shuffle write time
  17.     // because it just opens a single file, so is typically too fast to measure accurately 
  18.     // (see SPARK-3570). 
  19.     val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) 
  20.     val tmp = Utils.tempFileWith(output
  21.     val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) 
  22.     val partitionLengths = sorter.writePartitionedFile(blockId, tmp) 
  23.     shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) 
  24.     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) 
  25.   } 

從SortShuffleWriter類中的write()方法可以看到,最終調(diào)用了ExeternalSorter的insertAll()方法,實現(xiàn)了Map端RDD某個Partition數(shù)據(jù)處理并輸出到內(nèi)存或磁盤文件,這也是處理Map階段輸出記錄數(shù)據(jù)最核心、最復雜的過程。我們將其分為兩個階段進行分析:***階段是,ExeternalSorter的insertAll()方法處理過程,將記錄數(shù)據(jù)Spill到磁盤文件;第二階段是,執(zhí)行完insertAll()方法之后的處理邏輯,創(chuàng)建Shuffle Block數(shù)據(jù)文件及其索引文件。

內(nèi)存緩沖寫記錄數(shù)據(jù)并Spill到磁盤文件

查看SortShuffleWriter類的write()方法可以看到,在內(nèi)存中緩存記錄數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)有兩種:一種是Buffer,對應的實現(xiàn)類PartitionedPairBuffer,設置mapSideCombine=false時會使用該結(jié)構(gòu);另一種是Map,對應的實現(xiàn)類是PartitionedAppendOnlyMap,設置mapSideCombine=false時會使用該結(jié)構(gòu)。根據(jù)是否指定mapSideCombine選項,分別對應不同的處理流程,我們分別說明如下:

設置mapSideCombine=false時

這種情況在Map階段不進行Combine操作,在內(nèi)存中緩存記錄數(shù)據(jù)會使用PartitionedPairBuffer這種數(shù)據(jù)結(jié)構(gòu)來緩存、排序記錄數(shù)據(jù),它是一個Append-only Buffer,僅支持向Buffer中追加數(shù)據(jù)鍵值對記錄,PartitionedPairBuffer的結(jié)構(gòu)如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

默認情況下,PartitionedPairBuffer初始分配的存儲容量為capacity = initialCapacity = 64,實際上這個容量是針對key的容量,因為要存儲的是鍵值對記錄數(shù)據(jù),所以實際存儲鍵值對的容量為2*initialCapacity = 128。PartitionedPairBuffer是一個能夠動態(tài)擴充容量的Buffer,內(nèi)部使用一個一維數(shù)組來存儲鍵值對,每次擴容結(jié)果為當前Buffer容量的2倍,即2*capacity,***支持存儲2^31-1個鍵值對記錄(1073741823個)。

通過上圖可以看到,PartitionedPairBuffer存儲的鍵值對記錄數(shù)據(jù),鍵是(partition, key)這樣一個Tuple,值是對應的數(shù)據(jù)value,而且curSize是用來跟蹤寫入Buffer中的記錄的,key在Buffer中的索引位置為2*curSize,value的索引位置為2*curSize+1,可見一個鍵值對的key和value的存儲在PartitionedPairBuffer內(nèi)部的數(shù)組中是相鄰的。

使用PartitionedPairBuffer緩存鍵值對記錄數(shù)據(jù),通過跟蹤實際寫入到Buffer內(nèi)的記錄數(shù)據(jù)的字節(jié)數(shù)來判斷,是否需要將Buffer中的數(shù)據(jù)Spill到磁盤文件,如下代碼所示:

 

  1. protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { 
  2.     var shouldSpill = false 
  3.     if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { 
  4.       // Claim up to double our current memory from the shuffle memory pool 
  5.       val amountToRequest = 2 * currentMemory - myMemoryThreshold 
  6.       val granted = acquireMemory(amountToRequest) 
  7.       myMemoryThreshold += granted 
  8.       // If we were granted too little memory to grow further (either tryToAcquire returned 0, 
  9.       // or we already had more memory than myMemoryThreshold), spill the current collection 
  10.       shouldSpill = currentMemory >= myMemoryThreshold 
  11.     } 
  12.     shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold 
  13.     // Actually spill 
  14.     if (shouldSpill) { 
  15.       _spillCount += 1 
  16.       logSpillage(currentMemory) 
  17.       spill(collection) 
  18.       _elementsRead = 0 
  19.       _memoryBytesSpilled += currentMemory 
  20.       releaseMemory() 
  21.     } 
  22.     shouldSpill 
  23.   } 

上面elementsRead表示存儲到PartitionedPairBuffer中的記錄數(shù),currentMemory是對Buffer中的總記錄數(shù)據(jù)大小(字節(jié)數(shù))的估算,myMemoryThreshold通過配置項spark.shuffle.spill.initialMemoryThreshold來進行設置的,默認值為5 * 1024 * 1024 = 5M。當滿足條件elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold時,會先嘗試向MemoryManager申請2 * currentMemory – myMemoryThreshold大小的內(nèi)存,如果能夠申請到,則不進行Spill操作,而是繼續(xù)向Buffer中存儲數(shù)據(jù),否則就會調(diào)用spill()方法將Buffer中數(shù)據(jù)輸出到磁盤文件。

向PartitionedPairBuffer中寫入記錄數(shù)據(jù),以及滿足條件Spill記錄數(shù)據(jù)到磁盤文件,具體處理流程,如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

為了查看按照怎樣的規(guī)則進行排序,我們看一下,當不進行Map Side Combine時,創(chuàng)建ExternalSorter對象的代碼如下所示:

 

  1. // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 
  2.       // care whether the keys get sorted in each partition; that will be done on the reduce side 
  3.       // if the operation being run is sortByKey. 
  4.       new ExternalSorter[K, V, V]( 
  5.         context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 

上面aggregator = None,ordering = None,在對PartitionedPairBuffer中的記錄數(shù)據(jù)Spill到磁盤之前,要使用默認的排序規(guī)則進行排序,排序的規(guī)則是只對PartitionedPairBuffer中的記錄按Partition ID進行升序排序,可以查看WritablePartitionedPairCollection伴生對象類的代碼(其中PartitionedPairBuffer類實現(xiàn)了特質(zhì)WritablePartitionedPairCollection),如下所示:

 

  1. /** 
  2.    * A comparator for (Int, K) pairs that orders them by only their partition ID. 
  3.    */ 
  4.   def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] { 
  5.     override def compare(a: (Int, K), b: (Int, K)): Int = { 
  6.       a._1 - b._1 
  7.     } 
  8.   } 

上面圖中,引用了SortShuffleWriter.writeBlockFiles這個子序列圖,用來生成Block數(shù)據(jù)文件和索引文件,后面我們會單獨說明。通過對RDD進行計算生成一個記錄迭代器對象,通過該迭代器迭代出的記錄會存儲到PartitionedPairBuffer中,當滿足Spill條件時,先對PartitionedPairBuffer中記錄進行排序,***Spill到磁盤文件,這個過程中PartitionedPairBuffer中的記錄數(shù)據(jù)的變化情況,如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

上圖中,對內(nèi)存中PartitionedPairBuffer中的記錄按照Partition ID進行排序,并且屬于同一個Partition的數(shù)據(jù)記錄在PartitionedPairBuffer內(nèi)部的data數(shù)組中是連續(xù)的。排序結(jié)束后,在Spill到磁盤文件時,將對應的Partition ID去掉了,只在文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中連續(xù)存儲鍵值對數(shù)據(jù),但同時在另一個內(nèi)存數(shù)組結(jié)構(gòu)中會保存文件中每個Partition擁有的記錄數(shù),這樣就能根據(jù)Partition的記錄數(shù)來順序讀取文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中屬于同一個Partition的全部記錄數(shù)據(jù)。

ExternalSorter類內(nèi)部維護了一個SpillFile的ArrayBuffer數(shù)組,最終可能會生成多個SpillFile,SpillFile的定義如下所示:

 

  1. private[this] case class SpilledFile( 
  2.     file: File, 
  3.     blockId: BlockId, 
  4.     serializerBatchSizes: Array[Long], 
  5.     elementsPerPartition: Array[Long]) 

每個SpillFile包含一個blockId,標識Map輸出的該臨時文件;serializerBatchSizes表示每次批量寫入到文件的Object的數(shù)量,默認為10000,由配置項spark.shuffle.spill.batchSize來控制;elementsPerPartition表示每個Partition中的Object的數(shù)量。調(diào)用ExternalSorter的insertAll()方法,最終可能有如下3種情況:

  • Map階段輸出記錄數(shù)較少,沒有生成SpillFile,那么所有數(shù)據(jù)都在Buffer中,直接對Buffer中記錄排序并輸出到文件
  • Map階段輸出記錄數(shù)較多,生成多個SpillFile,同時Buffer中也有部分記錄數(shù)據(jù)
  • Map階段輸出記錄數(shù)較多,只生成多個SpillFile
  • 有關后續(xù)如何對上面3種情況進行處理,可以想見后面對子序列圖SortShuffleWriter.writeBlockFiles的說明。
  • 設置mapSideCombine=true時

這種情況在Map階段會執(zhí)行Combine操作,在Map階段進行Combine操作能夠降低Map階段數(shù)據(jù)記錄的總數(shù),從而降低Shuffle過程中數(shù)據(jù)的跨網(wǎng)絡拷貝傳輸。這時,RDD對應的ShuffleDependency需要設置一個Aggregator用來執(zhí)行Combine操作,可以看下Aggregator類聲明,代碼如下所示:

 

  1. /** 
  2.  * :: DeveloperApi :: 
  3.  * A set of functions used to aggregate data. 
  4.  * 
  5.  * @param createCombiner function to create the initial value of the aggregation. 
  6.  * @param mergeValue function to merge a new value into the aggregation result. 
  7.  * @param mergeCombiners function to merge outputs from multiple mergeValue function
  8.  */ 
  9. @DeveloperApi 
  10. case class Aggregator[K, V, C] ( 
  11.     createCombiner: V => C, 
  12.     mergeValue: (C, V) => C, 
  13.     mergeCombiners: (C, C) => C) { 
  14.   ... ... 

由于在Map階段只用到了構(gòu)造Aggregator的幾個函數(shù)參數(shù)createCombiner、mergeValue、mergeCombiners,我們對這幾個函數(shù)詳細說明如下:

  • createCombiner:進行Aggregation開始時,需要設置初始值。因為在Aggregation過程中使用了類似Map的內(nèi)存數(shù)據(jù)結(jié)構(gòu)來管理鍵值對,每次加入前會先查看Map內(nèi)存結(jié)構(gòu)中是否存在Key對應的Value,***次肯定不存在,所以***將某個Key的Value加入到Map內(nèi)存結(jié)構(gòu)中時,Key在Map內(nèi)存結(jié)構(gòu)中***次有了Value。
  • mergeValue:某個Key已經(jīng)在Map結(jié)構(gòu)中存在Value,后續(xù)某次又遇到相同的Key和一個新的Value,這時需要通過該函數(shù),將舊Value和新Value進行合并,根據(jù)Key檢索能夠得到合并后的新Value。
  • mergeCombiners:一個Map內(nèi)存結(jié)構(gòu)中Key和Value是由mergeValue生成的,那么在向Map中插入數(shù)據(jù),肯定會遇到Map使用容量達到上限,這時需要將記錄數(shù)據(jù)Spill到磁盤文件,那么多個Spill輸出的磁盤文件中可能存在同一個Key,這時需要對多個Spill輸出的磁盤文件中的Key的多個Value進行合并,這時需要使用mergeCombiners函數(shù)進行處理。

該類中定義了combineValuesByKey、combineValuesByKey、combineCombinersByKey,由于這些函數(shù)是在Reduce階段使用的,所以在這里先不說明,后續(xù)文章我們會單獨詳細來分析。

我們通過下面的序列圖來描述,需要進行Map Side Combine時的處理流程,如下所示:

Spark Shuffle過程分析:Map階段處理流程

對照上圖,我們看一下,當需要進行Map Side Combine時,對應的ExternalSorter類insertAll()方法中的處理邏輯,代碼如下所示:

 

  1. val shouldCombine = aggregator.isDefined 
  2.  
  3.     if (shouldCombine) { 
  4.       // Combine values in-memory first using our AppendOnlyMap 
  5.       val mergeValue = aggregator.get.mergeValue 
  6.       val createCombiner = aggregator.get.createCombiner 
  7.       var kv: Product2[K, V] = null 
  8.       val update = (hadValue: Boolean, oldValue: C) => { 
  9.         if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) 
  10.       } 
  11.       while (records.hasNext) { 
  12.         addElementsRead() 
  13.         kv = records.next() 
  14.         map.changeValue((getPartition(kv._1), kv._1), update
  15.         maybeSpillCollection(usingMap = true
  16.       } 
  17.     } 

上面代碼中,map是內(nèi)存數(shù)據(jù)結(jié)構(gòu),最重要的是update函數(shù)和map的changeValue方法(這里的map對應的實現(xiàn)類是PartitionedAppendOnlyMap)。update函數(shù)所做的工作,其實就是對createCombiner和mergeValue這兩個函數(shù)的使用,***次遇到一個Key調(diào)用createCombiner函數(shù)處理,非***遇到同一個Key對應新的Value調(diào)用mergeValue函數(shù)進行合并處理。map的changeValue方法主要是將Key和Value在map中存儲或者進行修改(對出現(xiàn)的同一個Key的多個Value進行合并,并將合并后的新Value替換舊Value)。

PartitionedAppendOnlyMap是一個經(jīng)過優(yōu)化的哈希表,它支持向map中追加數(shù)據(jù),以及修改Key對應的Value,但是不支持刪除某個Key及其對應的Value。它能夠支持的存儲容量是0.7 * 2 ^ 29 = 375809638。當達到指定存儲容量或者指定限制,就會將map中記錄數(shù)據(jù)Spill到磁盤文件,這個過程和前面的類似,不再累述。

創(chuàng)建Shuffle Block數(shù)據(jù)文件及其索引文件

無論是使用PartitionedPairBuffer,還是使用PartitionedAppendOnlyMap,當需要容量滿足Spill條件時,都會將該內(nèi)存結(jié)構(gòu)(buffer/map)中記錄數(shù)據(jù)Spill到磁盤文件,所以Spill到磁盤文件的格式是相同的。對于后續(xù)Block數(shù)據(jù)文件和索引文件的生成邏輯也是相同,如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

假設,我們生成的Shuffle Block文件對應各個參數(shù)為:shuffleId=2901,mapId=11825,reduceId=0,這里reduceId是一個NOOP_REDUCE_ID,表示與DiskStore進行磁盤I/O交互操作,而DiskStore期望對應一個(map, reduce)對,但是對于排序的Shuffle輸出,通常Reducer拉取數(shù)據(jù)后只生成一個文件(Reduce文件),所以這里默認reduceId為0。經(jīng)過上圖的處理流程,可以生成一個.data文件,也就是Block數(shù)據(jù)文件;一個.index文件,也就是包含了各個Partition在數(shù)據(jù)文件中的偏移位置的索引文件。這個過程生成的文件,示例如下所示:

 

  1. shuffle_2901_11825_0.data  
  2. shuffle_2901_11825_0.index 

這樣,對于每個RDD的多個Partition進行處理后,都會生成對應的數(shù)據(jù)文件和索引文件,后續(xù)在Reduce端就可以讀取這些Block文件,這些記錄數(shù)據(jù)在文件中都是經(jīng)過分區(qū)(Partitioned)的。

責任編輯:未麗燕 來源: 36大數(shù)據(jù)
相關推薦

2017-03-27 10:48:03

Hive map優(yōu)化分析

2021-08-11 06:57:16

ShuffleSpark核心

2019-04-22 15:24:24

HadoopSuffleMap端

2023-02-08 13:08:31

2021-10-20 10:04:47

鴻蒙HarmonyOS應用

2025-09-15 06:25:00

2025-06-13 08:40:00

ShuffleSpark大數(shù)據(jù)

2022-03-15 08:25:32

SparkShuffle框架

2019-07-26 15:01:42

SparkShuffle內(nèi)存

2019-06-06 15:22:07

SparkShuffle內(nèi)存

2012-08-30 09:48:02

Struts2Java

2023-11-20 07:27:00

云原生Spark

2009-07-03 13:41:44

WinCE編譯過程

2024-07-15 09:58:03

OpenRestyNginx日志

2016-12-14 19:20:07

Spark SQL架構(gòu)分布式

2009-07-28 11:32:41

光纖鏈路故障

2011-04-13 14:57:11

ASP.NET請求處理

2017-04-24 09:20:05

Spark分析分區(qū)器

2010-06-13 14:36:20

RARP協(xié)議

2022-08-25 18:48:29

字節(jié)跳動CSS開源
點贊
收藏

51CTO技術棧公眾號

精品自拍偷拍视频| 在线视频91p| 色婷婷av一区二区三区之e本道| 一级毛片视频在线| 亚洲综合好骚| 国产精品夫妻自拍| 午夜剧场成人观在线视频免费观看| 丝袜足脚交91精品| 午夜影院在线看| 日韩精品视频一区二区三区| 欧美国产综合一区二区| 69**夜色精品国产69乱| 国产精品91av| а√天堂在线官网| 黄一区二区三区| 在线观看欧美成人| 国产在线青青草| 99热在线只有精品| 在线中文字幕亚洲| 7777精品伊人久久久大香线蕉的| 天堂va久久久噜噜噜久久va| www.久久久久久| 91精品国产乱码久久久张津瑜| 嫩草伊人久久精品少妇av杨幂| av电影在线观看一区| 久久91亚洲精品中文字幕| 亚洲人视频在线| h视频在线播放| 日本va欧美va精品| 最近2019中文免费高清视频观看www99 | 日韩精品无码一区二区三区| 中文字幕日韩一级| 一区二区电影| 少妇高潮久久久久久潘金莲| a级在线观看视频| 欧美aaaaa性bbbbb小妇| 99re亚洲国产精品| 久久久免费精品| 久久人妻一区二区| 香蕉久久免费电影| 国产精品每日更新在线播放网址| 国产精品美女呻吟| 一级黄色片日本| 日韩精品久久久久久久软件91| 欧美色精品在线视频| 一区二区三区四区视频在线观看 | 99超碰麻豆| 久久久美女视频| 欧美自拍视频| 色天使色偷偷av一区二区| 神马影院一区二区三区| 青青草手机在线| 日本亚洲欧美天堂免费| 欧美与欧洲交xxxx免费观看| 美国美女黄色片| yy6080久久伦理一区二区| 国产精品二三区| 亚洲人成影视在线观看| 3p在线观看| 国产精品色在线| 亚洲精美视频| 亚洲免费黄色片| 日韩av一区二区三区四区| 国产成人精品优优av| 欧美日韩色视频| 女人抽搐喷水高潮国产精品| 日韩成人高清在线| 手机在线国产视频| 波多野结衣亚洲| 亚洲色图19p| 久久综合一区二区三区| 国产精品嫩草影院桃色| 一本久道久久综合狠狠爱| 色偷偷9999www| 女人18毛片毛片毛片毛片区二| 97精品国产一区二区三区 | 日韩av电影在线网| 中日韩一级黄色片| 欧美婷婷在线| 中文字幕精品在线| 欧美成人三级伦在线观看| 欧美日韩另类图片| 亚洲一级黄色片| 成人一区二区三区仙踪林| 一区二区三区视频播放| 欧美日韩美少妇| 日韩在线综合网| 最新超碰在线| 中文字幕在线免费不卡| 一本二本三本亚洲码| 黄色软件在线观看| 成人午夜av在线| 91最新国产视频| 一卡二卡三卡在线| 免费观看在线色综合| 2021国产精品视频| 伊人精品一区二区三区| 久久伊人亚洲| 欧洲亚洲在线视频| 日韩精品一区二区在线播放| 欧美日本中文| 日韩美女毛茸茸| 国产乱淫片视频| 99re这里只有精品视频首页| 致1999电视剧免费观看策驰影院| 懂色一区二区三区| 91女神在线视频| 韩国成人一区| 人妻无码中文字幕| 成人性生交大合| 奇米影视首页 狠狠色丁香婷婷久久综合 | 精品蜜桃一区二区三区| 久久久久久国产精品免费无遮挡| 国产亚洲精品7777| 欧美日韩亚洲免费| 国产香蕉视频在线看| 亚洲精品免费看| 五月天综合婷婷| 97成人资源| 欧美tickling网站挠脚心| gogo亚洲国模私拍人体| 香蕉成人app| 正在播放欧美一区| 日韩不卡视频在线| 久久精品一本| 国产精品三级在线| 深夜福利在线观看直播| 91丨九色丨国产丨porny| 99精品一级欧美片免费播放| 在线观看中文| 欧美色手机在线观看| 国产精品揄拍100视频| 久久av影视| 综合久久五月天| 天天操天天操天天操天天| 久久久夜夜夜| 国产伦精品一区二区三区照片| 天天操天天射天天| 一区二区三区中文免费| 在线观看免费av网址| 欧美日韩高清| 久久的精品视频| 青青草自拍偷拍| 外国成人免费视频| 欧美人与性动交a欧美精品| 中文有码在线播放| 国产视频一区二区在线观看| 日韩精品视频一区二区在线观看| 精品少妇3p| 中文字幕不卡在线视频极品| 天堂网中文字幕| 久久激情综合网| 国产日韩欧美一区二区三区四区 | 伊人久久久久久久久久久| 国产精品suv一区| 久久久www成人免费无遮挡大片| 亚洲精品视频一二三| 99九九久久| 色视频www在线播放国产成人| 精品国产www| 国产v日产∨综合v精品视频| 欧美另类一区| 性感美女一区二区在线观看| 欧美一区二区三区视频在线观看| 在线天堂www在线国语对白| 成人综合久久| 国模视频一区二区| 色婷婷av一区二区三| 富二代精品短视频| 亚洲第一天堂久久| 一二三区不卡| 好吊妞www.84com只有这里才有精品 | 久久777国产线看观看精品| av免费在线不卡| 亚洲二区在线视频| 成人综合久久网| 国产一区观看| 91精品在线观看视频| 国模吧精品人体gogo| 欧美视频在线一区二区三区| 国产精品无码网站| 日韩av一级片| 成人短视频在线观看免费| 国产精品一区二区免费福利视频 | 精品av中文字幕在线毛片| 欧美在线综合视频| 欧美bbbbb性bbbbb视频| 日本91福利区| 日韩美女爱爱视频| 国产日韩一区二区三免费高清| 亚洲男女性事视频| 日本熟妇一区二区| 国产午夜精品美女毛片视频| 激情黄色小视频| 亚洲精品偷拍| 成人欧美一区二区三区黑人免费| 免费看a在线观看| 欧美中文一区二区三区| 欧美日韩在线视频免费播放| 韩国成人在线视频| 无码中文字幕色专区| 亚洲国产aⅴ精品一区二区| 69**夜色精品国产69乱| 免费黄色网址在线观看| 亚洲精品99久久久久| 亚洲激情视频一区| 国产精品日韩成人| 捆绑裸体绳奴bdsm亚洲| 韩国v欧美v亚洲v日本v| 黄色av免费在线播放| 欧洲专线二区三区| 91在线在线观看| 丰满的护士2在线观看高清| 欧美va亚洲va在线观看蝴蝶网| 免费av中文字幕| 国产欧美综合在线| 美女露出粉嫩尿囗让男人桶| 今天的高清视频免费播放成人| 91久久国产综合久久蜜月精品| 三上悠亚国产精品一区二区三区| 精品中文字幕在线观看| av资源网站在线观看| 亚洲国产91精品在线观看| 欧美特黄aaaaaa| 亚洲午夜三级在线| 添女人荫蒂视频| 国产精品一区二区三区四区| 久久久99精品视频| 日韩免费特黄一二三区| 91精品一区二区| 成人一区视频| 国产成人激情小视频| av最新在线| 亚洲欧洲在线视频| 91丨九色丨蝌蚪丨对白| 91国偷自产一区二区三区成为亚洲经典 | 欧美性色xo影院| 五月激情四射婷婷| 久久久亚洲高清| 人妻无码一区二区三区| 91丨porny丨在线| 李丽珍裸体午夜理伦片| 成人精品在线视频观看| 韩国三级视频在线观看| 国产成人av电影免费在线观看| 在线观看免费不卡av| 久久精品99国产精品日本| 91色国产在线| 欧美天天视频| 日韩国产小视频| 国产精品探花在线观看| 91天堂在线视频| 91麻豆精品国产综合久久久 | 先锋欧美三级| 国产精品久久久久福利| www国产在线观看| 久久久成人精品视频| 香蕉视频免费看| 欧美日产国产精品| 亚洲一区二区三区网站| 午夜欧美大尺度福利影院在线看| 舐め犯し波多野结衣在线观看| 狠狠色丁香久久婷婷综| 天天久久综合网| 久久中文精品| 人人干人人视频| 裸体在线国模精品偷拍| 色噜噜狠狠一区二区三区狼国成人| 精东粉嫩av免费一区二区三区| 亚洲精品中文字幕乱码无线| 国产裸体歌舞团一区二区| 无码国产精品一区二区免费式直播| 久久久久久穴| 三年中国国语在线播放免费| 精品在线播放免费| 成人黄色片视频| 激情91久久| 无码播放一区二区三区| 日韩精品亚洲专区| 亚洲精品久久久久久久蜜桃臀| 激情欧美日韩| 18岁视频在线观看| 一本综合久久| 粗暴91大变态调教| 精品一区二区免费视频| 亚洲一级Av无码毛片久久精品| 99久久夜色精品国产网站| 久久久久久成人网| 一区二区三区日韩精品| www.国产一区二区| 91精品综合久久久久久| 四虎国产精品永远| 日韩中文在线视频| 高潮在线视频| 久久久久久国产| 污片视频在线免费观看| 久久天天躁日日躁| 激情视频网站在线播放色| 国产精品草莓在线免费观看| 国产精品777777在线播放| 久久久com| 欧美自拍视频| 青青草原国产免费| 久久精品91| 国产精品91av| 国产精品久久久久久久久动漫| 日韩黄色a级片| 91精品麻豆日日躁夜夜躁| 日本韩国一区| 欧美激情视频在线观看| 日韩特级毛片| 国产精品丝袜久久久久久不卡| jizz内谢中国亚洲jizz| 亚洲综合色激情五月| 国产成人精品免费视| 丁香六月激情网| 看电视剧不卡顿的网站| 成人免费看aa片| 亚洲成在人线在线播放| 91免费视频播放| 国产一区二区三区在线看| 成入视频在线观看| 草莓视频一区| 亚洲澳门在线| 国产传媒久久久| 精品亚洲aⅴ乱码一区二区三区| 色欲av无码一区二区三区| 欧美激情在线免费观看| 天天插天天操天天干| 精品国一区二区三区| 国产调教视频在线观看| 国产精品一区二区久久国产| 亚洲资源网站| 中文字幕在线亚洲精品| 日韩精品亚洲专区| 日本性高潮视频| 欧美日韩中文字幕| 日本中文字幕电影在线观看| 97国产在线视频| 精品欧美午夜寂寞影院| 国产精品无码人妻一区二区在线| 国产精品一区二区三区乱码 | 午夜视频在线播放| 久久久久久亚洲| 中文字幕一区二区三区中文字幕 | cao在线视频| 97碰碰视频| 欧美私人啪啪vps| 美女露出粉嫩尿囗让男人桶| 亚洲一级二级三级在线免费观看| www.xxxx国产| 久久久久国产视频| 日韩mv欧美mv国产网站| 色综合久久久久久久久五月| 久久免费国产| 国产黄色录像视频| 欧美日韩情趣电影| 欧美黑人激情| 亚洲自拍小视频| 狠狠综合久久| 一级欧美一级日韩片| 欧美日韩免费网站| 国产日韩精品在线看| 国产精品中文字幕在线观看| 区一区二视频| 日韩不卡的av| 久久精品夜色噜噜亚洲a∨| 国产精品va无码一区二区三区| 亚洲乱码av中文一区二区| 香蕉久久免费电影| 一区二区视频国产| 国产成人免费在线视频| 国产又黄又粗又猛又爽的| 精品久久久久久电影| 国产片在线播放| 久久精品国产v日韩v亚洲| 免费a级毛片在线播放| 成人网欧美在线视频| 91精品导航| 少妇高潮毛片色欲ava片| 久久久久久久久蜜桃| 中文字幕av网站| 久久九九亚洲综合| 韩国女主播一区二区三区| 中文字幕日韩精品一区二区| 国产真实乱偷精品视频免| 国产高潮呻吟久久| 欧美精品久久天天躁| 国产天堂在线| 91社区国产高清| 亚洲精品婷婷| 2017亚洲天堂| 精品蜜桃在线看| 欧美aaa大片视频一二区| 国产精品自拍合集| 久久精品欧美一区二区三区麻豆| 国产老妇伦国产熟女老妇视频| 午夜欧美大片免费观看| 91九色精品| 少妇大叫太粗太大爽一区二区|