精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Kafka消費位點管理沒你想的那么簡單

云計算 Kafka
熟悉RocketMQ的小伙伴都知道RocketMQ已經(jīng)默認幫我實現(xiàn)好了消息消費失敗重試,消費位點自動提交,死信隊列等功能,那么kafka是否也是如此呢?

背景

如果你習慣了使用RocketMQ這種自動擋管理消費位點,消息失敗重試的方式。你再來使用kafka,會發(fā)現(xiàn)kafka這種手動擋的消費位點管理就沒那么容易了

熟悉RocketMQ的小伙伴都知道RocketMQ已經(jīng)默認幫我實現(xiàn)好了消息消費失敗重試,消費位點自動提交,死信隊列等功能,那么kafka是否也是如此呢?

kafka消費位點管理

kafka消費位點有兩種管理方式

  1. 手動提交消費位點
  2. 自動提交消費位點

自動提交消費位點

想要設置自動提交消費位點我們只需要設置兩個屬性

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 自動提交消費位點
  2. ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG 自動提交消費位點的時間間隔

一個簡單的消費代碼如下

Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // 自動提交消費位點的時間間隔
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(TOPIC_NAME));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            try {
                handlerMessage(record);
            } catch (Exception e) {
                log.error("處理消息異常: {}", record, e);
                // 循環(huán)繼續(xù)
            }

        }
    }

自動提交消費位點有幾個缺點

  1. 會出現(xiàn)重復消費:比如Consumer每5秒自動提交一次位移,如果在第4秒時,消費了消息,但是還沒有提交位移,此時Consumer掛掉了,那么下次Consumer啟動時,會從上次提交的位移開始消費,這樣就會導致消息重復消費。 當然比如出現(xiàn)Rebalance也是會出現(xiàn)重復消費的情況
  2. 無法精準控制消費位點

手動提交消費位點

手動提交消費位點又分兩種

  1. 同步提交(commitSync)
  2. 異步提交(commitAsync)

同步提交(commitSync)

同步提交的方式很簡單,就是每次消費完通過調(diào)用API consumer.commitSync。

相關(guān)的代碼如下:

Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
            // 注意這里消費業(yè)務邏輯上消費失敗后的消息處理
            handlerMessage(records);
            try {
                // 消費成功后手動提交位點
                consumer.commitSync();
            } catch (CommitFailedException e) {
                // 消費位點提交失敗異常處理
                handleError(e); 
            }
        }

同步提交的方式有一個缺點,調(diào)用commitSync()時,Consumer會處于阻塞狀態(tài),直到broker返回提交成功,嚴重影響消費性能。

異步提交(commitAsync)

異步提交的方式很簡單,就是每次消費完通過調(diào)用API consumer.commitAsync。

Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
            handlerMessage(records); // 處理消息
            consumer.commitAsync((offsets, exception) -> {
                if (exception != null)
                    handleError(exception);
            });
        }

commitAsync主要是提供了異步回調(diào),通過回調(diào)來通知消費位點是否提交成功。

異步提交消費位點也有一些缺點,比如消費位點不能重復提交。因為提交位點失敗后,重新提交位點可能更晚的消費位點已經(jīng)提交了,這里提交已經(jīng)是沒有意義的了。

spring-kafka消息消費

可以看到不管是同步提交消費位點還是異步提交消費位點,都有一些問題,想要寫出生產(chǎn)可用的消費代碼,需要注意的細節(jié)非常多。

比如消費失敗后的消息如何處理,是停止消費跳出循環(huán),還是說記錄消費失敗的消息,人工處理等。

這里我們可以簡單看看spring-kafka是如何消費消息的。

我們簡單看看主流程代碼:

圖片圖片

這里我們忽略源碼的一些其他細節(jié)。只分析主要的消費流程。

  • invokeOnMessage(cRecord); 處理消息

可以看到invokeOnMessage是被整個try-catch包裹的,這樣就保證了消費失敗后不會影響整個消費流程。

具體我們先看看消息正常處理的邏輯。

