精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink 異步 Checkpoint 機制詳解

大數(shù)據(jù)
在流處理場景中,數(shù)據(jù)源源不斷流入,系統(tǒng)可能因節(jié)點故障、網(wǎng)絡問題等導致任務中斷,若無有效的容錯機制,數(shù)據(jù)可能丟失或重復處理。

一、引言:Flink容錯機制與Checkpoint的核心作用

Apache Flink作為分布式流處理引擎,其核心優(yōu)勢之一是“Exactly-Once”(精確一次)的容錯保證。在流處理場景中,數(shù)據(jù)源源不斷流入,系統(tǒng)可能因節(jié)點故障、網(wǎng)絡問題等導致任務中斷,若無有效的容錯機制,數(shù)據(jù)可能丟失或重復處理。Flink通過Checkpoint機制實現(xiàn)容錯:定期為作業(yè)狀態(tài)(State)創(chuàng)建快照(Snapshot),并將快照持久化到可靠存儲(如HDFS、S3)。當任務失敗時,從最近一次成功的Checkpoint快照恢復狀態(tài),確保數(shù)據(jù)處理的連續(xù)性和一致性。

1. 同步Checkpoint的瓶頸

早期的Checkpoint機制多為同步模式:在觸發(fā)Checkpoint時,任務暫停數(shù)據(jù)處理,等待所有狀態(tài)快照完成并持久化到遠程存儲后,再繼續(xù)處理數(shù)據(jù)。這種模式實現(xiàn)簡單,但存在明顯缺陷:

  • 處理延遲增加:狀態(tài)快照和持久化過程可能耗時較長(尤其狀態(tài)量大時),導致數(shù)據(jù)處理暫停,延遲飆升。
  • 吞吐量下降:頻繁的Checkpoint會占用大量處理時間,降低整體吞吐。

2. 異步Checkpoint的誕生

為解決同步Checkpoint的性能問題,F(xiàn)link引入了異步Checkpoint機制。其核心思想是:將狀態(tài)快照的生成與持久化操作從主數(shù)據(jù)處理流程中剝離,主線程僅負責快照的“準備”工作(如觸發(fā)狀態(tài)快照、生成快照元數(shù)據(jù)),而耗時的“持久化”操作(如將快照數(shù)據(jù)寫入遠程存儲)交由獨立線程池異步執(zhí)行。這樣,主數(shù)據(jù)處理流程幾乎不被阻塞,實現(xiàn)“低延遲”與“高吞吐”的平衡。

二、異步Checkpoint的核心原理與設計目標

1. 異步Checkpoint的定義

異步Checkpoint是指:在Checkpoint觸發(fā)過程中,任務(Task)的主數(shù)據(jù)處理線程不等待狀態(tài)快照完全持久化到遠程存儲,而是快速生成快照“句柄”(如文件句柄、內存指針)后立即恢復數(shù)據(jù)處理,快照的持久化操作由后臺線程異步完成。當所有任務的快照句柄生成并持久化完成后,Checkpoint才被標記為“成功”。

2. 核心設計目標

異步Checkpoint的設計需滿足以下目標:

  • 低延遲:主數(shù)據(jù)處理線程阻塞時間盡可能短(毫秒級),避免Checkpoint對正常數(shù)據(jù)處理延遲的影響。
  • 高吞吐:異步持久化不占用主線程資源,確保數(shù)據(jù)處理能力不受Checkpoint頻率影響。
  • 一致性:即使異步持久化失敗,也能保證Checkpoint的“原子性”——要么所有任務快照成功,要么全部失敗,避免狀態(tài)不一致。
  • 可恢復性:快照數(shù)據(jù)需完整、可靠地存儲到遠程存儲,故障恢復時可正確加載。

三、異步Checkpoint的核心架構與組件

異步Checkpoint的實現(xiàn)涉及Flink作業(yè)的多個核心組件,整體架構如下圖所示(簡化版):

┌─────────────┐       ┌──────────────┐       ┌──────────────┐
│  JobManager │       │  TaskManager │       │ Remote Storage│
│ (Checkpoint│       │ (Task,       │       │ (HDFS/S3)    │
│  Coordinator)│       │  StateBackend)│       │              │
└──────┬──────┘       └───────┬──────┘       └───────┬──────┘
       │                      │                      │
       │ 1. Trigger Checkpoint│                      │
       │─────────────────────>│                      │
       │                      │ 2. Inject Barrier     │
       │                      │─────────────────────>│
       │                      │ 3. Async Snapshot     │
       │                      │ (主線程生成快照句柄)    │
       │                      │                      │
       │ 4. Acknowledge       │ 5. Async Persist      │
       │<─────────────────────│ (后臺線程持久化)       │
       │                      │─────────────────────>│
       │ 6. Complete Checkpoint│                      │
       │ (所有Ack收到)         │                      │
       │                      │                      │

1. 核心組件角色

(1) JobManager:CheckpointCoordinator

角色:Checkpoint的“總指揮”,負責觸發(fā)、協(xié)調、監(jiān)控整個作業(yè)的Checkpoint流程。

核心職責:

  • 定期觸發(fā)Checkpoint(基于時間間隔或手動觸發(fā))。
  • 向所有Task發(fā)送Checkpoint觸發(fā)請求(攜帶CheckpointID)。
  • 接收各Task的Checkpoint Ack(確認)或 Nack(失敗)消息。
  • 當所有Task均Ack時,標記Checkpoint為“成功”,并清理舊Checkpoint;若收到Nack,標記為“失敗”。

