精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

如何在 SpringBoot 項目中控制 RocketMQ消費線程數量

開發
如何設置單個 topic 消費線程的最小數量和最大數量,用來區分不同 topic 吞吐量不同。

1 背景

最近在新項目開發中遇到一個有趣的問題,如何在 SpringBoot 項目中控制 RocketMQ 消費線程數量。如何設置單個 topic 消費線程的最小數量和最大數量,用來區分不同 topic 吞吐量不同。

我們先介紹一下 RocketMQ 消息監聽再來說明 RocketMQ 消費線程。

2 RocketMQ 消息監聽

設置消費者組為 my_consumer_group,監聽 TopicTest 隊列,并使用并發消息監聽器MessageListenerConcurrently

1public class Consumer {
2
3 public static void main(String[] args) throws InterruptedException, MQClientException {
4 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
5 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
6 consumer.subscribe("TopicTest", "*");
7 consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
8 consumer.registerMessageListener(new MessageListenerConcurrently() {
9 @Override
10 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
11 ConsumeConcurrentlyContext context) {
12 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
13 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
14 }
15 });
16 consumer.start();
17 System.out.printf("Consumer Started.%n");
18 }
19}

3 RocketMQ 中連接結構圖

圖片

4 消費監聽器

接口:org.apache.rocketmq.client.consumer.listener.MessageListener

圖片

有兩個子接口:

- 順序消費:MessageListenerOrderly
- 并發消費: MessageListenerConcurrently

圖片

4.1 MessageListenerConcurrently

作用:consumer并發消費消息的監聽器

圖片

比如,在 quick start 中,就是使用的并發消費消息監聽器:?

1 consumer.registerMessageListener(new MessageListenerConcurrently() {
2 @Override
3 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
4 ConsumeConcurrentlyContext context) {
5 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
6 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
7 }
8 });

方法返回值,是個枚舉:

1 package org.apache.rocketmq.client.consumer.listener;
2
3/**
4 * 并發消費mq消息結果
5 */
6public enum ConsumeConcurrentlyStatus {
7
8 /**
9 * Success consumption
10 * 成功消費
11 */
12 CONSUME_SUCCESS,
13
14 /**
15 * Failure consumption,later try to consume
16 * 失敗消費,稍后嘗試消費
17 *
18 *
19 * 如果 {@link MessageListener}返回的消費結果為 RECONSUME_LATER,則需要將這些消息發送給Broker延遲消息。
20 * 如果給broker發送消息失敗,將延遲5s后提交線程池進行消費。
21 *
22 * RECONSUME_LATER的消息發送入口: MQClientAPIImpl#consumerSendMessageBack,
23 * 命令編碼: {@link org.apache.rocketmq.common.protocol.RequestCode#CONSUMER_SEND_MSG_BACK}
24 */
25 RECONSUME_LATER;
26}

畫外音:

當前,我們在具體開發中,肯定不會直接使用這種方式來寫consumer。

常用的Consumer實現是:基于 推 的consumer:DefaultMQPushConsumer

4.2 MessageListenerOrderly

作用:consumer順序消費消息的監聽器

5 消費線程池

5.1 DefaultMQPushConsumer

作用:基于 推 的consumer消費者

5.2 注冊并發消息監聽器

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#registerMessageListener

圖片

當使用這個方法注冊消息監聽器時,實際上會把這個病發消息監聽器設置到 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#messageListenerInner屬性中。

5.3 設置 consumer 消費 service

可選有兩種:?

并發消費的service

順序消費的service

當consumer在啟動的時,會使用MessageListener具體實現類型進行判斷:

圖片

MessageListener 就有并發和順序兩種,所以service也有兩種。

1public synchronized void start() throws MQClientException {
2 switch (this.serviceState) {
3 case CREATE_JUST:
4
5 // 省略一部分代碼...........
6
7 // 根據注冊的監聽器類型[并發消息監聽器/順序執行消息監聽器],來確定使用哪種消費服務.
8 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
9 this.consumeOrderly = true;
10 this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
11 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
12 this.consumeOrderly = false;
13 this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
14 }
15 this.consumeMessageService.start();
16
17 // 省略一部分代碼..........
18 this.serviceState = ServiceState.RUNNING;
19 break;
20 case RUNNING:
21 case START_FAILED:
22 case SHUTDOWN_ALREADY:
23 throw new MQClientException("The PushConsumer service state not OK, maybe started once");
24 default:
25 break;
26 }
27
28 // 省略一部分代碼..........
29 }