private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {

   if (cRecord.value() instanceof DeserializationException ex) {
    throw ex;
   }
   if (cRecord.key() instanceof DeserializationException ex) {
    throw ex;
   }
   if (cRecord.value() == null && this.checkNullValueForExceptions) {
    checkDeser(cRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
   }
   if (cRecord.key() == null && this.checkNullKeyForExceptions) {
    checkDeser(cRecord, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
   }
   doInvokeOnMessage(cRecord);
   if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
    ackCurrent(cRecord);
   }
   if (this.isCountAck || this.isTimeOnlyAck) {
    doProcessCommits();
   }
  }

這里主要是一些異常校驗,然后就是判斷是否可以提交消費位點。如果可以則調(diào)用doProcessCommits()進行正常的消費位點提交。

  • doProcessCommits() 消費位點處理

如果消費位點提交失敗也會進行一些異常處理。

private void doProcessCommits() {
   if (!this.autoCommit && !this.isRecordAck) {
    try {
     processCommits();
    }
    catch (CommitFailedException cfe) {
     if (this.remainingRecords != null && !this.isBatchListener) {
      ConsumerRecords<K, V> pending = this.remainingRecords;
      this.remainingRecords = null;
      List<ConsumerRecord<?, ?>> records = new ArrayList<>();
      for (ConsumerRecord<K, V> kvConsumerRecord : pending) {
       records.add(kvConsumerRecord);
      }
      this.commonErrorHandler.handleRemaining(cfe, records, this.consumer,
        KafkaMessageListenerContainer.this.thisOrParentContainer);
     }
    }
   }
  }

如果消費位點提交失敗則會調(diào)用commonErrorHandler進行異常處理。

commonErrorHandler有多個實現(xiàn)類,有一個默認實現(xiàn)DefaultErrorHandler

  • 消息消費失敗異常處理

如果消息消費失敗,也提供了一個異常處理擴展invokeErrorHandler(cRecord, iterator, e);

里面實際使用的也是DefaultErrorHandler

核心的處理邏輯主要還是在SeekUtils中封裝

  • DefaultErrorHandler
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
   Consumer<?, ?> consumer, MessageListenerContainer container) {

  SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
    getFailureTracker(), this.logger, getLogLevel());
 }
  • SeekUtils
public static void seekOrRecover(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
    Consumer<?, ?> consumer, MessageListenerContainer container, boolean commitRecovered,
    RecoveryStrategy recovery, LogAccessor logger, Level level) {}

可以看到有一個RecoveryStrategy參數(shù),這個是消息消費失敗如何恢復,比如我們需要手動增加一個類似死信隊列的topic,這里消息消費失敗就會自動發(fā)送到我們的死信隊列

死信隊列的topic名字生成規(guī)則主要是topicName + -dlt

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
  DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + "-dlt", cr.partition());

總結(jié)

可以看到如果我們單純的使用kafka-client原生的sdk來進行消息消費,是非常容易出現(xiàn)問題的。

我們需要很多細節(jié),比如

  1. 消息消費失敗了如何處理,是否需要重試,如果重試還是失敗怎么辦?丟掉還是手動處理丟到自己創(chuàng)建的死信隊列中。
  2. 消費位點提交失敗了如何處理。
  3. 消費位點是使用同步提交還是異步提交?或者混合提交?

所以如果spring boot項目還是建議使用spring相關(guān)已經(jīng)封裝好的kafka sdk。

非必要盡量不要使用原生的kafka-client sdk。

責任編輯:武曉燕 來源: 小奏技術(shù)
相關(guān)推薦

2015-04-30 10:12:13

開源云平臺OpenStack

2017-08-09 14:49:03

WebHTTPS瀏覽器

2014-08-25 10:17:54

數(shù)據(jù)中心管理

2021-03-29 13:00:50

代碼替換開發(fā)

2020-03-26 10:41:02

API網(wǎng)關(guān)大公司

2014-03-14 09:35:56

內(nèi)存優(yōu)化軟件內(nèi)存優(yōu)化

2015-06-24 10:32:13

訊鳥云計算會展

2016-01-07 10:17:48

2025-08-05 07:58:28

2021-08-02 15:24:19

Windows 11Windows微軟

2014-03-21 15:30:06

產(chǎn)品經(jīng)理PM能力

2023-12-28 12:07:21

2016-07-25 12:58:07

SDN路由故障排查

2013-01-15 10:09:43

Windows Ser

2014-07-09 09:06:33

SDN自動化

2010-08-04 09:20:31

JavaScript

2009-06-22 14:02:00

2019-05-17 09:33:50

