精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

美團面試:對比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常見問題?

開發 架構
RocketMQ通過橫向擴展(增加消費者實例、隊列數量)、提升消費能力(線程池調優、批量消費)、動態擴容、消息預取、死信隊列隔離無效消息,并支持消費限流及監控告警,快速定位處理積壓問題。

三大MQ指標對比

分布式、微服務、高并發架構中,消息隊列(Message Queue,簡稱MQ)扮演著至關重要的角色。

消息隊列用于實現系統間的異步通信、解耦、削峰填谷等功能。

對比指標

RabbitMQ

RocketMQ

Kafka

應用場景

中小規模應用場景

分布式事務、實時日志處理

大規模數據處理、實時流處理

開發語言

Erlang

Java

Scala & Java

消息可靠性

最高 (AMQP協議保證)

較高 (基于事務保證)

中等 (基于副本機制保證)

消息吞吐量

低 萬級到十萬級

中等 十萬級到百萬級

高 百萬級或更高

時效性

毫秒級

毫秒級

毫秒級

支持的語言和平臺

Java、C++、Python等

Java、C++、Go等

Java、Scala、Python等

架構模型

virtual host、broker、exchange、queue

nameserver、controller、broker

broker、topic、partition、zookeeper/Kraft

社區活躍度和生態建設

中等 活躍的開源社區和豐富的插件生態系統

較高 阿里巴巴開源,穩定的社區支持

最高 活躍的開源社區和廣泛的應用

github star

10.8k

19.4k

25.2k

對比分析三大MQ常見問題

下面, 對比分析三大MQ常見問題。

消息丟失問題


image-20250508192254071image-20250508192254071

1、RocketMQ解決消息丟失問題

生產端:  采用同步發送(等待Broker確認)并啟用重試機制,結合事務消息(如預提交half消息+二次確認commit)確保消息可靠投遞。

Broker端:配置同步刷盤(消息寫入磁盤后返回確認)和多副本同步機制(主從節點數據冗余)防止宕機丟失,同時通過集群容災保障高可用。

消費端:消費者需手動ACK確認,失敗時觸發自動重試(默認16次),最終失敗消息轉入死信隊列人工處理,避免異常場景下消息丟失。

2、Kafka解決消息丟失問題

生產端:設置acks=all確保消息被所有副本持久化后才響應,啟用生產者重試(retries)及冪等性(enable.idempotence=true)防止網絡抖動或Broker異常導致丟失

Broker端:配置多副本同步(min.insync.replicas≥2)和ISR(In-Sync Replicas)機制,僅同步成功的副本參與選舉;避免unclean.leader.election.enable=true(防止數據不全的副本成為Leader)

消費端:關閉自動提交位移(enable.auto.commit=false),手動同步提交(commitSync)確保消息處理完成后再更新位移,結合消費重試及死信隊列兜底

3、RabbitMQ解決消息丟失問題

生產端:啟用Publisher Confirm模式(異步確認消息持久化)并設置mandatory=true路由失敗回退,結合備份交換機處理無法路由的消息;事務消息因性能損耗僅限關鍵場景使用。

Broker端:消息與隊列均需持久化(durable=true)防止宕機丟失,部署鏡像隊列集群實現多節點冗余;同步刷盤策略確保數據落盤后響應。

消費端:關閉自動ACK,采用手動ACK并在業務處理成功后提交確認;消費失敗時重試(重試次數可配置)并最終轉入死信隊列人工干預,避免消息因異常未處理而丟失。

消息積壓問題

1、RocketMQ解決消息積壓問題

RocketMQ通過橫向擴展(增加消費者實例、隊列數量)、提升消費能力(線程池調優、批量消費)、動態擴容、消息預取、死信隊列隔離無效消息,并支持消費限流及監控告警,快速定位處理積壓問題。

RocketMQ還提供了消息拉取和推拉模式,消費者可以根據自身的處理能力主動拉取消息,避免消息積壓過多。

2、Kafka解決消息積壓問題

Kafka通過  橫向擴展(增加分區及消費者實例)、優化消費者參數(如批量拉取、并發處理)、提升消費邏輯效率(異步化、減少I/O),并動態監控消費滯后指標。

必要時限流生產者或臨時擴容消費組,結合分區再平衡策略快速分發積壓消息負載。

Kafka還提供了消息清理(compaction)和數據保留策略,可以根據時間或者數據大小來自動刪除過期的消息,避免消息積壓過多。

3、RabbitMQ解決消息積壓問題

RabbitMQ通過調整消費者的消費速率來控制消息積壓。