如果使用的是并發消費的話,使用 ConsumeMessageConcurrentlyService :

在實例化的時候,會創建一個線程池:

圖片

1// 無界隊列,并且不可配置容量.那 DefaultMQPushConsumer#consumeThreadMax 配置就毫無意義了.
2this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
3this.consumeExecutor = new ThreadPoolExecutor(
4 this.defaultMQPushConsumer.getConsumeThreadMin(), // 默認20
5 this.defaultMQPushConsumer.getConsumeThreadMax(), // 默認64
6 1000 * 60,
7 TimeUnit.MILLISECONDS,
8 this.consumeRequestQueue,
9 new ThreadFactoryImpl("ConsumeMessageThread_"));

consumer消費線程池參數:

  • 默認最小消費線程數 20
  • 默認最大消費線程數 64
  • keepAliveTime = 60*1000      單位:秒
  • 隊列:new LinkedBlockingQueue<>()? 無界隊列
  • 線程名稱:前綴是:ConsumeMessageThread_

注意:因為線程池使用的是無界隊列,那么設置的最大線程數,其實沒有什么意義。

5.4 修改線程池線程數

上面我們已經知道了,設置線程池的最大線程數是沒什么用的。

那我們其實可以設置線程池的最小線程數,來修改consumer消費消息時的線程池大小。

1public static void main(String[] args) throws InterruptedException, MQClientException {
2 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
3
4 consumer.setConsumeThreadMin(30);
5 consumer.setConsumeThreadMax(64);
6
7 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
8 consumer.subscribe("TopicTest", "*");
9 consumer.registerMessageListener(new MessageListenerConcurrently() {
10
11 @Override
12 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
13 ConsumeConcurrentlyContext context) {
14 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
15 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
16 }
17 });
18 consumer.start();
19 System.out.printf("Consumer Started.%n");
20 }

注意:consumeThreadMin? 如果大于64,則也需要設置 consumeThreadMax 參數,因為有個校驗:

圖片

-修改線程池線程數-SpringBoot版

如果consumer是使用spring boot進行集成的,則可以這樣設置消費者線程數:

圖片

責任編輯:張燕妮 來源: 中生代技術
相關推薦

2022-08-02 10:01:42

架構

2022-11-23 15:44:49

2022-12-04 23:54:39

2017-07-04 19:02:17

ReacRedux 項目

2020-10-27 14:15:42

SpringBoot

2021-08-23 10:40:30

人工智能KubernetesAI

2009-04-07 09:12:35

敏捷新手入門大型開發

2021-09-14 07:06:13

React項目TypeScript

2021-09-15 07:56:32

TypeScriptVue項目

2020-03-17 08:04:11

物聯網隱私安全

2022-07-04 10:39:24

TienChin項目自定義

2021-03-30 10:46:42

SpringBoot計數器漏桶算法

2021-03-23 08:39:27

SpringBootRedis管道技術

2022-07-07 09:00:49

RocketMQ消費者消息消費

2025-01-03 16:32:13

SpringBoot虛擬線程Java

2023-09-26 08:01:46

消費者TopicRocketMQ

2023-03-28 07:08:09

RocketMQ消費者堆棧

2022-06-09 13:52:35

Vue協作開發項目

2025-05-12 02:00:00

2023-08-23 13:24:00

異步編程方法
點贊
收藏

51CTO技術棧公眾號