(2) TaskManager:Task與StateBackend

Task:作業(yè)的基本執(zhí)行單元,包含一個或多個算子(Operator)。

每個Task負責:

  • 接收JobManager的Checkpoint觸發(fā)請求。
  • 在數(shù)據(jù)流中注入Checkpoint Barrier(特殊數(shù)據(jù)事件,標記Checkpoint的起始位置)。
  • 協(xié)調算子進行狀態(tài)快照(通過StateBackend)。
  • 向JobManager上報Checkpoint結果(Ack/Nack)。

StateBackend:狀態(tài)存儲的后端實現(xiàn),負責狀態(tài)的快照與恢復。異步Checkpoint的核心實現(xiàn)依賴StateBackend的異步能力:

  • MemoryStateBackend:狀態(tài)存儲在TaskManager內存,快照時同步序列化到JobManager內存(僅適用于測試,不支持異步)。
  • FsStateBackend:狀態(tài)存儲在本地文件系統(tǒng),快照時異步將本地文件上傳到遠程存儲(如HDFS)。
  • RocksDBStateBackend:狀態(tài)存儲在本地RocksDB,快照時異步生成RocksDB快照并上傳到遠程存儲(生產環(huán)境常用,支持大狀態(tài)異步快照)。

(3) Remote Storage:可靠存儲系統(tǒng)

  • 角色:持久化存儲Checkpoint快照數(shù)據(jù)(如HDFS、S3、Oss等)。
  • 要求:高可靠、高持久化,確保快照數(shù)據(jù)不丟失。

(4) Checkpoint Barrier:數(shù)據(jù)流的“同步信號”

  • 定義:一種特殊的數(shù)據(jù)事件,由Source算子注入,隨數(shù)據(jù)流流向下游算子。
  • 作用:標記Checkpoint的“邊界”——Barrier之前的數(shù)據(jù)屬于當前Checkpoint,Barrier之后的數(shù)據(jù)屬于下一個Checkpoint。
  • 對齊機制:下游算子收到多輸入流的Barrier時,需等待所有輸入流的Barrier到達(稱為“Barrier對齊”),確保快照包含“一致性的狀態(tài)”(即所有輸入流在Barrier之前的數(shù)據(jù)均已處理)。對齊完成后,才觸發(fā)狀態(tài)快照。

四、異步Checkpoint詳細流程與源碼剖析

異步Checkpoint的完整流程可分為6個階段,下面結合Flink 1.18源碼詳細剖析每個階段的實現(xiàn)。

1. 階段1:JobManager觸發(fā)Checkpoint

(1) 觸發(fā)條件

Checkpoint的觸發(fā)分為兩類:

  • 周期性觸發(fā):基于execution.checkpointing.interval配置(如1分鐘),由CheckpointCoordinator的定時任務觸發(fā)。
  • 手動觸發(fā):通過Rest API或StreamExecutionEnvironment.executeCheckpoint()手動觸發(fā)。

(2) 核心流程與源碼

CheckpointCoordinator的triggerCheckpoint()方法是觸發(fā)Checkpoint的入口,核心邏輯如下:

// org.apache.flink.runtime.checkpoint.CheckpointCoordinator
publicvoidtriggerCheckpoint(boolean isPeriodic) {
    // 1. 檢查是否允許觸發(fā)Checkpoint(如作業(yè)狀態(tài)、并發(fā)Checkpoint限制等)
    if (!canTriggerCheckpoint()) {
        return;
    }

    // 2. 生成CheckpointID(全局唯一,遞增)
    longcheckpointID= checkpointIdCounter.getAndIncrement();

    // 3. 創(chuàng)建PendingCheckpoint(記錄Checkpoint的元數(shù)據(jù),如觸發(fā)時間、參與Task等)
    PendingCheckpointpendingCheckpoint=newPendingCheckpoint(
        job,
        checkpointID,
        getTimestamp(),
        getCheckpointStorageLocation(checkpointID),
        tasksToTrigger, // 需要觸發(fā)Checkpoint的所有Task
        getCheckpointConfiguration());

    // 4. 將PendingCheckpoint加入待處理隊列
    pendingCheckpoints.put(checkpointID, pendingCheckpoint);

    // 5. 向所有Task發(fā)送Checkpoint觸發(fā)請求(通過RpcTaskManagerGateway)
    for (ExecutionVertex task : tasksToTrigger) {
        task.getCurrentExecutionAttempt().triggerCheckpointAtSource(
            checkpointID,
            getTimestamp(),
            checkpointOptions);
    }
}

關鍵點解析:

  • CheckpointID:全局唯一標識,用于區(qū)分不同Checkpoint,恢復時通過ID加載對應快照。
  • PendingCheckpoint:記錄Checkpoint的“中間狀態(tài)”,包含參與Task、觸發(fā)時間、存儲位置等。當所有Task Ack后,PendingCheckpoint轉為CompletedCheckpoint。
  • 觸發(fā)請求發(fā)送:通過RpcTaskManagerGateway向TaskManager的Task發(fā)送TriggerCheckpoint消息,攜帶CheckpointID、時間戳等。

2. 階段2:Task注入Checkpoint Barrier

(1) Barrier的作用與注入時機

Barrier是Checkpoint的“同步信號”,由Source算子注入,隨數(shù)據(jù)流流向下游。其核心作用是:

  • 分割數(shù)據(jù)流:Barrier之前的數(shù)據(jù)屬于“當前Checkpoint”,Barrier之后的數(shù)據(jù)屬于“下一個Checkpoint”。
  • 觸發(fā)對齊:下游算子需等待所有輸入流的Barrier到達,確保狀態(tài)快照的“一致性”。