可以使用QoS(Quality of Service)機制設置每個消費者的預取計數,限制每次從隊列中獲取的消息數量,以控制消費者的處理速度。

RabbitMQ還支持消費者端的流量控制,通過設置basic.qos或basic.consume命令的參數來控制消費者的處理速度,避免消息過多導致積壓。

消息重復消費問題

1、RocketMQ解決消息重復消費問題

  • 使用消息唯一標識符(Message ID):在消息發送時,為每條消息附加一個唯一標識符。消費者在處理消息時,可以通過判斷消息唯一標識符來避免重復消費??梢詫⑾D記錄在數據庫或緩存中,用于去重檢查。
  • 消費者端去重處理:消費者在消費消息時,可以通過維護一個已消費消息的列表或緩存,來避免重復消費已經處理過的消息。

2、Kafka解決消息重復消費問題

  • 冪等性處理:在消費者端實現冪等性邏輯,即多次消費同一條消息所產生的結果與單次消費的結果一致。這可以通過在業務邏輯中引入唯一標識符或記錄已處理消息的狀態來實現。
  • 消息確認機制:消費者在處理完消息后,提交已消費的偏移量(Offset)給Kafka,Kafka會記錄已提交的偏移量,以便在消費者重新啟動時從正確的位置繼續消費。消費者可以定期提交偏移量,確保消息只被消費一次。

3、RabbitMQ解決消息重復消費問題

  • 冪等性處理:在消費者端實現冪等性邏輯,即無論消息被消費多少次,最終的結果應該保持一致。這可以通過在消費端進行唯一標識的檢查或者記錄已經處理過的消息來實現。
  • 消息確認機制:消費者在處理完消息后,發送確認消息(ACK)給RabbitMQ,告知消息已經成功處理。RabbitMQ根據接收到的確認消息來判斷是否需要重新投遞消息給其他消費者。

最為詳細的方案,請參考尼恩團隊的架構方案: 最系統的冪等性方案:一鎖二判三更新

消息有序性

1、Rabbitmq 解決有序性問題

模式一:單隊列單消費者模式
  • 將需要保證順序的消息全部發送到同一個隊列,且消費者設置為單線程處理。
  • 原理:RabbitMQ 隊列天然支持 FIFO 順序存儲,單消費者避免并發處理導致亂序。

示例代:

// 生產者發送到同一隊列
  rabbitTemplate.convertAndSend("order.queue", "message1");
  rabbitTemplate.convertAndSend("order.queue", "message2");
  // 消費者單線程監聽
  @RabbitListener(queues = "order.queue")
  public void processOrder(String message) {
      // 順序處理邏輯
  }

缺點:無法橫向擴展消費者,吞吐量受限。

模式二:消息分組策略

按業務標識分區(如訂單 ID、用戶 ID),相同分組的消息路由到同一隊列,每個隊列對應一個消費者。

實現方式: 生產者通過哈希算法或自定義路由鍵將關聯的消息分配到特定隊列。

  • 生產者根據業務標識生成路由鍵,如 routingKey = orderId.hashCode() % queueCount。
  • 聲明多個隊列,綁定到同一交換機,并根據路由鍵規則分發消息。

代碼示例:

// 生產者發送消息時指定路由鍵
String orderId = "ORDER_1001";
String routingKey = "order." + (orderId.hashCode() % 3);  // 分配到3個隊列之一
rabbitTemplate.convertAndSend("order.exchange", routingKey, message);

優勢:在保證同分組順序性的同時,允許不同分組并行處理。

消費者并發控制 設置

prefetchCount=1

確保每次只處理一個消息,關閉自動應答,手動確認后再獲取新消息:

spring:
  rabbitmq:
         listener:
           simple:
             prefetch: 1

效果:防止消費者同時處理多個消息導致亂序。

2、RocketMQ解決有序性問題

RocketMQ實現順序消息的核心是通過生產端和消費端雙重保障:

  • 全局順序需單隊列(性能受限),分區順序通過Sharding Key哈希分散到不同隊列,兼顧吞吐量與局部有序性。需避免異步消費、消息重試亂序,失敗時跳過當前消息防止阻塞
  • 生產者使用MessageQueueSelector將同一業務標識(如訂單ID)的消息強制路由至同一隊列,利用隊列FIFO特性保序;
  • 消費端對  同一隊列啟用 單線程拉取 + 分區鎖機制(ConsumeOrderlyContext),確保串行處理。

3、Kafka解決有序性問題