久久久久成人精品无码| 欧美一级日本a级v片| 国产又粗又长又硬| 国产精品欧美一区二区三区不卡 | 精品一区免费| 色婷婷综合中文久久一本| 欧美一级爽aaaaa大片| 在线观看中文字幕2021| 一本一道久久综合狠狠老| 日韩免费在线观看| 国模无码视频一区二区三区| 国产在线你懂得| 精品一区二区在线播放| 国内精品久久久久伊人av| 色哟哟精品观看| 4438五月综合| 欧美日韩国产限制| 亚洲高清视频在线观看| 亚洲av永久无码国产精品久久| 国产一级久久| 久久国产精品偷| 免费在线观看你懂的| 欧美一区=区三区| 亚洲三级在线免费观看| 久久精品成人一区二区三区蜜臀 | 成人欧美一区二区三区黑人| 国产精品国产a级| 国产一区精品视频| 亚洲中文字幕在线观看| 亚洲美洲欧洲综合国产一区| 久久激情五月丁香伊人| 手机av免费看| 四虎国产精品免费久久5151| 亚洲一区在线电影| 亚洲一区二区三区免费观看| 婷婷亚洲一区二区三区| 国产成人综合网站| 国产精品自产拍在线观看中文| 日韩免费一二三区| 99久久99久久精品国产片果冰| 亚洲精品www久久久久久广东| 极品粉嫩美女露脸啪啪| 黑人巨大亚洲一区二区久 | 日韩福利视频在线观看| 国产福利精品一区二区三区| 另类专区亚洲| 五月天中文字幕一区二区| 中文字幕一区二区三区最新| 免费观看的毛片| 国产精品一二三在| 国产精品久久久av| 西西44rtwww国产精品| 综合五月婷婷| 日韩在线中文字幕| 国产三级短视频| 九九综合在线| 亚洲欧美日韩高清| 中文字幕在线免费看线人| 4438全国亚洲精品观看视频| 51午夜精品国产| 亚洲天堂网2018| 欧美系列精品| 欧美亚男人的天堂| 国产九九在线视频| 成人在线免费| 欧美视频在线免费看| 大j8黑人w巨大888a片| 中文字幕在线播放网址| 欧美激情中文字幕一区二区| 日本一区视频在线观看| 第九色区av在线| 国产蜜臀av在线一区二区三区| 奇米视频888战线精品播放| 欧洲天堂在线观看| 国产网站一区二区| 午夜精品区一区二区三| 在线观看a视频| 国产精品国产三级国产aⅴ原创| 亚洲一区在线直播| a级网站在线播放| 亚洲女爱视频在线| 精品成在人线av无码免费看| 免费毛片b在线观看| 一本色道a无线码一区v| 欧美视频在线播放一区| 自拍偷拍欧美视频| 欧美日韩三级一区| 日本黄色一级网站| 99国产精品久久一区二区三区| 精品欧美一区二区久久| 久久久老熟女一区二区三区91| 无码日韩精品一区二区免费| 国产午夜精品全部视频在线播放| 欧美a级片免费看| 狠狠入ady亚洲精品经典电影| 欧美激情视频在线观看| 国产黄色片免费看| 日韩电影在线一区二区三区| 国产欧美亚洲精品| 亚洲国产精品久久人人爱潘金莲| 91视视频在线直接观看在线看网页在线看 | 久久久久网址| av在线电影网| 亚洲综合视频在线| 男人舔女人下面高潮视频| 国产69精品久久久久9999人| 日韩女优电影在线观看| 国产一二三四五区| 午夜日韩电影| 欧美在线视频观看免费网站| 一区二区三区麻豆| 成人手机在线视频| 日产精品高清视频免费| 在线观看午夜av| 一本色道**综合亚洲精品蜜桃冫| 亚洲精品在线视频播放| 丝袜久久网站| 久久天天躁夜夜躁狠狠躁2022| a v视频在线观看| 狠狠色丁香久久婷婷综合丁香| 国产区欧美区日韩区| 9i精品一二三区| 黑人狂躁日本妞一区二区三区| 久热精品在线观看视频| 欧洲vs亚洲vs国产| 欧美成人午夜激情| 久久这里只有精品9| 成人久久久精品乱码一区二区三区| 亚洲欧美日韩精品久久久| freexxx性亚洲精品| 欧美巨大另类极品videosbest| 波多野结衣影院| 欧美精品入口| 国产一区视频在线| 国产露出视频在线观看| 亚州成人在线电影| 亚欧精品在线视频| 国内精品久久久久久99蜜桃| 久久久久久久国产精品视频| 国产一级精品毛片| 91亚洲永久精品| 亚洲色成人www永久在线观看| 久久久久久一区二区三区四区别墅 | 国产免费成人在线| 一区二区三区四区高清视频 | jizzjizz欧美69巨大| 欧美高清激情brazzers| 成人免费无码大片a毛片| 三级在线电影| 国产成人亚洲综合a∨猫咪| 日韩三级电影| 天堂а√在线最新版中文在线| 日韩欧美在线不卡| 亚洲AV成人无码网站天堂久久| 先锋影音av在线| 麻豆成人入口| 久久国产精品影视| 国产精品无码一区| 99久久伊人久久99| 国产91在线亚洲| 免费观看在线一区二区三区| 久久精品国产一区| 国产精品久久777777换脸| 国产精品美女一区二区三区| 手机看片福利日韩| 成人高清电影网站| 国产精品免费久久久久影院| 超碰97在线免费观看| 色94色欧美sute亚洲线路一ni| 国产高清自拍视频| 免费一区视频| 日韩精品资源| 狂野欧美性猛交xxxx| 久久久av电影| 中文区中文字幕免费看| 中国色在线观看另类| 黄色成人免费看| 国产高清一区| 亚洲自拍偷拍在线| 乱插在线www| 亚洲国产欧美精品| 亚洲婷婷综合网| 中文字幕乱码日本亚洲一区二区| 黄色片在线免费| 99久久九九| 国产精品yjizz| 成人美女大片| 最好看的2019的中文字幕视频| 国产精品爽爽久久久久久| 中文字幕一区二区不卡| 亚洲国产午夜精品| 最新亚洲激情| 日本高清不卡一区二区三| 日韩一区中文| 97视频在线观看免费高清完整版在线观看 | 婷婷综合久久| 亚洲在线观看视频| 性欧美又大又长又硬| 最新日韩中文字幕| www.久久综合| 欧美视频在线看| 美国一级片在线观看| 福利一区在线观看| 午夜欧美福利视频| 欧美一区二区| 久久国产精品一区二区三区| 日韩久久一区| 97超级碰在线看视频免费在线看| yourporn在线观看中文站| 欧美乱妇23p| 免费观看一区二区三区毛片| 国产精品精品国产色婷婷| 性欧美18—19sex性高清| 日韩电影网1区2区| 可以看毛片的网址| 欧美一区二区三| 国内视频一区二区| 91丨精品丨国产| 日韩男女性生活视频| av在线导航| 精品亚洲一区二区三区| 国产av一区二区三区| 在线看国产日韩| 国产在线观看免费视频今夜| 中文一区二区完整视频在线观看| 四虎国产精品永久免费观看视频| 日本网站在线观看一区二区三区| 很污的网站在线观看| 亚洲一区在线| 亚洲亚洲精品三区日韩精品在线视频| 极品束缚调教一区二区网站| 国产自摸综合网| 亚洲妇女成熟| 九九九久久久久久| 日韩免费网站| 亚洲小视频在线观看| 欧洲av在线播放| 日韩一区二区三区免费看| 亚洲无码精品在线播放| 一本色道综合亚洲| 日本天堂网在线| 亚洲成人精品一区| 毛片a片免费观看| 亚洲欧美在线高清| 欧美美女性生活视频| 欧美国产精品久久| 免费看日本黄色片| 国产日韩一级二级三级| aaaaa级少妇高潮大片免费看| 成人激情黄色小说| 岛国精品一区二区三区| 九一九一国产精品| 性chinese极品按摩| 亚洲影院在线| 欧美二区在线视频| 中文国产一区| 女人天堂av手机在线| 国产一区二区你懂的| 国产中文字幕免费观看| 99热免费精品| 国产a级一级片| 久久久久久网| 好男人www社区| 美女视频一区二区| 91视频这里只有精品| 久久66热偷产精品| 性生活一级大片| 粉嫩欧美一区二区三区高清影视| 国产成人精品一区二区在线小狼| 国产精品一区二区视频| 免费看91视频| av成人免费在线| 黄瓜视频污在线观看| 欧美极品aⅴ影院| 成人免费毛片xxx| 一区二区三区四区不卡视频| 欧美爱爱小视频| 亚洲国产精品人人做人人爽| 中日韩精品视频在线观看| 色悠久久久久综合欧美99| 国产精品露脸视频| 正在播放一区二区| 国产黄a三级三级看三级| 精品88久久久久88久久久| 天堂资源中文在线| 伊人久久男人天堂| 成人午夜在线影视| 久久久久久久一区二区| **欧美日韩在线观看| 国产精品永久在线| av日韩精品| 免费在线一区二区| 欧美r级电影| 国产在线xxxx| 麻豆精品网站| 999久久久精品视频| 成人小视频免费在线观看| 国产av自拍一区| 亚洲激情男女视频| aaaaaa毛片| 日韩一区二区在线观看视频 | 日韩欧美一二三| 欧美一级淫片aaaaaa| 亚洲天堂第一页| 日本小视频在线免费观看| 日韩美女福利视频| 国内精品视频| 欧美精品一区二区三区在线看午夜| 国产在线日韩精品| 国产黄色激情视频| 日韩高清国产一区在线| av天堂一区二区| 国产精品日韩精品欧美在线 | 亚洲精品中文字幕乱码三区| 毛片在线免费视频| 欧美一级理论片| 大胆av不用播放器在线播放 | 国产精品国产三级国产aⅴ原创| 久久精品波多野结衣| 欧美午夜精品一区二区蜜桃| 刘亦菲久久免费一区二区| 色一情一乱一区二区| 三级在线观看视频| av激情久久| 欧美hd在线| 免费 成 人 黄 色| 国产精品一区免费视频| 日韩亚洲欧美中文字幕| 色综合久久久久| 亚洲精品911| 久久亚洲精品网站| 成人免费视频观看| 欧美亚洲国产免费| 99国产精品自拍| 亚洲少妇一区二区三区| 亚洲啪啪综合av一区二区三区| 亚洲另类欧美日韩| 337p亚洲精品色噜噜噜| freemovies性欧美| 日韩av电影院| 亚洲激情77| 欧美视频在线播放一区| 成人av在线资源| 久久97人妻无码一区二区三区| 7777精品伊人久久久大香线蕉经典版下载| 九九在线视频| 日韩av片免费在线观看| 首页亚洲中字| 成人精品视频在线播放| 国产成人精品一区二区三区四区| 久久人妻无码aⅴ毛片a片app| 欧美色手机在线观看| 国产精品麻豆一区二区三区 | 亚洲精品久久久久久下一站 | 91av在线不卡| 综合综合综合综合综合网| 成人一区二区三| 日本一二三四高清不卡| 国产精品人人爽| 久久久久久有精品国产| 欧美三级电影在线| 91福利国产成人精品播放| 国产精品成人免费| 亚洲精品无遮挡| 欧美一区二区三区……| 日本欧美视频| xxxx国产视频| 日韩欧美国产成人| 在线观看的av| 国产精华一区二区三区| 久久电影一区| 外国一级黄色片| 亚洲国产天堂久久国产91| 欧美日韩成人影院| 激情视频小说图片| 91老师国产黑色丝袜在线| 中文区中文字幕免费看| 欧美激情xxxx| 国产精品亚洲人成在99www| 超碰在线资源站| 懂色av影视一区二区三区| 免费在线观看黄色网| 精品国产一区二区三区麻豆免费观看完整版| 久久九九免费| 欧美三级日本三级| 亚洲欧美在线免费观看| 日日夜夜精品视频| 91激情视频在线| 天天影视涩香欲综合网| 秋霞a级毛片在线看| 久久爱av电影| 国模无码大尺度一区二区三区| av中文在线播放| 久久这里只有精品99| 国产亚洲一区| 中国黄色片视频| 678五月天丁香亚洲综合网| 在线中文字幕播放| 人人妻人人澡人人爽欧美一区双| 中文字幕欧美激情一区|