(2) 核心流程與源碼

當Task收到JobManager的TriggerCheckpoint消息后,由StreamTask(流任務基類)處理,核心邏輯如下:

// org.apache.flink.streaming.runtime.tasks.StreamTask
publicvoidtriggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions) {

    // 1. 異步觸發(fā)Checkpoint(避免阻塞主線程)
    mailboxProcessor.getMainMailboxExecutor().execute(
        () -> triggerCheckpoint(checkpointMetaData, checkpointOptions),
        "Trigger Checkpoint");
}

privatevoidtriggerCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions) {

    // 2. 檢查是否允許觸發(fā)Checkpoint(如Task狀態(tài)、Barrier對齊狀態(tài)等)
    if (!isRunning) {
        return;
    }

    // 3. 向所有輸出流注入Checkpoint Barrier
    for (RecordWriterOutput<?> output : getRecordWriterOutputs()) {
        output.emitWatermark(newCheckpointBarrier(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions));
    }

    // 4. 觸發(fā)本Task的狀態(tài)快照(見階段3)
    checkpointStateManager.triggerCheckpoint(
        checkpointMetaData,
        checkpointOptions,
        newCheckpointMetrics());
}

關鍵點解析:

  • 異步觸發(fā):通過mailboxProcessor將Checkpoint觸發(fā)任務提交到主線程的郵箱隊列,避免阻塞IO線程(Netty線程)。
  • Barrier注入:RecordWriterOutput向每個輸出流寫入CheckpointBarrier事件。Barrier隨數(shù)據(jù)流動,下游算子通過InputChannel接收。
  • 觸發(fā)狀態(tài)快照:注入Barrier后,立即調用checkpointStateManager.triggerCheckpoint()啟動本Task的狀態(tài)快照流程。

3. 階段3:Task異步生成狀態(tài)快照(核心異步邏輯)

(1) 異步快照的核心設計

異步快照的關鍵是“主線程快照 + 后臺持久化”:

  • 主線程:快速生成狀態(tài)的“輕量級快照”(如RocksDB的快照句柄、內存狀態(tài)的序列化字節(jié)數(shù)組),不等待持久化完成。
  • 后臺線程:將輕量級快照持久化到遠程存儲(如HDFS),持久化完成后通知主線程。

(2) 核心流程與源碼

checkpointStateManager.triggerCheckpoint()最終會調用每個算子的snapshotState()方法,而算子的狀態(tài)快照由StateBackend完成。以RocksDBStateBackend為例,其異步快照邏輯如下:

// org.apache.flink.contrib.streaming.state.RocksDBStateBackend
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions)throws Exception {

    // 1. 主線程:生成RocksDB快照(輕量級操作)
    RocksDBSnapshotsnapshot= db.getSnapshot();
    List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = metaInfoSnapshot();

    // 2. 創(chuàng)建異步任務:將快照持久化到遠程存儲
    returnnewFutureTask<>(
        () -> {
            try (CheckpointStreamFactory.CheckpointStateOutputStreamout= streamFactory.createCheckpointStateOutputStream()) {
                // 2.1 將RocksDB快照數(shù)據(jù)寫入輸出流(同步操作,但已在后臺線程執(zhí)行)
                snapshot.writeTo(out);
                stateMetaInfoSnapshots.writeTo(out);

                // 2.2 獲取持久化后的狀態(tài)句柄(包含遠程存儲路徑、文件大小等)
                StreamStateHandlestateHandle= out.closeAndGetHandle();
                return SnapshotResult.of(stateHandle);
            } catch (Exception e) {
                // 持久化失敗,返回失敗結果
                return SnapshotResult.of(e);
            } finally {
                // 釋放RocksDB快照資源
                db.releaseSnapshot(snapshot);
            }
        });
}

關鍵點解析:

① 主線程操作:db.getSnapshot()生成RocksDB的快照句柄(僅記錄當前數(shù)據(jù)文件的指針,不復制數(shù)據(jù)),metaInfoSnapshot()獲取狀態(tài)元信息(如列族名稱、序列化器等)。這兩步是輕量級的,耗時極短(毫秒級)。

② 異步任務封裝:通過FutureTask將持久化邏輯封裝為異步任務,F(xiàn)utureTask實現(xiàn)了RunnableFuture接口,可提交到線程池執(zhí)行。

③ 后臺持久化:FutureTask的run()方法在后臺線程執(zhí)行,核心邏輯包括:

  • 創(chuàng)建CheckpointStateOutputStream(連接遠程存儲的輸出流)。
  • 將RocksDB快照數(shù)據(jù)(通過snapshot.writeTo())和元信息寫入輸出流。
  • 調用out.closeAndGetHandle()獲取遠程存儲的句柄(如HDFS文件路徑)。
  • 釋放RocksDB快照資源(避免內存泄漏)。

④ 結果返回:FutureTask的get()方法可獲取持久化結果(成功返回SnapshotResult,包含狀態(tài)句柄;失敗返回異常)。但主線程不會立即調用get(),而是將FutureTask提交到線程池后繼續(xù)執(zhí)行其他任務。

(3) 異步任務的執(zhí)行線程池

Flink使用ExecutorService執(zhí)行異步持久化任務,線程池配置如下:

  • 線程池類型:ForkJoinPool(默認)或ThreadPoolExecutor,可通過taskmanager.network.netty.io.numThreads配置線程數(shù)(默認為CPU核心數(shù))。
  • 任務提交:StreamTask在生成異步快照后,將FutureTask提交到線程池:
// org.apache.flink.streaming.runtime.tasks.StreamTask
privatevoidtriggerCheckpointOnExecutor(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) {
    // 生成異步快照(FutureTask)
    RunnableFuture<SnapshotResult<?>> snapshotFuture = operatorChain.snapshotState(checkpointMetaData, checkpointOptions, checkpointMetrics);
    
    // 提交到異步線程池執(zhí)行
    asyncOperationsThreadPool.submit(() -> {
        try {
            // 等待持久化完成(后臺線程執(zhí)行)
            SnapshotResult<?> snapshotResult = snapshotFuture.get();
            // 持久化成功,向JobManager發(fā)送Ack
            acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), snapshotResult);
        } catch (Exception e) {
            // 持久化失敗,向JobManager發(fā)送Nack
            declineCheckpoint(checkpointMetaData.getCheckpointId(), e);
        }
    });
}

4. 階段4:Task向JobManager確認Checkpoint結果

(1) 確認時機

Task的異步持久化任務完成后(成功或失敗),需向JobManager發(fā)送確認消息:

  • Ack:持久化成功,攜帶狀態(tài)句柄(SnapshotResult)。
  • Nack:持久化失敗,攜帶異常信息。

(2) 核心流程與源碼

確認邏輯由TaskExecutorGateway的acknowledgeCheckpoint()方法實現(xiàn),核心如下:

// org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
publicvoidacknowledgeCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        SnapshotResult<?> snapshotResult) {

    // 1. 根據(jù)ExecutionAttemptID找到對應的Task
    Tasktask= taskSlotTable.getTask(executionAttemptID);
    if (task != null) {
        // 2. 通知Task Checkpoint完成
        task.acknowledgeCheckpoint(checkpointId, checkpointMetrics, snapshotResult);
    }
}

// org.apache.flink.streaming.runtime.tasks.StreamTask
publicvoidacknowledgeCheckpoint(
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        SnapshotResult<?> snapshotResult) {

    // 3. 構建Ack消息(包含狀態(tài)句柄)
    AcknowledgeCheckpointmessage=newAcknowledgeCheckpoint(
        jobId,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        snapshotResult.getStateHandles());

    // 4. 向JobManager發(fā)送Ack消息
    jobManagerGateway.acknowledgeCheckpoint(message);
}

關鍵點解析:

  • 狀態(tài)句柄傳遞:SnapshotResult.getStateHandles()包含狀態(tài)快照的遠程存儲句柄(如StreamStateHandle,包含HDFS文件路徑、文件大小等),JobManager通過句柄定位快照數(shù)據(jù)。
  • 異步發(fā)送:通過jobManagerGateway異步發(fā)送Ack消息,避免阻塞Task主線程。

5. 階段5:JobManager匯總確認并完成Checkpoint

(1) 匯總邏輯

JobManager的CheckpointCoordinator接收所有Task的Ack消息后,需檢查:

  • 完整性:所有參與Checkpoint的Task均已Ack。
  • 一致性:所有Task的狀態(tài)句柄均有效(無異常)。

若滿足條件,則標記Checkpoint為“成功”,并將PendingCheckpoint轉為CompletedCheckpoint;否則標記為“失敗”。

(2) 核心流程與源碼

CheckpointCoordinator.receiveAcknowledgeMessage()是處理Ack消息的入口,核心邏輯如下:

// org.apache.flink.runtime.checkpoint.CheckpointCoordinator
publicvoidreceiveAcknowledgeMessage(AcknowledgeCheckpoint message)throws Exception {
    longcheckpointId= message.getCheckpointId();
    PendingCheckpointpendingCheckpoint= pendingCheckpoints.get(checkpointId);

    if (pendingCheckpoint != null) {
        // 1. 記錄Task的Ack結果(包含狀態(tài)句柄)
        pendingCheckpoint.acknowledgeTask(
            message.getTaskExecutionId(),
            message.getStateHandles(),
            message.getCheckpointMetrics());

        // 2. 檢查是否所有Task均已Ack
        if (pendingCheckpoint.isFullyAcknowledged()) {
            // 3. 將PendingCheckpoint轉為CompletedCheckpoint
            CompletedCheckpointcompletedCheckpoint= pendingCheckpoint.toCompletedCheckpoint();

            // 4. 將CompletedCheckpoint加入已完成的Checkpoint隊列
            completedCheckpoints.add(completedCheckpoint);

            // 5. 清理PendingCheckpoint
            pendingCheckpoints.remove(checkpointId);

            // 6. 通知所有監(jiān)聽器(如RestEndpoint)Checkpoint完成
            notifyCheckpointComplete(checkpointId);
        }
    }
}

關鍵點解析:

  • PendingCheckpoint.acknowledgeTask():將Task的狀態(tài)句柄存儲到PendingCheckpoint中,并更新已Ack的Task數(shù)量。
  • isFullyAcknowledged():檢查所有參與Checkpoint的Task均已Ack(通過比較已Ack數(shù)量與總Task數(shù)量)。
  • CompletedCheckpoint:存儲已完成的Checkpoint元數(shù)據(jù),包括狀態(tài)句柄、完成時間、持久化路徑等,用于故障恢復。
  • 通知監(jiān)聽器:通過notifyCheckpointComplete()通知RestEndpoint、Web UI等組件Checkpoint完成,更新作業(yè)狀態(tài)。

6. 階段6:Checkpoint完成后的清理與恢復準備

(1) 清理邏輯

