阿里二面:使用消息隊列怎樣防止消息重復?
大家好,我是君哥。
使用消息隊列時,我們經常會遇到一個可能對業務產生影響的問題,消息重復。在訂單、扣款、對賬等對冪有要求的場景,消息重復的問題必須解決。
那怎樣應對重復消息呢?今天來聊一聊這個話題。
1.三個語義
正確使用消息隊列,我們會考慮到消息防丟失、防重復,我們介紹 3 個語義:
- At Least Once:在消息隊列中,指消息不丟失,一條消息最少被消費一次,但是可能會有重復消費。
- Exactly Once:在消息隊列中,消息被精準消費一次,不丟失,也不會重復;
- At Most Once:在消息隊列中,消息不會被重復消費,但是可能會有消息丟失
不同的消息場景,需要的語義不同。比如 Exactly Once 最難實現,一般需要引入事務消息。
不同使用場景,對語義的要求也不一樣。比如日志收集類的場景,At Most Once 就可以滿足,而支付類的場景則要求 Exactly Once。
2.消息重復
什么情況下會導致消息重復呢?
生產者發送消息后,Broker 保存成功,但是沒有成功給生產者返回 ACK,生產者以為消息發送失敗,重試,再次給 Broker 發送。Broker 保存了重復消息,導致 Consumer 多次消費。
圖片
消費者消費消息后,給 Broker 返回 ACK 失敗,導致 Broker 沒有修改偏移量,同一條消息再次發送給消費者,或者被消費者拉取到。
圖片
3.生產者防重
有的消息中間件是支持生產者冥等的。比如 Kafka 從 0.11.0 版本開始引入了冪等 Producer,可以使用下面代碼開啟冪等 Producer:
Properties props = new Properties();
//省略其他代碼
//配置冪等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
//創建生產者實例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);Kafka 實現生產者冪等的原理是在生產者引入了 Producer ID(PID)和 Sequence Number 這兩個參數。
- PID:Producer 擁有的 ID,唯一標識一個 Producer。
- Sequence Number:自增的數值,唯一標識同一個 Producer 發送到指定分區的消息 ID。
有了這兩個參數,Broker 單分區就可以唯一標識一個生產者發送的唯一一條消息<PID,SequenceNumber>。Broker 收到消息時,如果檢查到消息的<PID,SequenceNumber>已經存在,就不會再保留這條消息。
但冪等 Producer 只能在單分區下生效,多分區情況下是不生效的。因為多個分區之間并不能相互訪問對方的<PID,SequenceNumber>。
圖片
4.Broker 防重
Broker 如果可以防重,那對于生產者和消費者來說,節省了大量的工作。下面我們看下 Pulsar 是怎樣防重的。
Broker 通過參數 BrokerDeduplicationEnabled 開啟防重功能。對于 Producer 發送的重復消息,Broker 返回響應 -1:-1。
Producer 發送消息時,會帶一個 sequenceId 字段,Broker 會按照 ProducerName 維度記錄當前生產者最大的 sequenceId(highestSequenceId)。Broker 收到消息時,首先會判斷消息中的 sequenceId 是否大于自己保存的當前生產者的 highestSequenceId,如果是則保存消息并更新 highestSequenceId,否則丟棄消息,并且給 Producer 返回 -1:-1。
下面是三個極端情況:
- Producer 斷開連接:這種情況下,跟 Broker 重新建立連接后,本地保存的 sequenceId 還在,只要使用 sequenceId 遞增后發送消息即可;
- Producer 宕機:Producer 重啟后,緩存的 sequenceId 肯定不存在了,這時跟 Broker 重新建立連接后,Broker 會根據 ProducerName 找出 highestSequenceId 發給 Producer,Producer 使用這個 sequenceId 來發送消息;
- Producer 和 Broker 都宕機:Broker 重啟后,可以從宕機前保存的快照中恢復各 Producer 對應的 highestSequenceId 發送給各 Producer。但這個 highestSequenceId 不一定準確,因為 Broker 宕機瞬間很有可能最新的 sequenceId 沒有來得及保存快照。
需要注意的是,跟 Kafka 的冪等 Producer 類似,Pulsar 的 Broker 冪等也只能保證 Topic/Partition 級別。
5.消費者防重
從上面的分析可以看出,靠生產者防重和 Broker 防重,只能在 Topic/Partition 級別生效,這通常并不能滿足我們的需求。而為了避免消費者重復消費對業務造成影響,消息防重還是必要的。這就要求我們做最后一道防線,在消費端進行防重或冪等處理。
消費端做防重,就不再考慮消息中間件層面的配置(比如 sequenceId),而是從消息體進行下手。
生產者發送消息時,給消息體賦值一個全局唯一的 ID,消費者處理消息時,根據全局唯一 ID 做防重。
比如消費端的邏輯是保存一條訂單消息,那把唯一 ID 保存到數據庫并且加一個唯一索引,這樣根據唯一索引就可以做消息去重。
不過使用唯一索引也有缺點:
- 如果使用 MySQL 數據庫,不能使用 Change Buffer;
- 非插入的場景(比如更新庫存)不能去重。
對于唯一索引的缺點,我們可以引入 Redis 對唯一 ID 做保存,利用 setNx 判斷消息是否已經處理過。如下圖:
圖片
if (jedis.setnx(ID, "1") == 1) {
//處理業務,返回 ACK
}else {
//直接返回 返回 ACK
}6.總結
使用消息隊列,在一些場景下是需要防重的。主流消息隊列提供了一些防重的能力,但并不是完全可靠的。在對重復消息敏感的場景下,最好是在消費端處理消息時,從業務層面進行消息防重。






























