精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spark Shuffle 核心技術(shù)深度解析

大數(shù)據(jù)
本文將深入剖析 Sort-Based Shuffle 的核心原理、Shuffle Manager 的可插拔設(shè)計,以及 Map-side 聚合、Partition Reuse、堆外 Shuffle 等關(guān)鍵優(yōu)化技術(shù),并結(jié)合源碼揭示其實現(xiàn)細(xì)節(jié)。

一、Shuffle 概述:Spark 分布式計算的“心臟”

在 Spark 分布式計算中,Shuffle 是連接 Map 階段和 Reduce 階段的關(guān)鍵橋梁,也是影響作業(yè)性能的核心環(huán)節(jié)。當(dāng)需要對數(shù)據(jù)進(jìn)行重新分區(qū)(如 groupByKey、reduceByKey、join 等操作)時,Spark 必須將不同分區(qū)(Partition)的數(shù)據(jù)進(jìn)行重新分發(fā),這一過程就是 Shuffle。簡單來說,Shuffle 的核心任務(wù)是:將 Map 端的數(shù)據(jù)按規(guī)則分區(qū)、排序后寫入磁盤,再由 Reduce 端拉取并處理。

Shuffle 的性能直接影響整個作業(yè)的執(zhí)行效率。早期 Spark 版本采用 Hash Shuffle,存在嚴(yán)重的“小文件問題”;后續(xù)引入的 Sort-Based Shuffle 通過優(yōu)化文件管理和排序機(jī)制,成為默認(rèn)的 Shuffle 實現(xiàn)。本文將深入剖析 Sort-Based Shuffle 的核心原理、Shuffle Manager 的可插拔設(shè)計,以及 Map-side 聚合、Partition Reuse、堆外 Shuffle 等關(guān)鍵優(yōu)化技術(shù),并結(jié)合源碼揭示其實現(xiàn)細(xì)節(jié)。

二、Shuffle 演進(jìn):從 Hash Shuffle 到 Sort-Based Shuffle

1. Hash Shuffle:早期實現(xiàn)的“痛點”

在 Spark 1.6 之前,Hash Shuffle 是默認(rèn)實現(xiàn)。其核心邏輯是:每個 Map Task 為每個 Reduce Task 創(chuàng)建一個單獨的文件。假設(shè)作業(yè)有 M 個 Map Task 和 R 個 Reduce Task,則會產(chǎn)生 M × R 個文件。例如,1000 個 Map Task 和 1000 個 Reduce Task 會生成 100 萬個文件,這會帶來兩個嚴(yán)重問題:

  • 文件系統(tǒng)壓力:大量小文件會導(dǎo)致文件系統(tǒng)元數(shù)據(jù)管理開銷劇增(如 HDFS 的 NameNode 內(nèi)存壓力),同時隨機(jī)讀寫小文件的效率極低。
  • 內(nèi)存開銷:每個 Map Task 需要同時打開 R 個文件句柄(File Handler),當(dāng) R 較大時,容易導(dǎo)致內(nèi)存溢出或句柄耗盡。

Hash Shuffle 流程示例:

// 偽代碼:Hash Shuffle Map 端寫入
for (record: (K, V) in mapTask.records) {
    int reducePartition = partitioner.getPartition(record._1);
    // 每個reducePartition對應(yīng)一個文件
    FileOutputStream fos = getFileOutputStream(reducePartition);
    fos.write(serialize(record));
}

2. Sort-Based Shuffle:默認(rèn)實現(xiàn)的“優(yōu)化之道”

為解決 Hash Shuffle 的問題,Spark 1.6 后默認(rèn)采用 Sort-Based Shuffle。其核心改進(jìn)是:每個 Map Task 只生成一個數(shù)據(jù)文件和一個索引文件。數(shù)據(jù)文件按 Partition ID 排序存儲,索引文件記錄每個 Partition 的起始位置和長度。Reduce Task 通過索引文件快速定位并拉取屬于自己的數(shù)據(jù)。

Sort-Based Shuffle 的優(yōu)勢:

  • 文件數(shù)量大幅減少:M 個 Map Task 僅生成 2M 個文件(1 數(shù)據(jù)文件 + 1 索引文件),避免小文件問題。
  • 排序優(yōu)化:在 Map 端按 Partition ID 排序(可自定義 Secondary Key 排序),減少 Reduce 端合并開銷。
  • 內(nèi)存管理高效:基于堆外內(nèi)存和排序緩沖區(qū),減少 GC 壓力。

三、Shuffle Manager:可插拔的“調(diào)度中心”

Spark 通過 ShuffleManager 接口實現(xiàn) Shuffle 機(jī)制的可插拔設(shè)計,用戶可通過 spark.shuffle.manager 參數(shù)指定實現(xiàn)類(默認(rèn) sort)。ShuffleManager 的核心職責(zé)包括:

  • 注冊 Shuffle 依賴(registerShuffle);
  • 獲取 Map 端 Writer(getWriter);
  • 獲取 Reduce 端 Reader(getReader)。

1. ShuffleManager 接口定義

// org.apache.spark.shuffle.ShuffleManager
trait ShuffleManager{
  // 注冊Shuffle依賴,返回Shuffle句柄
def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  // 獲取Map端的Writer,用于寫入數(shù)據(jù)
def getWriter[K, V](
      shuffleHandle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V]