Checkpoint完成后,需清理以下資源:

  • 舊Checkpoint:根據(jù)execution.checkpointing.max-retained-checkpoints配置,保留最新的N個Checkpoint,刪除舊的Checkpoint(釋放遠程存儲空間)。
  • 臨時資源:Task在快照過程中生成的臨時文件(如RocksDB的臨時快照文件)。

(2) 恢復準備

CompletedCheckpoint被存儲到CompletedCheckpointStore(默認為DefaultCompletedCheckpointStore,基于內存或ZooKeeper存儲),故障恢復時,CheckpointCoordinator從CompletedCheckpointStore中獲取最新的CompletedCheckpoint,通過狀態(tài)句柄加載狀態(tài)數(shù)據(jù),重啟Task。

五、異步Checkpoint的關鍵問題與優(yōu)化

1. Barrier對齊延遲與非對齊Checkpoint

(1) Barrier對齊的問題

在異步Checkpoint中,Barrier對齊是導致延遲的主要原因:下游算子需等待所有輸入流的Barrier到達,若某個輸入流的數(shù)據(jù)處理較慢,會導致其他輸入流的數(shù)據(jù)緩沖在內存中,無法處理,從而增加端到端延遲。

(2) 非對齊Checkpoint(Unaligned Checkpoint)

為解決Barrier對齊延遲,F(xiàn)link 1.11引入了非對齊Checkpoint:

核心思想:不等所有輸入流的Barrier到達,立即觸發(fā)狀態(tài)快照,并將緩沖區(qū)中的數(shù)據(jù)(包括Barrier之前和之后的數(shù)據(jù))作為快照的一部分。

實現(xiàn)原理:

  • 算子收到第一個Barrier時,立即停止處理該輸入流的數(shù)據(jù),并將緩沖區(qū)中的數(shù)據(jù)(包括未處理的Barrier)寫入快照。
  • 其他輸入流繼續(xù)處理數(shù)據(jù),直到Barrier到達,重復上述過程。
  • 快照完成后,算子繼續(xù)處理緩沖區(qū)中的數(shù)據(jù)。

適用場景:適用于“背壓”嚴重的作業(yè)(如數(shù)據(jù)傾斜、下游處理慢),可顯著降低Checkpoint延遲。

(3) 源碼實現(xiàn)

非對齊Checkpoint的開關由execution.checkpointing.unaligned.enabled控制,核心邏輯在CheckpointBarrierHandler的processBarrier()方法:

// org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
publicvoidprocessBarrier(CheckpointBarrier barrier)throws Exception {
    if (checkpointOptions.isUnalignedCheckpointEnabled()) {
        // 非對齊Checkpoint:立即處理Barrier,不等待對齊
        processBarrierUnaligned(barrier);
    } else {
        // 對齊Checkpoint:等待所有輸入流的Barrier到達
        processBarrierAligned(barrier);
    }
}

privatevoidprocessBarrierUnaligned(CheckpointBarrier barrier)throws Exception {
    // 1. 停止當前輸入流的數(shù)據(jù)處理
    blockCurrentInput();

    // 2. 將緩沖區(qū)中的數(shù)據(jù)(包括Barrier)寫入快照
    Buffer[] bufferedData = getBufferedData();
    for (Buffer buffer : bufferedData) {
        checkpointStorage.writeBuffer(buffer);
    }

    // 3. 觸發(fā)狀態(tài)快照
    triggerCheckpoint(barrier);

    // 4. 恢復當前輸入流的數(shù)據(jù)處理
    unblockCurrentInput();
}

2. 異步任務失敗處理

(1) 失敗場景

異步持久化任務可能因以下原因失敗:

  • 遠程存儲不可用(如HDFS宕機)。
  • 網(wǎng)絡中斷(無法上傳快照數(shù)據(jù))。
  • 本地磁盤故障(無法讀取RocksDB快照)。

(2) 處理機制

Task級失敗:若某個Task的異步持久化失敗,Task會向JobManager發(fā)送DeclineCheckpoint消息(Nack),攜帶異常信息。

作業(yè)級失敗:JobManager收到Nack后,立即標記當前Checkpoint為“失敗”,并:

  • 丟棄所有Task的本次快照數(shù)據(jù)(避免狀態(tài)不一致)。
  • 若失敗次數(shù)超過閾值(execution.checkpointing.tolerable-failed-checkpoints),觸發(fā)作業(yè)失敗(Failover)。

恢復策略:作業(yè)失敗后,CheckpointCoordinator從最新的CompletedCheckpoint恢復狀態(tài),重啟Task。

(3) 源碼實現(xiàn)

Task的異步持久化失敗處理邏輯如下:

// org.apache.flink.streaming.runtime.tasks.StreamTask
privatevoidtriggerCheckpointOnExecutor(...) {
    asyncOperationsThreadPool.submit(() -> {
        try {
            SnapshotResult<?> snapshotResult = snapshotFuture.get();
            acknowledgeCheckpoint(checkpointId, snapshotResult);
        } catch (Exception e) {
            // 持久化失敗,發(fā)送Nack
            declineCheckpoint(checkpointId, e);
        }
    });
}

privatevoiddeclineCheckpoint(long checkpointId, Throwable cause) {
    DeclineCheckpointmessage=newDeclineCheckpoint(
        jobId,
        executionAttemptID,
        checkpointId,
        cause);
    jobManagerGateway.declineCheckpoint(message);
}

3. StateBackend選擇對異步性能的影響

(1) StateBackend對比

StateBackend

狀態(tài)存儲位置