Kafka實現順序消息的核心在于分區順序性:

  • 生產端:相同業務標識(如訂單ID)的消息通過固定Key哈希至同一分區(Partitioner),利用分區內消息天然有序性保序;
  • 消費端:每個分區僅由同一消費者組的一個線程消費(單線程串行處理),避免并發消費亂序;

事務消息

1、RabbitMQ的事務消息

  • RabbitMQ支持事務消息的發送和確認。在發送消息之前,可以通過調用"channel.txSelect()"來開啟事務,然后將要發送的消息發布到交換機中。如果事務成功提交,消息將被發送到隊列,否則事務會回滾,消息不會被發送。
  • 在消費端,可以通過"channel.txSelect()"開啟事務,然后使用"basicAck"手動確認消息的處理結果。如果事務成功提交,消費端會發送ACK確認消息的處理;否則,事務回滾,消息將被重新投遞。
public class RabbitMQTransactionDemo {
    private static final String QUEUE_NAME = "transaction_queue";
    public static void main(String[] args) {
        try {
            // 創建連接工廠
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            // 創建連接
            Connection connection = factory.newConnection();
            // 創建信道
            Channel channel = connection.createChannel();
            // 聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            try {
                // 開啟事務
                channel.txSelect();
                // 發送消息
                String message = "Hello, RabbitMQ!";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                // 提交事務
                channel.txCommit();
            } catch (Exception e) {
                // 事務回滾
                channel.txRollback();
                e.printStackTrace();
            }
            // 關閉信道和連接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、RocketMQ的事務消息

RocketMQ提供了事務消息的機制,確保消息的可靠性和一致性。

發送事務消息時,需要將消息發送到半消息隊列,然后執行本地事務邏輯。

事務執行成功后,通過調用"TransactionStatus.CommitTransaction"提交事務消息;若事務執行失敗,則通過調用"TransactionStatus.RollbackTransaction"回滾事務消息。

事務消息的最終狀態由消息生產者根據事務執行結果進行確認。

public class RocketMQTransactionDemo {
    public static void main(String[] args) throws Exception {
        // 創建事務消息生產者
        TransactionMQProducer producer = new TransactionMQProducer("group_name");
        producer.setNamesrvAddr("localhost:9876");
        // 設置事務監聽器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 執行本地事務邏輯,根據業務邏輯結果返回相應的狀態
                // 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務提交
                // 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務回滾
                // 返回 LocalTransactionState.UNKNOW 表示事務狀態未知
            }
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 根據消息的狀態,來判斷本地事務的最終狀態
                // 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務提交
                // 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務回滾
                // 返回 LocalTransactionState.UNKNOW 表示事務狀態未知
            }
        });
        // 啟動事務消息生產者
        producer.start();
        // 構造消息
        Message msg = new Message("topic_name", "tag_name", "Hello, RocketMQ!".getBytes());
        // 發送事務消息
        TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println("Send Result: " + sendResult);
        // 關閉事務消息生產者
        producer.shutdown();
    }
}

3、Kafka的事務消息

Kafka引入了事務功能來確保消息的原子性和一致性。事務消息的發送和確認在生產者端進行。

生產者可以通過初始化事務,將一系列的消息寫入事務,然后通過"commitTransaction()"提交事務,或者通過"abortTransaction()"中止事務。

Kafka會保證在事務提交之前,寫入的所有消息不會被消費者可見,以保持事務的一致性。

public class KafkaTransactionDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 初始化事務
        producer.initTransactions();
        try {
            // 開啟事務
            producer.beginTransaction();
            // 發送消息
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "Hello, Kafka!");
            producer.send(record);
            // 提交事務
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // 處理異常情況
            producer.close();
        } finally {
            producer.close();
        }
    }
}

消息確認 ACK機制

1、RabbitMQ的ACK機制

RabbitMQ使用ACK(消息確認)機制來確保消息的可靠傳遞。

消費者收到消息后,需要向RabbitMQ發送ACK來確認消息的處理狀態。

只有在收到ACK后,RabbitMQ才會將消息標記為已成功傳遞,否則會將消息重新投遞給其他消費者或者保留在隊列中。

以下是RabbitMQ ACK的Java示例:

public class RabbitMQAckDemo {
    public static void main(String[] args) throws Exception {
        // 創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 創建連接
        Connection connection = factory.newConnection();
        // 創建信道
        Channel channel = connection.createChannel();
        // 聲明隊列
        String queueName = "queue_name";
        channel.queueDeclare(queueName, false, false, false, null);
        // 創建消費者
        String consumerTag = "consumer_tag";
        boolean autoAck = false; // 關閉自動ACK
        // 消費消息
        channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費消息
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
                try {
                    // 模擬處理消息的業務邏輯
                    processMessage(message);
                    // 手動發送ACK確認消息
                    long deliveryTag = envelope.getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    // 處理消息異常,可以選擇重試或者記錄日志等操作
                    System.out.println("Failed to process message: " + message);
                    e.printStackTrace();
                    // 手動發送NACK拒絕消息,并可選是否重新投遞
                    long deliveryTag = envelope.getDeliveryTag();
                    boolean requeue = true; // 重新投遞消息
                    channel.basicNack(deliveryTag, false, requeue);
                }
            }
        });
    }
    private static void processMessage(String message) {
        // 模擬處理消息的業務邏輯
    }
}

2、RocketMQ的ACK機制

RocketMQ的ACK機制由消費者控制,消費者從消息隊列中消費消息后,可以手動發送ACK確認消息的處理狀態。

只有在收到ACK后,RocketMQ才會將消息標記為已成功消費,否則會將消息重新投遞給其他消費者。

public class RocketMQAckDemo {
    public static void main(String[] args) throws Exception {
        // 創建消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
        consumer.setNamesrvAddr("localhost:9876");
        // 訂閱消息
        consumer.subscribe("topic_name", "*");
        // 注冊消息監聽器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt message : msgs) {
                try {
                    // 消費消息
                    String msgBody = new String(message.getBody(), "UTF-8");
                    System.out.println("Received message: " + msgBody);
                    // 模擬處理消息的業務邏輯
                    processMessage(msgBody);
                    // 手動發送ACK確認消息
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 處理消息異常,可以選擇重試或者記錄日志等操作
                    System.out.println("Failed to process message: " + new String(message.getBody()));
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 啟動消費者
        consumer.start();
    }
    private static void processMessage(String message) {
        // 模擬處理消息的業務邏輯
    }
}

3、Kafka的ACK機制

Kafka的ACK機制用于控制生產者在發送消息后,需要等待多少個副本確認才視為消息發送成功。

這個機制可以通過設置acks參數來進行配置。在Kafka中,acks參數有三個可選值:

acks=0:生產者在發送消息后不需要等待任何確認,直接將消息發送給Kafka集群。這種方式具有最高的吞吐量,但是也存在數據丟失的風險,因為生產者不會知道消息是否成功發送給任何副本。

acks=1:生產者在發送消息后只需要等待首領副本(leader replica)確認。一旦首領副本成功接收到消息,生產者就會收到確認。這種方式提供了一定的可靠性,但是如果首領副本在接收消息后但在確認之前發生故障,仍然可能會導致數據丟失。

acks=all:生產者在發送消息后需要等待所有副本都確認。只有當所有副本都成功接收到消息后,生產者才會收到確認。這是最安全的確認機制,確保了消息不會丟失,但是需要更多的時間和資源。acks=-1與acks=all是等效的。

public classKafkaProducerDemo{
    public static void main(String[]args){
        // 配置Kafka生產者的參數
        Propertiesprops=newProperties();
        props.put("bootstrap.servers","localhost:9092");// Kafka集群的地址和端口
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");// 鍵的序列化器
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 值的序列化器
        props.put("acks","all");// 設置ACK機制為所有副本都確認
        // 創建生產者實例
        KafkaProducer<String,String>producer=newKafkaProducer<>(props);
        // 構造消息
        Stringtopic="my_topic";
        Stringkey="my_key";
        Stringvalue="Hello, Kafka!";
        // 創建消息記錄
        ProducerRecord<String,String>record=newProducerRecord<>(topic,key,value);
        // 發送消息
        producer.send(record,newCallback(){
            @Override
            publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){
                if(exception!=null){
                    System.err.println("發送消息出現異常:"+exception.getMessage());
                }else{
                    System.out.println("消息發送成功!位于分區 "+metadata.partition()+",偏移量 "+metadata.offset());
                }
            }
        });
        // 關閉生產者
        producer.close();
    }
}

延遲消息實現

延遲隊列在實際項目中有非常多的應用場景,最常見的比如訂單未支付,超時取消訂單,在創建訂單的時候發送一條延遲消息,達到延遲時間之后消費者收到消息,如果訂單沒有支付的話,那么就取消訂單。


image-20250508192415995image-20250508192415995

1、RocketMQ實現延遲消息

RocketMQ 默認時間間隔分為 18 個級別,基本上也能滿足大部分場景的需要了。

默認延遲級別:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

使用起來也非常的簡單,直接通過setDelayTimeLevel設置延遲級別即可。