  // 獲取Reduce端的Reader,用于拉取數(shù)據(jù)
def getReader[K, C](
      shuffleHandle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  // 釋放資源
def stop(): Unit
}

2. 核心實現(xiàn)類:SortShuffleManager 與 HashShuffleManager

(1) SortShuffleManager(默認(rèn)實現(xiàn))

SortShuffleManager 是當(dāng)前主流實現(xiàn),支持三種 Writer 模式:

① UnsafeShuffleWriter:當(dāng)滿足以下條件時啟用(性能最優(yōu)):

  • Shuffle 依賴的序列化器支持 KSerializer 且 key 不需要排序;
  • Shuffle 依賴的聚合器(aggregator)為空;
  • Reduce 分區(qū)數(shù)量不超過 spark.shuffle.sort.maxSpaceUsage(默認(rèn) Long.MaxValue)。

特點:直接操作堆外內(nèi)存,基于 ShuffleExternalSorter 排序,避免 Java 對象開銷。

② SortShuffleWriter:通用模式,當(dāng)不滿足 UnsafeShuffleWriter 條件時啟用:

  • 支持自定義排序(keyOrdering)和 Map-side 聚合(aggregator);
  • 基于 PartitionedPairBuffer(堆內(nèi))或 ShuffleExternalSorter(堆外)排序。

③ BypassMergeSortShuffleWriter:當(dāng)滿足以下條件時啟用(減少排序開銷):

  • Shuffle 依賴的 mapSideCombine 為 false(無 Map-side 聚合);
  • Reduce 分區(qū)數(shù)量小于 spark.shuffle.sort.bypassMergeThreshold(默認(rèn) 200)。

特點:類似 Hash Shuffle,但最后會合并所有 Partition 文件為一個數(shù)據(jù)文件,避免小文件問題。

SortShuffleManager.getWriter 邏輯源碼:

// org.apache.spark.shuffle.sort.SortShuffleManager
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  shuffleBlockResolver match {
    case resolver: IndexShuffleBlockResolver =>
      val shuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
      // 判斷是否啟用Bypass模式
      if (shuffleHandle.dependency.mapSideCombine) {
        new SortShuffleWriter(shuffleHandle, mapId, context, shuffleBlockResolver)
      } else if (shuffleHandle.dependency.partitioner.numPartitions <= bypassMergeThreshold) {
        new BypassMergeSortShuffleWriter(shuffleHandle, mapId, context, shuffleBlockResolver)
      } else {
        // 判斷是否啟用Unsafe模式
        val serializer = shuffleHandle.dependency.serializer
        val ser = serializer.newInstance()
        if (ser.supportsRelocationOfSerializedObjects && 
            !shuffleHandle.dependency.keyOrdering.isDefined) {
          new UnsafeShuffleWriter(shuffleHandle, mapId, context, shuffleBlockResolver)
        } else {
          new SortShuffleWriter(shuffleHandle, mapId, context, shuffleBlockResolver)
        }
      }
  }
}

(2) HashShuffleManager(已廢棄)

HashShuffleManager 是早期實現(xiàn),核心邏輯是每個 Map Task 為每個 Reduce Task 創(chuàng)建單獨文件。由于小文件問題,已在 Spark 3.0 后被移除,但其設(shè)計思想對理解 Shuffle 演進(jìn)仍有意義。

四、Sort-Based Shuffle 核心流程:從 Map 端寫入到 Reduce 端拉取

1. Map 端寫入流程

Sort-Based Shuffle 的 Map 端核心流程分為 數(shù)據(jù)緩沖、排序 spill、合并文件 三個階段,以 SortShuffleWriter 為例:

(1) 數(shù)據(jù)緩沖:PartitionedPairBuffer

Map Task 首先將數(shù)據(jù)寫入內(nèi)存緩沖區(qū) PartitionedPairBuffer,其結(jié)構(gòu)為 數(shù)組 + 鏈表:

  • 數(shù)組存儲 (partitionId, record) 的指針;
  • 按 partitionId 分區(qū),同一分區(qū)內(nèi)記錄按插入順序存儲(后續(xù)可排序)。
// org.apache.spark.util.collection.PartitionedPairBuffer
class PartitionedPairBuffer[K, V](initialCapacity: Int) extends SizeTracker {
  private var buffer = new Array[AnyRef](2 * initialCapacity) // 存儲(partitionId, key, value)
  private var curSize = 0

  def insert(partitionId: Int, key: K, value: V): Unit = {
    if (curSize == buffer.length) {
      growArray() // 擴(kuò)容
    }
    buffer(curSize) = partitionId.asInstanceOf[AnyRef]
    buffer(curSize + 1) = (key, value).asInstanceOf[AnyRef]
    curSize += 2
  }
}

(2) 排序與 Spill:當(dāng)緩沖區(qū)達(dá)到閾值

當(dāng)緩沖區(qū)大小超過 spark.shuffle.spill.numElementsForceSpillThreshold(默認(rèn) Integer.MAX_VALUE)或內(nèi)存不足時,觸發(fā) spill 操作:

  • 排序:按 partitionId 升序排序(若定義了 keyOrdering,則同一分區(qū)內(nèi)按 key 排序);
  • 寫入磁盤:將排序后的數(shù)據(jù)寫入臨時文件,記錄每個 Partition 的偏移量;
  • 釋放內(nèi)存:清空緩沖區(qū),繼續(xù)接收新數(shù)據(jù)。