圖像識別三維重建文本識別

2020-01-03 08:44:05

TCP網(wǎng)絡協(xié)議三次握手

2013-02-19 09:21:01

Win 8
點贊
收藏

51CTO技術(shù)棧公眾號

91久久夜色精品国产按摩| 69久成人做爰电影| 粉嫩蜜臀av国产精品网站| 97视频免费观看| 国产中年熟女高潮大集合| 国产毛片精品久久| 亚洲成人自拍网| 日韩啊v在线| www.四虎在线观看| 麻豆精品网站| 欧美成人一二三| 在线观看福利片| 欧洲大片精品免费永久看nba| 欧美色道久久88综合亚洲精品| 一区二区三区欧美成人| 四季av日韩精品一区| 蜜臀av一区二区| 69视频在线播放| 亚洲天堂网av在线| 一本色道久久综合狠狠躁的番外| 91精品国产综合久久精品app| 一本大道熟女人妻中文字幕在线| 国产色在线观看| 91欧美激情一区二区三区成人| 91嫩草在线视频| 91青青草视频| 99精品国产福利在线观看免费| 久久人人爽亚洲精品天堂| 亚洲精品成人无码熟妇在线| 亚洲精选av| 在线播放国产精品二区一二区四区| 777米奇影视第四色| 欧美videossex| 亚洲男人的天堂在线aⅴ视频| 日韩成人在线资源| 欧美女v视频| 99精品视频免费在线观看| aaa级精品久久久国产片| 中文字幕理论片| 久久九九99| 2019亚洲日韩新视频| 青青草原免费观看| 亚洲综合专区| 欧美成年人视频| 国产精品国产精品88| 欧美a级片视频| 在线观看不卡av| jizz中文字幕| 精品免费在线| 亚洲网站在线播放| 妺妺窝人体色WWW精品| 日韩精品欧美大片| 亚洲精品97久久| 少妇精品无码一区二区三区| baoyu135国产精品免费| 亚洲成av人影院在线观看| 精产国品一区二区三区| 久久三级中文| 日韩欧美国产三级| 国产成人av免费观看| 视频精品一区二区三区| 亚洲成人动漫在线播放| 男男做爰猛烈叫床爽爽小说| 大伊香蕉精品在线品播放| 亚洲大胆人体在线| 国产麻豆天美果冻无码视频 | 国产一区二区丝袜| 亚洲中文一区二区三区| 国产一区二区精品在线观看| 1卡2卡3卡精品视频| 亚洲成a人片77777精品| 99视频有精品| 欧美一区二区在线| 欧美a在线看| 亚洲一区二区三区四区在线观看 | 日韩免费av一区二区三区| 91精彩视频在线观看| 中文字幕一区二区三区精华液| 亚洲综合激情五月| 欧美黑人猛交| 色婷婷精品久久二区二区蜜臀av| 国产一区二区在线免费播放| 亚洲精品伊人| 亚洲精品一区二区三区香蕉| 女人被狂躁c到高潮| 欧美精品乱码| 欧美激情精品久久久久久免费印度 | 性欧美xxxx视频在线观看| 久久精品无码av| 精品中文av资源站在线观看| 97在线电影| 极品美乳网红视频免费在线观看 | 亚洲男人天堂古典| 国产黄在线免费观看| 一本色道精品久久一区二区三区| 国产精品久久久久久av下载红粉| 午夜精品久久久久久久91蜜桃| 91丝袜呻吟高潮美腿白嫩在线观看| 亚洲区一区二区三区| 秋霞在线视频| 欧美午夜宅男影院| 欲求不满的岳中文字幕| 久久久久av| 欧美一级片在线播放| 国产三级漂亮女教师| 99re热这里只有精品免费视频| 亚洲欧洲日本国产| 日韩av影片| 欧美mv日韩mv亚洲| 纪美影视在线观看电视版使用方法| 亚洲一级电影| 91久久精品一区| 九色网友自拍视频手机在线| 一区二区三区四区不卡视频| 最近中文字幕一区二区| 日韩欧美中文字幕电影| 欧美成人久久久| 中文精品久久久久人妻不卡| 91在线看国产| 国产曰肥老太婆无遮挡| 亚洲日本中文| 伊人伊成久久人综合网站| 国产一级视频在线| 国产麻豆精品theporn| 日本高清一区| 欧美黑人粗大| 日韩av中文在线| 国产亚洲精品久久久久久无几年桃| 麻豆成人久久精品二区三区小说| 久久久久久欧美精品色一二三四| 欧美78videosex性欧美| 91麻豆精品国产自产在线 | 亚洲人视频在线观看| 一区二区三区成人在线视频| 天堂在线一区二区三区| 久久久综合色| 国产噜噜噜噜久久久久久久久| 麻豆app在线观看| 日韩欧美999| 日本黄色特级片| 亚洲欧美大片| 久久久久se| 欧洲亚洲两性| 国产小视频91| 最近中文字幕在线免费观看| 久久久不卡网国产精品一区| 国产综合免费视频| 亚洲欧洲免费| 国产精品96久久久久久又黄又硬 | 精品视频在线播放一区二区三区 | 一个人www视频在线免费观看| 亚洲成在人线av| 日韩欧美一级视频| 久久美女艺术照精彩视频福利播放| 黄色av网址在线播放| 欧美挤奶吃奶水xxxxx| 欧美一级电影久久| 国产九九在线| 欧美精品久久99| 精品无码久久久久成人漫画| 国产精品亚洲人在线观看| av影院在线播放| 日韩国产在线不卡视频| 国模精品一区二区三区色天香| 天堂在线视频免费| 色中色一区二区| a一级免费视频| 国产成人在线网站| 青青草国产精品视频| 伊甸园亚洲一区| 国产成人av在线| 日本三级在线视频| 日韩美女视频一区二区在线观看| 日韩手机在线观看| 国产欧美精品一区二区三区四区 | 久久久噜噜噜久久中文字幕色伊伊| 成人在线观看黄| 999久久久精品国产| 2022国产精品| 最近高清中文在线字幕在线观看1| 国产亚洲一级高清| 国产成人精品一区二三区四区五区| 亚洲一区二区精品久久av| brazzers精品成人一区| 激情亚洲综合在线| 国产精品裸体瑜伽视频| 日韩精品欧美| 国精产品99永久一区一区| 久久天堂av| 欧美日本黄视频| 国产在线视频网| 日韩精品一区二区在线| 日韩综合在线观看| 亚洲综合一区二区| 青娱乐国产视频| 国产成人av资源| 亚洲高清在线免费观看| 欧美91大片| 神马影院一区二区| 美国成人xxx| 91精品免费看| 日韩影片中文字幕| 欧美激情一级二级| 自拍视频在线播放| 亚洲免费一级电影| 乱精品一区字幕二区| 欧美日韩一区二区三区高清| 日产欧产va高清| 亚洲人妖av一区二区| 国产免费一区二区三区网站免费| 国产91精品入口| 在线观看国产中文字幕| 亚洲欧美视频| 久久综合久久网| 欧美国产激情| 亚洲三区在线观看| 九九综合九九| 国产私拍一区| 日韩在线亚洲| 成人日韩在线电影| 欧美日韩女优| 欧美在线观看网址综合| 久久电影网站| 欧美插天视频在线播放| 亚洲成人三级| 尤物精品国产第一福利三区| 亚州视频一区二区三区| 亚洲精品一区二区三区影院| www.黄色一片| 91精品久久久久久久91蜜桃| 中文字幕第99页| 欧美写真视频网站| 艳妇乳肉豪妇荡乳av无码福利 | 久久久国产欧美| 男人的天堂亚洲在线| 日本www在线视频| 在线日韩视频| 久久久久99精品成人片| 欧美日韩国内| 国产乱子伦精品无码专区| 一区二区日韩欧美| 老汉色影院首页| 欧美88av| 成年人看的毛片| 亚洲国产婷婷| 国产特级黄色大片| 米奇777在线欧美播放| 激情综合网婷婷| 首页亚洲欧美制服丝腿| 欧美一级黄色影院| 日本视频免费一区| 欧美日韩中文不卡| 开心九九激情九九欧美日韩精美视频电影 | 任你操精品视频| 国产精品国产a级| 欧美做爰啪啪xxxⅹ性| 亚洲欧美电影院| 麻豆国产尤物av尤物在线观看| 亚洲一二三区在线观看| 日韩高清精品免费观看| 欧美性xxxxx| 亚洲视频在线观看免费视频| 欧美人xxxx| 亚洲男人第一天堂| 日韩精品亚洲视频| 成人亚洲综合天堂| 久久综合伊人77777蜜臀| 久草在线视频网站| 欧美性在线观看| 国产精品第一国产精品| 亚洲尤物视频网| 久久激情av| 日产精品久久久一区二区| 999久久久91| 欧美成人高潮一二区在线看| 日韩成人免费看| 成人高清在线观看视频| www.在线欧美| 欧美成人久久久免费播放| 亚洲一区二区在线免费看| 欧美brazzers| 欧美一级夜夜爽| 天天在线女人的天堂视频| 最近更新的2019中文字幕| 国产99re66在线视频| 国产成人在线视频| 国产精品一站二站| 麻豆传媒一区| 欧美一区二区三区另类 | 亚洲国产精品一区二区三区| 成人免费视频| 国产综合在线看| 日韩免费大片| 免费在线成人av电影| 天天操综合网| 啊啊啊一区二区| 国产在线麻豆精品观看| 91精彩刺激对白露脸偷拍| 亚洲精品水蜜桃| 精品一区二三区| 亚洲精品在线观看视频| 蜜桃av在线免费观看| 欧美性一区二区三区| 亚洲精品国产九九九| 日本不卡二区高清三区| 伊人久久婷婷| 99国产精品久久久久久| 久久麻豆一区二区| 国产 日韩 欧美 成人| 91.com视频| av影片免费在线观看| 国模视频一区二区| 伊人www22综合色| 最新不卡av| 日本va欧美va欧美va精品| 中文字幕三级电影| 亚洲精品国产高清久久伦理二区| 亚洲视屏在线观看| 亚洲欧美另类自拍| h片精品在线观看| 99视频国产精品免费观看| 国产精品97| 依人在线免费视频| 欧美国产日韩亚洲一区| 久久夜色精品国产噜噜亚洲av| 亚洲精品一区二区三区蜜桃下载 | 国产免费成人av| 欧美精品乱码| 久章草在线视频| 91捆绑美女网站| www.国产高清| 精品亚洲夜色av98在线观看| 毛片在线网站| 精品国产乱码久久久久久久软件| 亚洲图片在线| 亚洲一区二区三区四区av| 一区二区三区日韩精品| 国产伦理一区二区| 久久久999国产精品| 亚洲午夜剧场| 小说区视频区图片区| 韩国欧美国产一区| 综合五月激情网| 欧美成人a视频| 日皮视频在线观看| 国产精品久久久久久免费观看 | 红桃视频在线观看一区二区| 日韩av一二三四| 中文字幕久久午夜不卡| 真实新婚偷拍xxxxx| 日韩在线视频一区| www.久久99| 草草视频在线免费观看| 不卡视频在线看| 91玉足脚交嫩脚丫在线播放| 亚洲三级黄色在线观看| 黄色成人在线观看网站| 黄色污污在线观看| 成人高清视频免费观看| 午夜影院免费在线观看| 深夜成人在线观看| 国产精品亚洲一区二区在线观看| 欧美一区二区视频在线播放| 99国产精品久久久久久久久久久 | а√天堂资源官网在线资源| 精品久久蜜桃| 石原莉奈一区二区三区在线观看| 四虎国产成人精品免费一女五男| 欧美喷水一区二区| 欧美亚洲天堂| 欧美亚洲爱爱另类综合| 久久精品久久99精品久久| 2018天天弄| 亚洲人成欧美中文字幕| 国产一区高清| 日本丰满少妇xxxx| 国产欧美视频一区二区三区| 国产免费无遮挡| 欧美在线视频免费观看| 日韩一级毛片| 成年人小视频在线观看| 色网站国产精品| 91在线中字| 日本一区二区三区四区在线观看| 国产一区在线观看麻豆| 97超碰人人干| 精品国产一区av| 亚洲精品蜜桃乱晃| 亚洲成人av免费观看| 色综合天天综合网天天狠天天| 国产精品久久麻豆| 久久天天狠狠| 国产精品亚洲视频| 亚洲成熟少妇视频在线观看| 欧美日本亚洲视频| 欧美日韩激情在线一区二区三区| 蜜臀视频在线观看| 精品视频免费看| 三级在线观看视频| 日本黄网站色大片免费观看|