setDelayTimeLevel(level)

實現原理說起來比較簡單,Broker 會根據不同的延遲級別創建出多個不同級別的隊列,當我們發送延遲消息的時候,根據不同的延遲級別發送到不同的隊列中,同時在 Broker 內部通過一個定時器去輪詢這些隊列(RocketMQ 會為每個延遲級別分別創建一個定時任務),如果消息達到發送時間,那么就直接把消息發送到指 topic 隊列中。

RocketMQ 這種實現方式是放在服務端去做的,同時有個好處就是相同延遲時間的消息是可以保證有序性的。

談到這里就順便提一下關于消息消費重試的原理,這個本質上來說其實是一樣的,對于消費失敗需要重試的消息實際上都會被丟到延遲隊列的 topic 里,到期后再轉發到真正的 topic 中。


image-20250508192539070image-20250508192539070

2、RabbitMQ實現延遲消息

RabbitMQ本身并不存在延遲隊列的概念,在 RabbitMQ 中是通過 DLX 死信交換機和 TTL 消息過期來實現延遲隊列的。

TTL(Time to Live)過期時間

有兩種方式可以設置 TTL。

(1) 通過隊列屬性設置,這樣的話隊列中的所有消息都會擁有相同的過期時間(2) 對消息單獨設置過期時間,這樣每條消息的過期時間都可以不同

那么如果同時設置呢?這樣將會以兩個時間中較小的值為準。

針對隊列的方式通過參數x-message-ttl來設置。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

針對消息的方式通過setExpiration來設置。

AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());

DLX(Dead Letter Exchange)死信交換機

一個消息要成為死信消息有 3 種情況:

(1) 消息被拒絕,比如調用reject方法,并且需要設置requeuefalse

(2) 消息過期

(3) 隊列達到最大長度

可以通過參數dead-letter-exchange設置死信交換機,也可以通過參數dead-letter- exchange指定 RoutingKey(未指定則使用原隊列的 RoutingKey)。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

實現原理

當我們對消息設置了 TTL 和 DLX 之后,當消息正常發送,通過 Exchange 到達 Queue 之后,由于設置了 TTL 過期時間,并且消息沒有被消費(訂閱的是死信隊列),達到過期時間之后,消息就轉移到與之綁定的 DLX 死信隊列之中。

這樣的話,就相當于通過 DLX 和 TTL 間接實現了延遲消息的功能,實際使用中我們可以根據不同的延遲級別綁定設置不同延遲時間的隊列來達到實現不同延遲時間的效果。

如果隊列通過 dead-letter-exchange 屬性指定了一個交換機,那么該隊列中的死信就會投遞到這個交換機中,這個交換機稱為死信交換機(Dead Letter Exchange,簡稱DLX)。


image-20250508192505250image-20250508192505250

3、Kafka實現延遲消息

對于 Kafka 來說,原生并不支持延遲隊列的功能,需要我們手動去實現,這里我根據 RocketMQ 的設計提供一個實現思路。

這個設計,我們也不支持任意時間精度的延遲消息,只支持固定級別的延遲,因為對于大部分延遲消息的場景來說足夠使用了。

只創建一個 topic,但是針對該 topic 創建 18 個 partition,每個 partition 對應不同的延遲級別,這樣做和 RocketMQ 一樣有個好處就是能達到相同延遲時間的消息達到有序性。

應用級 Kafka 延遲消息實現原理

首先創建一個單獨針對延遲隊列的 topic,同時創建 18 個 partition 針對不同的延遲級別。

發送消息的時候根據延遲參數發送到延遲 topic 對應的 partition,對應的key為延遲時間,同時把原 topic 保存到 header 中。

ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("delay_topic", delayPartition, delayTime, data);
producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));

內嵌的consumer單獨設置一個ConsumerGroup去消費延遲 topic 消息,消費到消息之后如果沒有達到延遲時間那么就進行pause,然后seek到當前ConsumerRecordoffset位置,同時使用定時器去輪詢延遲的TopicPartition,達到延遲時間之后進行resume。

如果達到了延遲時間,那么就獲取到header中的真實 topic ,直接轉發。

這里為什么要進行pauseresume呢?

因為如果不這樣的話,如果超時未消費達到max.poll.interval.ms最大時間(默認300s),那么將會觸發 Rebalance。

責任編輯:武曉燕 來源: 技術自由圈
相關推薦

2009-02-16 17:21:46

2023-09-19 08:09:21

RabbitMQRocketMQKafka

2025-02-27 08:50:00

RocketMQ開發代碼

2025-05-26 02:15:00