排序 spill 源碼(ShuffleExternalSorter):

// org.apache.spark.shuffle.ShuffleExternalSorter
def spill(): Unit = {
  // 獲取排序后的迭代器(先按partitionId,再按key)
  val sortedIterator = shuffleMemoryManager.allocateMemoryForSort()
  // 寫入臨時文件
  val file = spillFileCreator.createTempFile()
  val writer = new DiskBlockWriter(file)
  while (sortedIterator.hasNext) {
    val (partitionId, key, value) = sortedIterator.next()
    writer.write(partitionId, key, value)
  }
  writer.close()
  spillFiles += file // 記錄spill文件
}

(3) 合并文件:生成數(shù)據(jù)文件與索引文件

Map Task 結(jié)束前,會將內(nèi)存緩沖區(qū)和所有 spill 文件合并為 一個數(shù)據(jù)文件 和 一個索引文件:

  • 數(shù)據(jù)文件:存儲所有 Partition 的數(shù)據(jù),按 partitionId 順序排列;
  • 索引文件:存儲每個 Partition 在數(shù)據(jù)文件中的 起始位置 和 長度(固定 8 字節(jié)/Partition)。

索引文件結(jié)構(gòu)示例:

Partition 0: offset=0, length=1024
Partition 1: offset=1024, length=2048
Partition 2: offset=3072, length=512
...

合并文件源碼(IndexShuffleBlockResolver):

// org.apache.spark.shuffle.IndexShuffleBlockResolver
def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  // 索引文件路徑:shuffleId-mapId.index
  val indexFile = getIndexFile(shuffleId, mapId)
  // 數(shù)據(jù)文件路徑:shuffleId-mapId.data
  val dataFile = getDataFile(shuffleId, mapId)
  
  // 寫入索引文件(每個Partition 8字節(jié):offset + length)
  val out = new DataOutputStream(new FileOutputStream(indexFile))
  try {
    var offset = 0L
    for (length <- lengths) {
      out.writeLong(offset)
      out.writeLong(length)
      offset += length
    }
  } finally {
    out.close()
  }
  
  // 重命名臨時數(shù)據(jù)文件為正式文件
  dataTmp.renameTo(dataFile)
}

2. Reduce 端拉取流程

Reduce Task 通過 ShuffleReader 拉取 Map 端的數(shù)據(jù),核心流程包括 獲取數(shù)據(jù)位置、拉取數(shù)據(jù)、合并與聚合。

(1) 獲取數(shù)據(jù)位置:MapOutputTracker

Reduce Task 首先通過 MapOutputTracker 獲取每個 Map Task 中對應(yīng) Partition 的數(shù)據(jù)位置(包括 Executor 地址、數(shù)據(jù)文件路徑、索引文件偏移量)。

// org.apache.spark.MapOutputTracker
def getMapSizesByExecutorId(
    shuffleId: Int,
    startPartition: Int,
    endPartition: Int): Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
  // 從Driver獲取MapOutput信息(或本地緩存)
  val statuses = mapStatuses.get(shuffleId).getOrElse(throw ...)
  statuses.map { status =>
    val blockManagerId = status.location
    // 獲取指定Partition的偏移量和長度
    val sizes = status.getSizeForBlockRange(startPartition, endPartition)
    (blockManagerId, sizes)
  }
}

(2) 拉取數(shù)據(jù):BlockStoreShuffleReader

BlockStoreShuffleReader 通過 BlockManager 從遠(yuǎn)程 Executor 拉取數(shù)據(jù)塊,支持 本地讀取(優(yōu)先)和 遠(yuǎn)程傳輸(通過 Netty)。

// org.apache.spark.shuffle.BlockStoreShuffleReader
override def read(): Iterator[Product2[K, C]] = {
  // 獲取數(shù)據(jù)位置
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(...)
  // 創(chuàng)建ShuffleBlockFetcherIterator,用于拉取數(shù)據(jù)
  val shuffleBlockFetcherIterator = new ShuffleBlockFetcherIterator(
    context,
    blockManager.blockStoreClient,
    blockManager,
    blocksByAddress,
    serializer,
    // 傳輸配置(如最大并發(fā)拉取數(shù))
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxMbInFlight", "48") * 1024 * 1024)
  
  // 聚合或迭代返回數(shù)據(jù)
  val aggregatedIter = if (dep.aggregator.isDefined) {
    // 如果定義了aggregator,進(jìn)行Reduce端聚合
    new AggregatorIterator(shuffleBlockFetcherIterator, dep.aggregator.get)
  } else {
    shuffleBlockFetcherIterator.map(pair => (pair._1, pair._2))
  }
  
  aggregatedIter
}

(3) 合并與聚合:Reduce 端優(yōu)化

Reduce 端拉取數(shù)據(jù)后,可能需要合并來自多個 Map Task 的數(shù)據(jù),并進(jìn)行聚合(如 reduceByKey)。Spark 通過 AggregatorIterator 實現(xiàn)流式聚合,避免全量數(shù)據(jù)加載到內(nèi)存。

五、Shuffle 核心優(yōu)化技術(shù):從原理到源碼

1. Map-side 聚合:減少數(shù)據(jù)傳輸量的“利器”

(1) 原理:在 Map 端預(yù)聚合

Map-side 聚合是指在 Map Task 將數(shù)據(jù)寫入 Shuffle 前先進(jìn)行局部聚合(如 reduceByKey 的 reduce 操作),減少需要寫入磁盤和傳輸?shù)臄?shù)據(jù)量。例如,統(tǒng)計單詞頻次時,Map 端可先對本地單詞計數(shù),Reduce 端只需合并各 Map Task 的局部結(jié)果。