異步支持

適用場景

MemoryStateBackend

TaskManager內存

不支持

測試、小狀態(tài)作業(yè)

FsStateBackend

本地文件系統(tǒng)+遠程存儲

支持

中等狀態(tài)作業(yè)(GB級)

RocksDBStateBackend

本地RocksDB+遠程存儲

支持

大狀態(tài)作業(yè)(TB級)、生產環(huán)境

(2) RocksDBStateBackend的異步優(yōu)化

RocksDBStateBackend是生產環(huán)境最常用的StateBackend,其異步優(yōu)化點包括:

  • 增量Checkpoint:僅上傳上次Checkpoint后變化的數(shù)據(jù)(通過RocksDB的SST文件差異),減少持久化數(shù)據(jù)量。
  • 本地恢復:優(yōu)先從本地磁盤加載快照(若本地未刪除),避免遠程存儲讀取延遲。
  • 快照壓縮:對快照數(shù)據(jù)進行壓縮(如Snappy),減少網(wǎng)絡傳輸和存儲開銷。

4. 線程池配置優(yōu)化

異步持久化任務的性能依賴線程池配置,關鍵參數(shù)如下:

線程數(shù):taskmanager.network.netty.io.numThreads(默認為CPU核心數(shù)),需根據(jù)作業(yè)特點調整:

  • 若狀態(tài)大、持久化耗時長,可增加線程數(shù)(如CPU核心數(shù)×2)。
  • 若狀態(tài)小、持久化快,保持默認值即可。

隊列容量:taskmanager.network.netty.io.queueCapacity(默認為Integer.MAX_VALUE),避免任務被拒絕。

拒絕策略:默認為AbortPolicy(拋出異常),可改為CallerRunsPolicy(由提交線程執(zhí)行任務),避免任務丟失。

六、總結:異步Checkpoint的價值與未來方向

1. 核心價值

異步Checkpoint是Flink實現(xiàn)“高吞吐、低延遲、Exactly-Once”容錯的核心機制,其價值體現(xiàn)在:

  • 性能提升:主數(shù)據(jù)處理線程幾乎不被阻塞,Checkpoint對作業(yè)延遲和吞吐的影響降至最低。
  • 可靠性保證:通過異步持久化到遠程存儲,確保狀態(tài)快照的可靠性,故障時可快速恢復。
  • 靈活性:支持對齊/非對齊Checkpoint、增量Checkpoint等特性,適應不同作業(yè)場景。

2. 結語

異步Checkpoint機制通過“主線程快照 + 后臺持久化”的設計,巧妙地平衡了容錯與性能的關系。深入理解其原理與源碼,不僅有助于優(yōu)化Flink作業(yè)的性能,更能為分布式系統(tǒng)的容錯設計提供借鑒。隨著Flink的持續(xù)發(fā)展,異步Checkpoint將進一步演進,為實時流處理提供更強大的支撐。

責任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
相關推薦

2022-01-14 07:56:38

Checkpoint機制Flink

2021-09-06 18:55:57

MySQLCheckpoint機制

2025-05-26 09:05:00

2025-07-08 08:57:29

2024-02-27 08:05:32

Flink分區(qū)機制數(shù)據(jù)傳輸

2010-09-29 13:52:33

PostgreSQL

2025-10-31 07:25:00

2025-04-27 08:15:00

FlinkSavepointCheckpoint

2023-03-22 18:34:30

Flink調度部署

2016-09-07 20:43:36

Javascript異步編程

2009-07-08 15:01:00

Servlet Ses

2025-08-22 14:05:00

RSTP網(wǎng)絡端口

2024-07-16 08:38:06

2021-06-06 16:56:49

異步編程Completable

2021-11-02 06:58:55

FlinkWindow機制

2015-10-26 09:25:42

2024-04-09 07:50:59

Flink語義Watermark

2025-08-27 06:00:00

2009-09-23 16:30:01

Hibernate f

2011-03-17 09:20:05

異常處理機制
點贊
收藏

51CTO技術棧公眾號