2010-05-13 13:27:23

2017-05-05 10:15:38

深度學習框架對比分析

2012-11-19 11:30:40

PowerShell常見問題解決方法

2025-07-21 09:02:45

2011-11-23 16:28:07

JavaSpring框架

2018-04-23 09:50:54

2010-06-08 11:15:43

OpenSUSE Ub

2018-05-10 12:55:51

大數據對比分析面試

2021-08-24 07:57:26

KafkaRocketMQPulsar

2010-06-12 15:36:01

2024-08-22 14:49:49

系統設計數據庫

2024-01-09 15:37:46

2015-09-22 10:14:57

虛擬化虛擬化問題

2010-08-06 16:15:57

Flex通信

2011-04-08 13:58:52

JavaJSP

2011-05-30 15:12:46

電纜雙絞線布線
點贊
收藏

51CTO技術棧公眾號

国产裸体视频网站| 亚欧洲精品在线视频免费观看| 婷婷在线精品视频| jizz国产精品| 亚洲成a人v欧美综合天堂下载 | 丝袜老师办公室里做好紧好爽| 无码精品视频一区二区三区| 日本大胆欧美人术艺术动态 | 疯狂撞击丝袜人妻| 99ri日韩精品视频| 在线观看网站黄不卡| 四虎影院一区二区| 五月婷婷六月丁香综合| 精品在线播放午夜| 国产91精品青草社区| 国产男女猛烈无遮挡在线喷水| 国产在线播放精品| 欧美日韩高清一区二区三区| 青青青免费在线| 日本黄色片在线观看| 99久久久久久| 99一区二区三区| 亚洲自拍偷拍另类| 一区二区日韩免费看| 毛片精品免费在线观看| 国产精品国产三级国产专业不 | 丰满放荡岳乱妇91ww| 国产成人jvid在线播放| 久久精品波多野结衣| 国精一区二区| 国产视频精品久久久| 性折磨bdsm欧美激情另类| 91成人抖音| 欧美性猛交xxxx富婆弯腰| av 日韩 人妻 黑人 综合 无码| 午夜福利理论片在线观看| 国产精品亚洲一区二区三区在线| 国产精品日韩在线播放| 国产一级一级国产| 国产一区二区三区的电影| 欧美激情精品久久久久久大尺度 | 国产交换配乱淫视频免费| 婷婷视频一区二区三区| 欧美高清视频不卡网| 国内自拍视频一区| 筱崎爱全乳无删减在线观看| 亚洲成人av中文| 美女扒开大腿让男人桶| 五月天激情在线| 亚洲婷婷在线视频| 蜜桃视频在线一区| 欧美一区二区精美| 亚洲免费999| 久久久免费人体| 欧美视频在线一区二区三区| 91av在线免费播放| 日本综合字幕| 在线观看免费一区| 中文字幕在线综合| 国产精品99| 欧美日韩国产高清一区二区三区| xx欧美撒尿嘘撒尿xx| xxxxx.日韩| 欧美日韩成人综合| 国内自拍第二页| 精品伊人久久| 日韩视频在线永久播放| 91在线第一页| 咪咪网在线视频| 一本一本大道香蕉久在线精品| 无码精品a∨在线观看中文| 午夜伦理福利在线| 欧美性xxxxx极品娇小| 少妇高清精品毛片在线视频| 国模视频一区| 91精品在线麻豆| 中文字幕人妻熟女在线| 日本精品影院| 一区二区福利视频| av最新在线观看| 午夜久久福利| …久久精品99久久香蕉国产| av首页在线观看| 国产鲁鲁视频在线观看免费| 久久一本综合频道| 国产精品久久不能| av在线免费在线观看| 成人午夜激情影院| 欧美人与性禽动交精品| aaa在线观看| 亚洲伊人伊色伊影伊综合网| 欧美 日韩 国产一区| 成人免费视频观看| 91精品国产综合久久婷婷香蕉 | 妞干网在线播放| 日韩伦理精品| 欧美理论电影在线| 日韩成人av影院| 成人在线免费观看91| 欧美日韩国产成人在线观看| 69视频免费看| 国产精品一区免费视频| 你懂的视频在线一区二区| 麻豆网站在线| 欧美日韩一区二区免费视频| 在线观看av免费观看| 网友自拍一区| 欧美第一黄网免费网站| 无码视频一区二区三区| 国产成人综合视频| 色综合久久av| 擼擼色在线看观看免费| 欧美精品 日韩| 国产中年熟女高潮大集合| 最新国产精品久久久| 国产成人精品优优av| 刘亦菲毛片一区二区三区| 日本一区二区视频在线| 熟女少妇在线视频播放| 豆花视频一区| 尤物yw午夜国产精品视频| 日本五十路女优| 国产自产v一区二区三区c| 欧美日韩国产综合视频在线| 蜜乳av一区| 91精品欧美福利在线观看| 国产精品国产三级国产专业不 | 青草国产精品| 欧美在线性爱视频 | 亚洲欧美国产高清va在线播 | 日韩欧美精品网址| 在线观看免费视频国产| 亚洲欧洲美洲一区二区三区| 国产精品视频免费在线| 黄色大片在线免费观看| 欧美日韩国产丝袜另类| 在线观看免费视频国产| 亚洲精品成人| 91久久国产婷婷一区二区| av资源种子在线观看| 91成人国产精品| 欧美色图亚洲激情| 亚洲欧美视频| 欧美精品成人一区二区在线观看 | 尤物视频在线看| 欧美精品丝袜久久久中文字幕| 黄色国产在线播放| 日本不卡一区二区三区高清视频| 欧美韩国日本精品一区二区三区| 亚洲欧洲高清| 亚洲人高潮女人毛茸茸| 中文人妻av久久人妻18| 久久精品一二三| 国产视频一区二区三区在线播放 | 欧美日韩中文字幕在线播放| 精品国产亚洲一区二区三区在线| 久久视频免费观看| 99国产精品99| 夜夜嗨av一区二区三区| 美女扒开腿免费视频| 亚洲香蕉网站| 国产欧美韩日| 中文在线8资源库| 日韩精品免费在线观看| 黄色污污网站在线观看| 国产女人18毛片水真多成人如厕| 91制片厂毛片| 亚洲一区二区| 国产精品加勒比| 竹内纱里奈兽皇系列在线观看| 亚洲欧洲自拍偷拍| 在线观看视频二区| 亚洲精品乱码久久久久久久久| 深夜视频在线观看| 国产亚洲午夜| 亚洲第一综合| 日韩高清在线观看一区二区| 国模精品系列视频| 日韩精品系列| 欧美日韩国产综合视频在线观看 | 欧美日韩国产高清| 国产在线精品一区二区中文 | 日韩欧美中文免费| 夜夜春很很躁夜夜躁| 国产一区二区三区日韩| 国产精品网站免费| 欧美色爱综合| 不卡一卡2卡3卡4卡精品在| 韩日毛片在线观看| 丝袜一区二区三区| 国精品人妻无码一区二区三区喝尿 | 日韩视频精品| 久久久国产精品入口麻豆| 91av视频在线观看| 免费大片在线观看www| 亚洲级视频在线观看免费1级| 懂色av中文字幕| 亚洲精品免费在线播放| 国产三级国产精品| 国产一区欧美一区| 国产偷人视频免费| 综合视频在线| 欧美日韩国产不卡在线看| 日本在线成人| 国产精品亚发布| 国产精品一区二区日韩| 久久精品在线播放| 噜噜噜噜噜在线视频| 欧美成va人片在线观看| 中文字幕av第一页| 高潮白浆女日韩av免费看| 日本午夜精品视频| 91免费在线视频观看| 佐山爱在线视频| 人妖欧美一区二区| 色综合久久久久无码专区| 午夜日韩在线| 一区二区三区精品国产| 麻豆成人入口| 99re资源| 自拍偷拍欧美日韩| 国产精品com| 激情黄产视频在线免费观看| 伦理中文字幕亚洲| 在线观看的av| 亚洲欧洲日产国码av系列天堂 | 色婷婷久久av| 黄色的视频在线免费观看| 亚洲精品一区二区三区在线观看| 国产精品一区二区免费视频 | 亚洲精品视频专区| 欧美日韩国产成人在线免费| 五月天中文字幕| 色婷婷久久综合| 国产精品男女视频| 亚洲国产中文字幕| 国产乱国产乱老熟300| 成人免费一区二区三区视频| 欧美xxxx精品| 国产精品欧美精品| 麻豆视频免费在线播放| 国产欧美日本一区二区三区| 国产黄片一区二区三区| 久久免费午夜影院| 泷泽萝拉在线播放| 久久夜色精品国产欧美乱极品| 亚洲av无码一区二区三区观看| 成人av网址在线观看| 69亚洲乱人伦| 成人激情av网| 超碰男人的天堂| 99久久精品国产精品久久| 成人区人妻精品一区二| 波多野结衣视频一区| 理论片大全免费理伦片| av午夜精品一区二区三区| 一区二区免费在线观看视频| 播五月开心婷婷综合| 日本少妇毛茸茸| av在线不卡免费看| 欧美bbbbb性bbbbb视频| 久久精品欧美一区二区三区不卡 | 1314成人网| 国产成人精品免费看| 国产精品日日摸夜夜爽| gogo大胆日本视频一区| 深爱五月激情网| 国产日产亚洲精品系列| 91视频青青草| 亚洲福利视频一区| 二区视频在线观看| 欧美午夜精品一区二区蜜桃| 国产精品久久婷婷| 亚洲第一精品久久忘忧草社区| 在线观看xxx| 中日韩美女免费视频网址在线观看 | 亚洲 欧美 日韩系列| 精品无人区卡一卡二卡三乱码免费卡| 久久精品一卡二卡| 不卡电影一区二区三区| av黄色免费网站| 亚洲视频每日更新| 精品无码免费视频| 在线视频一区二区三| 国产喷水福利在线视频| 日韩国产精品一区| 日本在线视频站| 久久免费观看视频| 国产a亚洲精品| 国产精品久久久久久久久久久久午夜片 | 午夜在线播放视频欧美| 国产精品区在线| 不卡的看片网站| 最新av电影网站| 日韩欧美国产中文字幕| www.xxxx国产| 国产一区二区三区三区在线观看 | 天堂av在线免费观看| 色哟哟网站入口亚洲精品| 丁香高清在线观看完整电影视频 | 国内精品久久久久久久久久| 亚洲精品suv精品一区二区| 91精品国产综合久久久久久豆腐| 久久久久国产精品www| 国产精品美女午夜爽爽| 精品999在线观看| 久久久久美女| 国产精彩免费视频| 不卡视频免费播放| 日本一级二级视频| 在线观看视频一区二区| 天天av天天翘| 欧美美女操人视频| 成人国产激情在线| 欧美成ee人免费视频| 国产精品观看| 蜜桃福利午夜精品一区| 国产婷婷色一区二区三区在线| 国产亚洲小视频| 91精品国产综合久久久久久久久久| 日av在线播放| 国模视频一区二区三区| 五月亚洲婷婷| 中文字幕乱码免费| 另类小说欧美激情| av电影在线不卡| 欧美日韩亚洲激情| 婷婷开心激情网| 久久久人成影片一区二区三区| 电影一区中文字幕| 中文字幕在线观看一区二区三区| 久久人人97超碰国产公开结果| 好吊色视频一区二区三区| 一区二区三区四区亚洲| 国产精品自产拍| 日韩在线观看免费高清完整版| 午夜激情成人网| 欧美日韩在线精品一区二区三区| 亚洲色诱最新| 国产黄色三级网站| 亚洲国产综合91精品麻豆| 国产自产一区二区| 高清一区二区三区四区五区| 99香蕉久久| 亚洲精品蜜桃久久久久久| 国产精品亚洲成人| 久久久久香蕉视频| 欧美大片在线观看一区| 色呦呦在线资源| 国产精品jizz视频| 在线观看日韩av电影| 在线观看免费视频国产| 精品美女永久免费视频| 天天色综合久久| 青草青草久热精品视频在线网站| 综合伊思人在钱三区| av动漫免费看| 欧美激情资源网| 亚洲一区二区色| 久久国产视频网站| 久久黄色影视| 国产日产欧美视频| 国产精品女主播av| 国产美女www爽爽爽视频| 欧美精品在线免费观看| 亚洲精品国产九九九| aa视频在线播放| 久久久久久久久久久电影| 中文字幕一区二区三区波野结| 色老头一区二区三区| 欧洲精品99毛片免费高清观看| 男人天堂a在线| 国产亚洲一区二区在线观看| 中文字幕+乱码+中文字幕明步| 久久精品视频免费播放| 成人在线视频你懂的| 激情六月丁香婷婷| 国产精品久久久久一区| 亚洲经典一区二区| 奇米4444一区二区三区 | www.avtt| 国产亲近乱来精品视频| 国产精品日韩无码| 91av在线免费观看视频| 日韩在线视屏| 亚洲激情 欧美| 欧美三片在线视频观看| 美足av综合网| 日韩欧美亚洲日产国产| 国产九色精品成人porny| 五月婷婷中文字幕| 久久精品视频一| 伊人成综合网伊人222| 蜜臀一区二区三区精品免费视频| 一二三四社区欧美黄| 九色网友自拍视频手机在线| 亚洲va码欧洲m码| 丝袜美腿亚洲一区| 久久精品性爱视频| 视频在线观看一区二区|