適用場景:聚合函數(shù)(如 reduce、aggregate)滿足 結(jié)合律 和 交換律(如 sum、max)。

(2) 實現(xiàn)源碼:ShuffleDependency 與 Aggregator

Map-side 聚合的核心是 ShuffleDependency 中的 aggregator 字段,定義了聚合邏輯:

// org.apache.spark.ShuffleDependency
class ShuffleDependency[K, V, C](
    @transient val rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None, // 聚合器
    val mapSideCombine: Boolean = false // 是否啟用Map-side聚合
) extends Dependency[Product2[K, V]] {
  // ...
}

Aggregator 定義了三個核心函數(shù):

  • createCombiner: 將第一個 value 轉(zhuǎn)換為聚合器類型 C(如 word -> 1);
  • mergeValue: 將新 value 合并到聚合器(如 1 + count -> newCount);
  • mergeCombiners: 合并兩個聚合器(如 count1 + count2 -> totalCount)。

Map-side 聚合執(zhí)行流程(SortShuffleWriter):

// org.apache.spark.shuffle.sort.SortShuffleWriter
override def write(records: Iterator[Product2[K, V]]): Unit = {
  // 如果定義了aggregator且mapSideCombine=true,啟用Map-side聚合
  val maybeAggregator: Option[Aggregator[K, V, C]] = 
    if (dep.mapSideCombine) dep.aggregator else None

  // 創(chuàng)建排序緩沖區(qū)
  val sorter = if (dep.aggregator.isDefined && dep.mapSideCombine) {
    // 使用聚合器排序
    new PartitionedAppendOnlyMap[K, C](dep.aggregator.get, dep.keyOrdering)
  } else {
    // 普通排序緩沖區(qū)
    new PartitionedPairBuffer[K, V](initialCapacity)
  }

  // 遍歷記錄,插入緩沖區(qū)(聚合或直接存儲)
  for (record <- records) {
    maybeAggregator match {
      case Some(aggregator) =>
        // 調(diào)用aggregator的mergeValue進(jìn)行聚合
        sorter.insert(record._1, aggregator.mergeValue(record._2))
      case None =>
        sorter.insert(record._1, record._2)
    }
  }

  // 排序、spill、合并文件(前文流程)
  // ...
}

(3) 性能收益

假設(shè)原始數(shù)據(jù)為 ("a", 1), ("b", 1), ("a", 1),無 Map-side 聚合時需傳輸 3 條記錄;啟用后,Map 端聚合為 ("a", 2), ("b", 1),僅傳輸 2 條記錄,數(shù)據(jù)量減少 33%。對于數(shù)據(jù)傾斜場景(如某個 key 出現(xiàn)百萬次),Map-side 聚合可大幅降低 Shuffle 數(shù)據(jù)量。

2. Partition Reuse:避免重復(fù)創(chuàng)建 Partition 文件

(1) 原理:復(fù)用數(shù)據(jù)文件與索引文件

Sort-Based Shuffle 中,每個 Map Task 僅生成一個數(shù)據(jù)文件和一個索引文件,所有 Partition 的數(shù)據(jù)存儲在同一文件中,通過索引文件定位。這與 Hash Shuffle 中“每個 Partition 一個文件”的設(shè)計形成對比,徹底避免了小文件問題。

Partition Reuse 的核心:

  • 數(shù)據(jù)文件復(fù)用:不同 Partition 的數(shù)據(jù)按順序?qū)懭胪晃募瑹o額外文件創(chuàng)建開銷;
  • 索引文件高效定位:索引文件固定 8 字節(jié)/Partition,Reduce Task 通過 partitionId 快速計算偏移量(offset = partitionId * 8),讀取起始位置和長度。

(2) 實現(xiàn)源碼:IndexShuffleBlockResolver

IndexShuffleBlockResolver 負(fù)責(zé)管理 Shuffle 文件的創(chuàng)建和讀取,核心方法包括:

  • getDataFile: 獲取數(shù)據(jù)文件路徑(shuffleId-mapId.data);
  • getIndexFile: 獲取索引文件路徑(shuffleId-mapId.index);
  • getBlockData: 根據(jù) partitionId 讀取數(shù)據(jù)文件的對應(yīng)片段。
// org.apache.spark.shuffle.IndexShuffleBlockResolver
def getBlockData(
    shuffleId: Int,
    mapId: Int,
    reduceId: Int): ManagedBuffer = {
  // 索引文件路徑
  val indexFile = getIndexFile(shuffleId, mapId)
  // 數(shù)據(jù)文件路徑
  val dataFile = getDataFile(shuffleId, mapId)
  
  // 讀取索引文件,獲取reduceId對應(yīng)Partition的偏移量和長度
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    // 跳轉(zhuǎn)到reduceId對應(yīng)的索引位置(每個Partition 8字節(jié))
    in.skipBytes(reduceId * 8)
    val offset = in.readLong()
    val length = in.readLong()
    // 返回數(shù)據(jù)文件的對應(yīng)片段(FileSegmentManagedBuffer)
    new FileSegmentManagedBuffer(dataFile, offset, length)
  } finally {
    in.close()
  }
}