日韩高清不卡一区| 中文字幕久久精品一区二区| 国产女同性恋一区二区| 国产综合色香蕉精品| 久久久精品视频免费观看| 日韩中文字幕一区二区高清99| 国产精品区一区二区三| 亚洲一区精品电影| 好看的av在线| 91精品观看| 日韩精品久久久久久久玫瑰园 | 丝袜视频国产在线播放| 免费的国产精品| 欧美高清视频一区二区| 中文字幕网站在线观看| 精品午夜视频| 色久综合一二码| 特级西西444| 风间由美一区| 99re8在线精品视频免费播放| 国产精品成人v| 国产一级片免费观看| 成人6969www免费视频| 亚洲精品一区二区精华| 一区二区三区四区毛片| 欧美7777| 午夜久久久久久久久| 中文字幕av日韩精品| 男人天堂综合| av电影天堂一区二区在线| 成人亲热视频网站| 成人午夜精品视频| 亚洲深夜av| 久久久久久91香蕉国产| 久久久久亚洲av片无码| 国产精品最新| 日韩精品在线第一页| 国产人妻精品午夜福利免费| 视频91a欧美| 欧美影院一区二区三区| 国产精品宾馆在线精品酒店| 男人天堂亚洲| 亚洲美女屁股眼交| 正义之心1992免费观看全集完整版| 午夜国产在线视频| 欧美激情不卡| 欧美性xxxx18| 日韩精品 欧美| av2020不卡| 亚洲一区二区三区爽爽爽爽爽| 三区精品视频观看| 国产永久免费高清在线观看 | 国产一区二区色| 在线免费一区二区| 日本不卡一二三区黄网| 国产97免费视| 夜夜爽妓女8888视频免费观看| 亚洲看片一区| 欧洲精品在线视频| www五月天com| 日本亚洲三级在线| 国产在线观看精品| 国产老妇伦国产熟女老妇视频| 免费久久99精品国产| 国产精品视频在线播放| 中文字幕永久免费视频| 捆绑紧缚一区二区三区视频| 国产精品一区二区三区久久| 91无套直看片红桃| 国产一区二区三区久久悠悠色av| 川上优av一区二区线观看| 国产农村老头老太视频| 国产成人免费在线视频| 国产欧美精品一区二区三区| 视频午夜在线| 日本一区二区成人在线| 日韩国产精品毛片| av女在线播放| 欧美自拍偷拍午夜视频| 欧美日韩精品区别| 一区二区三区四区高清视频| 亚洲成色777777女色窝| 亚洲av片不卡无码久久| 日韩成人免费| 欧美日韩电影在线观看| 午夜婷婷在线观看| 久久99精品国产.久久久久| 91精品国产99久久久久久红楼| 国产黄色片在线免费观看| 欧美日本一区二区视频在线观看| 欧美第一黄色网| 毛片视频网站在线观看| 捆绑紧缚一区二区三区视频| 成人午夜电影免费在线观看| 日本视频在线观看一区二区三区 | 国产精品久久久久久久免费大片 | 粉嫩av一区二区三区在线播放| 国产精品一区二区欧美黑人喷潮水| 日本毛片在线观看| 欧美国产精品v| 福利视频免费在线观看| 日韩欧美看国产| 欧美一级片在线观看| 亚洲综合自拍网| 国产精品麻豆久久| 97超碰色婷婷| 国产精品一区二区av白丝下载| 激情六月婷婷久久| 欧美成人dvd在线视频| 成年视频在线观看| 日本一二三四高清不卡| 韩日视频在线观看| 粉嫩91精品久久久久久久99蜜桃| 欧美一级黄色大片| av网在线播放| 日韩一级在线| 亚洲精品欧美日韩| 都市激情在线视频| 天天综合天天综合色| 国产高清999| 精品欧美激情在线观看| 国内久久久精品| 国产精品无码在线播放| 国产欧美一区在线| 国产免费观看高清视频| 国产一区二区三区免费观看在线| 国产视频综合在线| 玖玖爱免费视频| 久88久久88久久久| 亚洲国产欧美不卡在线观看| videos性欧美另类高清| 精品国产一区二区亚洲人成毛片| 东方伊人免费在线观看| 国产精品久久久免费| 99re国产| 怡红院在线观看| 欧美裸体bbwbbwbbw| 免费一级黄色录像| 视频一区视频二区中文字幕| 国产成人精品免高潮费视频| 免费观看毛片网站| 一区二区三区四区在线免费观看| 日本a√在线观看| 免费毛片在线不卡| 欧美中在线观看| 日韩精品视频在线观看一区二区三区| 亚洲欧洲日韩一区二区三区| 色哟哟精品视频| 欧美最新另类人妖| 国产精品激情av电影在线观看| 性感美女视频一二三| 亚洲v中文字幕| 中文字幕三级电影| 亚洲精品1区2区| 国产一区福利视频| 美女高潮在线观看| 亚洲免费av网址| 中文字幕在线天堂| 国产精品区一区二区三区| 亚洲欧洲日本精品| 91精品综合久久久久久久久久久| 国产精品日韩av| 麻豆影视在线观看_| 69久久99精品久久久久婷婷 | 亚洲视频一二三区| 在线观看av免费观看| 欧美久久一级| 国产另类自拍| 神马电影网我不卡| 中文字幕日本欧美| 国产精品一区二区免费视频| 亚洲激情图片qvod| 国产精品福利导航| 日韩影院免费视频| 四虎永久免费网站| 看全色黄大色大片免费久久久| 97超碰色婷婷| 在线观看免费网站黄| 51久久夜色精品国产麻豆| 麻豆成人在线视频| 91麻豆高清视频| 午夜啪啪小视频| 亚洲久久视频| 亚洲午夜精品一区二区| 日韩在线精品强乱中文字幕| 66m—66摸成人免费视频| 国产69久久| 日韩精品专区在线影院观看| 天天干天天干天天干天天| 国产精品免费观看视频| 成年人小视频在线观看| 日韩高清中文字幕一区| 日韩中文在线字幕| 亚洲第一论坛sis| 成人福利网站在线观看11| av不卡高清| 久久精品国产成人| 香蕉视频国产在线| 51久久夜色精品国产麻豆| 久久亚洲天堂网| 一区二区三区高清| 日本精品久久久久中文| 懂色中文一区二区在线播放| 黄色aaa级片| 在线精品在线| 亚洲综合自拍一区| 欧美大片免费观看网址| 欧美理论电影在线观看| yjizz视频网站在线播放| 精品欧美一区二区在线观看| 中文在线观看免费高清| 五月天亚洲精品| www.av免费| 中文字幕成人av| 国产chinese中国hdxxxx| 午夜久久tv| 亚洲第一在线综合在线| 色婷婷综合久久久久久| 亚洲xxxxx电影| 国产a亚洲精品| 欧美有码在线观看| av在线小说| 久久99精品国产99久久6尤物| 国产一区电影| 亚洲精品国产拍免费91在线| 国产成人免费看一级大黄| 欧美偷拍一区二区| 99超碰在线观看| 欧美视频一二三| 亚洲第一精品在线观看| 亚洲一区二区三区激情| 四虎免费在线视频| 中文字幕日韩欧美一区二区三区| 国产高清自拍视频| av中文一区二区三区| 日本wwwwwww| 国产一区二区女| 免费黄频在线观看| 麻豆freexxxx性91精品| wwwwxxxx日韩| 蜜桃av一区二区三区电影| 成人在线看视频| 老牛影视一区二区三区| 日本久久久精品视频| 中文欧美日韩| 日韩av黄色网址| 午夜在线观看免费一区| 欧美一区二区三区爽大粗免费| 雨宫琴音一区二区在线| 欧洲精品在线播放| 在线观看一区| 国内自拍在线观看| 久久免费高清| 亚洲天堂网一区| 精品一区二区三区蜜桃| 黄色片免费网址| 国产精品69久久久久水密桃| 熟妇女人妻丰满少妇中文字幕 | 能看毛片的网站| 懂色av一区二区夜夜嗨| 麻豆精品国产传媒av| 99久久免费精品| 无码国产69精品久久久久同性| bt欧美亚洲午夜电影天堂| 黄色片视频免费观看| 久久精品水蜜桃av综合天堂| 国产午夜福利一区| 综合久久给合久久狠狠狠97色| 天海翼在线视频| 一区二区三区在线免费| 色婷婷av国产精品| 欧美亚洲国产一区在线观看网站 | av日韩国产| 日韩女在线观看| 日韩色性视频| 国产精品国产一区二区| 青青操综合网| 亚洲图片欧洲图片日韩av| 欧美二区视频| 男人操女人免费软件| 六月丁香婷婷色狠狠久久| 国产一精品一aⅴ一免费| 久久久99精品免费观看不卡| 99久久精品久久亚洲精品| 亚洲国产成人av好男人在线观看| 毛片视频网站在线观看| 在线成人小视频| 污污视频在线免费看| 最近更新的2019中文字幕| av在线网页| 国产情人节一区| 国内精品免费| 制服国产精品| 国产精品日韩| 下面一进一出好爽视频| 久久免费视频一区| 欧美黄色高清视频| 亚洲午夜精品网| 亚洲一级特黄毛片| 亚洲精品乱码久久久久久金桔影视 | 亚洲97在线观看| 成人在线高清| 蜜桃视频日韩| 欧美精品激情| 一本色道久久亚洲综合精品蜜桃 | 国产精品久久观看| 无码专区aaaaaa免费视频| 毛片基地黄久久久久久天堂| 少妇被狂c下部羞羞漫画| 亚洲欧洲av色图| 波多野结衣午夜| 日韩高清a**址| 污污片在线免费视频| 国产精品视频1区| 一区二区三区韩国免费中文网站| 成人短视频在线看| 视频精品一区二区| 日本三级日本三级日本三级极| 国产视频一区在线观看| 亚洲一区欧美在线| 日韩欧美一区在线| av激情在线| 91网在线免费观看| 久久一级电影| 手机看片福利日韩| 91女人视频在线观看| 亚洲国产综合久久| 日韩精品中文字幕一区| 国产高清一区二区三区视频| 国产精品香蕉国产| 欧美丝袜激情| 992kp快乐看片永久免费网址| 91网站黄www| 日本午夜精品理论片a级app发布| 欧美一区二区私人影院日本| 午夜不卡视频| 国产主播精品在线| 97精品一区| 色啦啦av综合| 中文字幕亚洲成人| 国产一区二区波多野结衣| 中文字幕亚洲欧美一区二区三区 | 色婷婷久久久亚洲一区二区三区| 久久婷婷国产麻豆91| 日韩一区二区在线看片| 国产人成网在线播放va免费| 国产欧美日韩亚洲精品| 99久久.com| 国产精品igao网网址不卡| 亚洲人成网站色在线观看| 国产亲伦免费视频播放| 麻豆成人在线看| 亚洲综合色婷婷在线观看| 久久这里只有精品18| 成人精品电影在线观看| 久久夜靖品2区| 亚洲欧美日韩在线一区| 色尼玛亚洲综合影院| 亚洲国产另类久久久精品极度| 蜜臀精品一区二区三区在线观看 | 中文有码一区| 亚洲黄色a v| 国产精品久久久久久久久晋中 | 欧美日韩精品免费观看视欧美高清免费大片 | 久久天堂精品| 亚洲码无人客一区二区三区| 欧美三级免费观看| 国家队第一季免费高清在线观看| 日本高清+成人网在线观看| 欧美美女在线观看| 四季av一区二区三区| 亚洲制服丝袜在线| 欧美日韩在线精品一区二区三区激情综 | 在线亚洲人成电影网站色www| 亚洲爱情岛论坛永久| 午夜精品久久久久久99热软件| 群体交乱之放荡娇妻一区二区| 日本在线观看a| 中文字幕日韩av资源站| 国内爆初菊对白视频| 国产成人精品视| 亚洲最新色图| 爱爱免费小视频| 一区二区免费看| 亚洲人成色777777精品音频| 国产精品久久久久久久av大片| 久久国产电影| av av在线| 欧美日韩亚洲国产综合| а_天堂中文在线| 日本在线高清视频一区| 国产福利一区二区三区视频| 在线观看日本网站| 久久久国产精品视频| 日韩人体视频| 天堂网成人在线| 日本乱人伦aⅴ精品| 美女航空一级毛片在线播放| 亚洲美女搞黄| 97久久人人超碰|