(3) 性能收益

  • 文件數(shù)量減少:M 個 Map Task 和 R 個 Reduce Task 下,文件數(shù)量從 M×R(Hash Shuffle)降至 2M(Sort-Based Shuffle)。例如 1000 Map Task 和 1000 Reduce Task,文件數(shù)量從 100 萬降至 2000,減少 99.8%。
  • IO 效率提升:順序讀寫大文件比隨機(jī)讀寫小文件效率高 1~2 個數(shù)量級(HDFS 等文件系統(tǒng)對順序讀寫優(yōu)化更好)。

3. 堆外 Shuffle:減少 GC 壓力的“內(nèi)存優(yōu)化”

(1) 原理:使用堆外內(nèi)存存儲 Shuffle 數(shù)據(jù)

JVM 堆內(nèi)內(nèi)存(Heap Memory)由 GC 管理,頻繁創(chuàng)建/銷毀 Shuffle 數(shù)據(jù)對象(如 (K, V) 記錄)會導(dǎo)致 GC 頻繁觸發(fā),影響作業(yè)穩(wěn)定性。堆外 Shuffle(Off-Heap Shuffle)通過 直接操作系統(tǒng)內(nèi)存(不受 GC 管理)存儲 Shuffle 數(shù)據(jù),減少 GC 壓力。

堆外內(nèi)存管理:

  • Spark 通過 TaskMemoryManager 分配堆外內(nèi)存,基于 sun.misc.Unsafe 直接操作內(nèi)存;
  • 堆外內(nèi)存大小由 spark.memory.offHeap.size 配置(默認(rèn) 0,不啟用),需設(shè)置 spark.memory.offHeap.enabled=true。

(2) 實現(xiàn)源碼:ShuffleExternalSorter 與 MemoryBlock

ShuffleExternalSorter 是堆外 Shuffle 的核心排序器,使用 MemoryBlock(堆外內(nèi)存塊)存儲數(shù)據(jù):

// org.apache.spark.shuffle.ShuffleExternalSorter
class ShuffleExternalSorter(
    memoryManager: TaskMemoryManager,
    serializerManager: SerializerManager,
    // 堆外內(nèi)存分配器
    initialSize: Long = 1024 * 1024) extends Spillable{

  // 當(dāng)前使用的堆外內(nèi)存塊
  private var currentPage: MemoryBlock = _
  // 當(dāng)前頁的寫入位置
  private var pageCursor: Long = _

  // 分配堆外內(nèi)存頁
  privatedef allocatePage(): Unit = {
    currentPage = memoryManager.allocatePage(PAGE_SIZE)
    pageCursor = 0
  }

  // 插入記錄(寫入堆外內(nèi)存)
def insertRecord(partitionId: Int, key: Long, value: Long): Unit = {
    // 序列化key和value(假設(shè)為Long類型)
    val recordSize = 8 + 8 + 4 // partitionId(4) + key(8) + value(8)
    if (pageCursor + recordSize > currentPage.size) {
      spill() // 當(dāng)前頁空間不足,觸發(fā)spill
      allocatePage() // 分配新頁
    }
    // 寫入堆外內(nèi)存(Unsafe操作)
    val baseObject = currentPage.getBaseObject
    Platform.putLong(baseObject, pageCursor, partitionId)
    Platform.putLong(baseObject, pageCursor + 4, key)
    Platform.putLong(baseObject, pageCursor + 12, value)
    pageCursor += recordSize
  }
}

(3) 關(guān)鍵參數(shù):spark.shuffle.spill.numElementsForceSpillThreshold

該參數(shù)控制 堆外內(nèi)存中元素數(shù)量達(dá)到閾值時強(qiáng)制 spill,避免內(nèi)存中數(shù)據(jù)過多導(dǎo)致 OOM。默認(rèn)值為 Integer.MAX_VALUE(不觸發(fā)),可根據(jù)作業(yè)特點調(diào)整(如數(shù)據(jù)傾斜場景可適當(dāng)降低)。

強(qiáng)制 spill 觸發(fā)邏輯(Spillable 接口):

// org.apache.spark.memory.Spillable
def maybeSpill(collection: collection.Iterable[_], currentMemory: Long): Unit = {
  if (currentMemory > myMemoryThreshold || 
      collection.size > numElementsForceSpillThreshold) {
    spill() // 執(zhí)行spill
    _memoryUsed = 0 // 重置內(nèi)存使用
  }
}

(4) 性能收益

  • 減少 GC 暫停:堆外內(nèi)存不受 GC 管理,避免 Full GC 導(dǎo)致的作業(yè)卡頓(尤其對于大內(nèi)存 Executor,如 64GB+);
  • 內(nèi)存利用率提升:堆外內(nèi)存可避免 JVM 對象頭開銷(12 字節(jié)/對象),存儲相同數(shù)據(jù)占用的內(nèi)存更少;
  • 穩(wěn)定性增強(qiáng):通過 numElementsForceSpillThreshold 控制 spill 閾值,避免因內(nèi)存突增導(dǎo)致的 OOM。

六、總結(jié):Spark Shuffle 的設(shè)計哲學(xué)與未來方向

1. 核心設(shè)計哲學(xué)

Spark Shuffle 的演進(jìn)體現(xiàn)了 “性能優(yōu)化”與“工程實踐”的平衡:

  • 從 Hash 到 Sort:通過文件合并解決小文件問題,兼顧排序需求;
  • 可插拔架構(gòu):ShuffleManager 接口支持靈活擴(kuò)展,適應(yīng)不同場景(如 Push-based Shuffle);
  • 內(nèi)存管理優(yōu)化:堆外內(nèi)存、spill 機(jī)制等設(shè)計,平衡內(nèi)存使用與計算效率;
  • 端到端優(yōu)化:Map-side 聚合、Partition Reuse 等技術(shù),從數(shù)據(jù)生成、傳輸?shù)教幚砣溌穬?yōu)化。

2. 未來方向

  • Push-based Shuffle(Spark 3.0 引入):由 Map Task 主動推送數(shù)據(jù)到 Reduce 端的 Executor,減少 Reduce 端拉取延遲,尤其適用于大規(guī)模集群;
  • GPU 加速 Shuffle:利用 GPU 的高帶寬內(nèi)存和并行計算能力,加速排序、聚合等操作;
  • 動態(tài) Shuffle 調(diào)優(yōu):基于作業(yè)歷史數(shù)據(jù)自動調(diào)整 Shuffle 參數(shù)(如 bypassMergeThreshold、numElementsForceSpillThreshold),減少人工調(diào)優(yōu)成本。

3. 最佳實踐建議

  • 優(yōu)先啟用 Sort-Based Shuffle:默認(rèn)已啟用,無需額外配置;
  • 合理配置 Map-side 聚合:對滿足結(jié)合律的聚合操作(如 reduceByKey),設(shè)置 mapSideCombine=true;
  • 啟用堆外內(nèi)存:對于大內(nèi)存 Executor(如 >32GB),設(shè)置 spark.memory.offHeap.enabled=true 和 spark.memory.offHeap.size(如 10g);
  • 調(diào)整 spill 閾值:數(shù)據(jù)傾斜場景下,適當(dāng)降低 spark.shuffle.spill.numElementsForceSpillThreshold(如 1000000),避免內(nèi)存溢出。
責(zé)任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
相關(guān)推薦

2018-03-21 11:05:26

Spark大數(shù)據(jù)應(yīng)用程序

2009-02-26 10:11:00

寬帶路由器網(wǎng)絡(luò)共享

2021-08-11 06:57:16

ShuffleSpark核心

2022-03-15 08:25:32

SparkShuffle框架

2022-05-07 14:31:46

物聯(lián)網(wǎng)

2023-12-05 07:26:29

指標(biāo)中臺大數(shù)據(jù)

2010-08-19 09:20:24

寬帶路由器

2017-05-14 14:41:20

5G波束基站

2022-05-09 08:21:29

Spring微服務(wù)Sentinel

2009-06-15 17:54:50

Java核心技術(shù)

2009-06-26 16:01:39

EJB組織開發(fā)EJB容器EJB

2016-11-15 14:33:05

Flink大數(shù)據(jù)

2023-06-14 08:49:22

PodKubernetes

2017-03-08 10:06:11

Java技術(shù)點注解

2019-01-11 08:27:06

2025-06-13 08:01:34

2018-05-16 11:05:49

ApacheFlink數(shù)據(jù)流

2019-05-15 08:40:34

工業(yè)物聯(lián)網(wǎng)MQTT物聯(lián)網(wǎng)

2022-10-11 08:37:43

Servlet配置版本

2019-03-05 14:57:21

大數(shù)據(jù)Hadoop框架
點贊
收藏

51CTO技術(shù)棧公眾號

伊人久久大香线蕉综合网站| 曰本三级在线| 麻豆成人91精品二区三区| 亚洲精品自在久久| 成人性生活视频免费看| 精品视频三区| 国产精品1区2区| 日本久久精品视频| 欧美色图亚洲视频| 亚洲最大在线| 日韩一二在线观看| 免费日韩中文字幕| 日本无删减在线| 国产欧美一区视频| 国产欧美日韩综合精品二区| 久久久久久久久久一级| 国内揄拍国内精品久久| 国产一区二区三区在线看| 麻豆精品国产传媒| 粉嫩91精品久久久久久久99蜜桃| 亚洲色图欧洲色图| 久久亚洲午夜电影| 国产熟女一区二区三区四区| 老鸭窝亚洲一区二区三区| 免费不卡在线观看av| 国产传媒国产传媒| 精品欠久久久中文字幕加勒比| 欧美日韩一区二区三区免费看| 亚洲激情免费视频| 在线免费观看黄色av| 久久综合久久综合久久| 国产98在线|日韩| 亚洲字幕av一区二区三区四区| 一区二区三区四区五区在线| 欧美大片网站在线观看| 91n在线视频| 精品一区二区三区的国产在线观看| 亚洲国产高清自拍| 真实乱偷全部视频| 日韩高清二区| 91精品国产综合久久婷婷香蕉 | 国精产品一区一区三区四川| 偷拍亚洲欧洲综合| 拔插拔插海外华人免费| 在线网址91| 亚洲人成伊人成综合网小说| 在线一区高清| 欧美尤物美女在线| 国产精品久久久久久久岛一牛影视 | 99精品欧美一区二区三区| 伊人成年综合网| 久热精品在线| 国产精品扒开腿做爽爽爽男男| 精品无码免费视频| 亚洲先锋成人| 国内精品免费午夜毛片| 日韩 欧美 亚洲| 亚洲电影在线| 97在线视频一区| 日本在线播放视频| 美女日韩在线中文字幕| 日本精品中文字幕| 中文字幕人妻丝袜乱一区三区| 视频一区二区中文字幕| 国产精品久久久久91| 中文字幕一区二区在线视频| 毛片av一区二区三区| 国产自产女人91一区在线观看| 91肉色超薄丝袜脚交一区二区| 韩国欧美国产1区| 91久久精品www人人做人人爽| 成人av手机在线| www.久久久久久久久| 黄色99视频| 国产香蕉视频在线看| 国产精品嫩草影院com| 色一情一乱一乱一区91| a毛片不卡免费看片| 日韩欧美极品在线观看| 亚洲不卡视频在线| 国模大尺度视频一区二区| 日韩精品一区二区三区蜜臀| 亚洲一区二区三区综合| 第一会所亚洲原创| 欧美老女人xx| 一区二区三区在线观看av| 久久99久久精品| 国产伦精品一区二区三区四区视频 | 成av人电影在线观看| 国产精品久久久一本精品| 影音先锋欧美在线| 538视频在线| 欧美午夜精品一区二区蜜桃| ass极品水嫩小美女ass| 天天躁日日躁狠狠躁欧美| 一区二区三区四区视频| 草视频在线观看| 六月丁香综合| av一区二区三区免费| 九九九伊在人线综合| 依依成人精品视频| 青青草精品视频在线观看| silk一区二区三区精品视频| 国产小视频91| 日产欧产va高清| 国产一区二区三区黄视频| 免费看成人午夜电影| 91小视频xxxx网站在线| 91福利在线观看| 成人欧美精品一区二区| 欧美日韩中文一区二区| 亚洲91av视频| 亚洲av无码一区二区三区dv| 国产精品三级视频| 黄色影院一级片| 中文字幕一区日韩精品| 日韩中文有码在线视频| 国产又黄又爽又色| 国产成人精品亚洲777人妖| 亚洲不卡1区| 1234区中文字幕在线观看| 欧美精品tushy高清| 熟女俱乐部一区二区视频在线| 狠狠色丁香久久综合频道| 成人福利网站在线观看11| 黄色软件在线| 精品成人在线视频| 成人做爰69片免费| 欧美阿v一级看视频| 国产日韩精品入口| 日韩在线免费看| 婷婷综合五月天| 成人一区二区三区仙踪林| 日韩电影二区| 国产精品旅馆在线| 国产片在线观看| 一本久久a久久精品亚洲| 在线xxxxx| 激情偷拍久久| 国产精品二区在线| 先锋成人av| 欧美不卡一二三| 免费一级片在线观看| 国产乱码字幕精品高清av| 日本福利视频导航| 成人97精品毛片免费看| 久久在线视频在线| 国产精品无码久久av| 最新国产成人在线观看| 久久6免费视频| 亚洲成人精品| 99re视频在线| 国产一线二线在线观看| 亚洲国产91精品在线观看| 日韩精品一区二区av| 97se亚洲国产综合自在线观| 男人日女人bb视频| 经典一区二区| 国产欧美va欧美va香蕉在线| 麻豆影院在线观看| 日韩你懂的在线观看| 国产在线视频二区| 91免费国产视频网站| 成人性视频欧美一区二区三区| 精品国产视频| 91精品在线一区| 成人在线免费观看黄色| 日韩电影视频免费| 高潮毛片又色又爽免费| 国产精品久久久久精k8| 苍井空张开腿实干12次| 香蕉久久夜色精品国产| 亚洲精品乱码久久久久久蜜桃91| 亚洲精品tv| 久久免费成人精品视频| 极品白浆推特女神在线观看| 欧美日韩视频一区二区| 五月婷婷一区二区| 91色在线porny| 中日韩av在线播放| 一区在线观看| 日本一区二区久久精品| 白嫩亚洲一区二区三区| 孩xxxx性bbbb欧美| 成人在线二区| 精品国一区二区三区| 久久久精品视频网站| 亚洲情趣在线观看| 中国黄色a级片| 国内精品久久久久影院薰衣草 | 欧美大片高清| 久久精品美女视频网站 | 国产亚洲欧洲高清| 国产成人麻豆精品午夜在线| 大伊人狠狠躁夜夜躁av一区| 在线观看天堂av| 成人短视频下载| 午夜激情av在线| 免费日韩视频| 日本国产中文字幕| 成人羞羞动漫| 久久久久久久有限公司| 精品一区91| 国产精品成人观看视频国产奇米| 国产蜜臀av在线播放| 尤物yw午夜国产精品视频明星| 亚洲成人黄色片| 欧美日韩国产经典色站一区二区三区 | 看片一区二区| 7m精品福利视频导航| 91网在线播放| 亚洲免费一级电影| 好吊色一区二区| 91超碰这里只有精品国产| 中文字幕在线欧美| 亚洲成人激情av| 人妻少妇精品一区二区三区| 欧美激情在线一区二区三区| 国产精品无码电影| 国产精品亚洲专一区二区三区| www.欧美日本| 久久精品五月| av7777777| 亚洲国产激情| 福利在线一区二区| 午夜久久美女| 中文字幕色呦呦| 欧美电影免费| 亚洲欧洲三级| 欧美一区三区| 色中色综合成人| 久久99国产精品视频| 精品欧美日韩| 欧美一级全黄| 久久超碰亚洲| 亚洲欧洲免费| 日本欧美色综合网站免费| 亚洲国产合集| 蜜桃麻豆91| 欧美日韩xxxx| 日韩国产美国| 欧美天天综合| 亚洲精品成人自拍| 久久国产亚洲精品| 一区二区三区我不卡| 日韩精品久久| 中文精品一区二区三区| 亚洲乱码电影| 性生活免费观看视频| 欧美a级在线| 亚洲国产精品无码观看久久| 亚洲精品三级| 日韩精品一区二区三区不卡| 久久综合图片| 自拍偷拍一区二区三区四区| 另类人妖一区二区av| 日韩视频在线观看一区二区三区| 国内精品久久久久影院薰衣草| 一级淫片在线观看| 国产盗摄精品一区二区三区在线 | 欧美黄色三级网站| 92久久精品| 欧美壮男野外gaytube| 午夜无码国产理论在线| 国产一区二区视频在线观看| 婷婷久久免费视频| 成人情视频高清免费观看电影| 国产精品三p一区二区| 农村寡妇一区二区三区| 欧美综合一区| 中文字幕色呦呦| 国产欧美午夜| 青青草精品视频在线观看| 国产综合久久久久影院| 大桥未久恸哭の女教师| 国产无遮挡一区二区三区毛片日本| 日本欧美一区二区三区不卡视频| 中文字幕一区二区三区乱码在线| 欧美片一区二区| 黑人精品xxx一区一二区| 一区二区三区麻豆| 欧美一级二级三级蜜桃| 三级在线播放| 久久综合免费视频影院| 国产高潮在线| 国产欧美久久一区二区| 国产suv精品一区二区四区视频| 欧美不卡三区| 欧美在线黄色| 日韩毛片在线免费看| 国产精品一区二区久激情瑜伽| 精品久久久久久中文字幕人妻最新| 国产精品午夜久久| 久草精品视频在线观看| 欧美日韩免费不卡视频一区二区三区| 性中国古装videossex| 国产午夜精品全部视频在线播放 | 欧美美女色图| 美女少妇精品视频| av有声小说一区二区三区| av噜噜色噜噜久久| 91亚洲自偷观看高清| 香港三级韩国三级日本三级| 国产在线观看免费一区| 极品人妻videosss人妻| 亚洲一区二区视频| 夜夜爽8888| 亚洲天堂免费在线| 岛国在线视频网站| 亚洲自拍小视频免费观看| 欧美少妇xxxx| 久久久久久久少妇| 97久久超碰精品国产| 久久国产精品二区| 欧美高清视频不卡网| 激情视频在线观看免费| 97国产suv精品一区二区62| 丁香婷婷久久| 欧美中文娱乐网| 99精品免费视频| 动漫av在线免费观看| 1000精品久久久久久久久| 91视频在线视频| 国产视频欧美视频| 国产99在线| 国产乱码精品一区二区三区中文 | 精品中文字幕在线2019| 亚洲精品69| 伊人久久大香线蕉综合75| 久久天堂成人| 四虎国产精品成人免费入口| 大荫蒂欧美视频另类xxxx| 先锋av资源站| 97免费视频在线播放| 香蕉免费一区二区三区在线观看 | 国产视频三区四区| 色综合久久六月婷婷中文字幕| 秋霞av鲁丝片一区二区| 欧美高跟鞋交xxxxxhd| 日韩精品中文字幕一区二区| 超碰10000| 懂色av一区二区在线播放| 欧美精品久久久久性色| 日韩一区二区免费在线观看| 高清全集视频免费在线| 亚洲最大福利视频| 欧美fxxxxxx另类| 95视频在线观看| 亚洲国产成人91porn| 日本成人动漫在线观看| 国内精品久久久久伊人av| 久久资源综合| 成人黄色片视频| 国产人久久人人人人爽| 一级一级黄色片| 久久精品电影网| 亚洲精品福利| 日本www在线视频| 久久网这里都是精品| 中文字幕免费视频观看| 色噜噜国产精品视频一区二区| 日韩一区二区三区四区五区 | 蜜桃麻豆91| 免费日本视频一区| 中文字幕在线2021| 欧美xxx久久| 波多野结衣亚洲一二三| 亚洲v国产v| 国产成人精品影视| 在线观看日韩中文字幕| 在线午夜精品自拍| 亚洲精品毛片| 野外做受又硬又粗又大视频√| 26uuu国产日韩综合| 中文字幕av资源| 欧美激情一区二区三级高清视频| 性欧美lx╳lx╳| 日本高清一区二区视频| 午夜日韩在线观看| 91电影在线播放| 国产精品乱码| 日韩高清在线观看| 男女免费视频网站| 一级做a爰片久久毛片美女图片| 国产精品久久久久久av公交车| 欧美爱爱视频免费看| 国产精品无遮挡| 人妻少妇精品无码专区| 国产女精品视频网站免费| 最新亚洲一区| 成人一级黄色大片| 日韩电影中文字幕| 亚洲一区av| 美女福利视频在线| 亚洲激情一二三区| av男人的天堂在线| 精品国产一区二区三区久久久久久| 免费在线视频一区| 在线能看的av| 欧美福利视频网站|