精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

RabbitMQ工作模式-Publish/Subscribe發布與訂閱模式

開發 架構
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

訂閱模式類型

訂閱模式示例圖:

前面2個案例中,只有3個角色:

  • P:生產者,也就是要發送消息的程序
  • C:消費者:消息的接受者,會一直等待消息到來。
  • queue:消息隊列,圖中紅色部分

而在訂閱模型中,多了一個exchange角色,而且過程略有變化:

  • P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
  • C:消費者,消息的接受者,會一直等待消息到來。
  • Queue:消息隊列,接收消息、緩存消息。
  • Exchange:交換機,圖中的X。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
  • Fanout:廣播,將消息交給所有綁定到交換機的隊列
  • Direct:定向,把消息交給符合指定routing key 的隊列
  • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

Publish/Subscribe發布與訂閱模式

1、模式說明

發布訂閱模式:

每個消費者監聽自己的隊列。

生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收 到消息

2、案例

(1)生產者

package com.lijw.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author Aron.li
 * @date 2022/3/3 8:16
 */
public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設置參數
        factory.setHost("127.0.0.1"); // ip  默認值 localhost
        factory.setPort(5672); //端口  默認值 5672
        factory.setVirtualHost("/test"); //虛擬機 默認值 /
        factory.setUsername("libai"); // 用戶名 默認 guest
        factory.setPassword("libai"); //密碼 默認值 guest
        //3. 創建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創建Channel
        Channel channel = connection.createChannel();
        //5. 創建交換機
        /*
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
           參數:
            1. exchange:交換機名稱
            2. type:交換機類型
                DIRECT("direct"):定向
                FANOUT("fanout"):扇形(廣播),發送消息到每一個與之綁定隊列。
                TOPIC("topic") 通配符的方式
                HEADERS("headers") 參數匹配
            3. durable:是否持久化
            4. autoDelete:自動刪除
            5. internal:內部使用。 一般false
            6. arguments:參數
        */
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
        //6. 創建隊列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);
        // 7. 綁定隊列和交換機
        /*
            queueBind(String queue, String exchange, String routingKey)
            參數:
                1. queue:隊列名稱
                2. exchange:交換機名稱
                3. routingKey:路由鍵,綁定規則
                    如果交換機的類型為fanout ,routingKey設置為""
         */
        channel.queueBind(queue1Name, exchangeName, "");
        channel.queueBind(queue2Name, exchangeName, "");
        //8. 發送消息至交換機,由交換機分發消息
        String body = "日志信息: 肥仔白調用了findAll方法...日志級別: INFO....";
        channel.basicPublish(exchangeName, "", null, body.getBytes());
        //9. 釋放資源
        channel.close();
        connection.close();
        
    }
}

執行生產者,我們可以查看一下創建的 交換機 以及 隊列信息:

下面再來看看隊列,如下:

下面我們繼續來寫兩個消費者接收消息。

(2)消費者1:讀取隊列1的消息

package com.lijw.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */
public class Consumer_PubSub1 {

    //定義接收隊列的名稱
    final static String queueName = "test_fanout_queue1";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設置參數
        factory.setHost("127.0.0.1"); // ip  默認值 localhost
        factory.setPort(5672); //端口  默認值 5672
        factory.setVirtualHost("/test"); //虛擬機 默認值 /
        factory.setUsername("libai"); // 用戶名 默認 guest
        factory.setPassword("libai"); //密碼 默認值 guest
        //3. 創建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創建Channel
        Channel channel = connection.createChannel();
        //5. 創建隊列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        參數:
            1. queue:隊列名稱
            2. durable:是否持久化,當mq重啟之后,還在
            3. exclusive:
                * 是否獨占。只能有一個消費者監聽這隊列
                * 當Connection關閉時,是否刪除隊列
            4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
            5. arguments:參數。

         */
        channel.queueDeclare(queueName, true, false, false, null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        參數:
            1. queue:隊列名稱
            2. autoAck:是否自動確認
            3. callback:回調對象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回調方法,當收到消息后,會自動執行該方法
                1. consumerTag:標識
                2. envelope:獲取一些信息,交換機,路由key...
                3. properties:配置信息
                4. body:數據
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收隊列的數據 body: " + new String(body));
            }
        };
        channel.basicConsume(queueName,true,consumer);

        //不需要關閉資源,因為消費者需要持續監聽隊列信息
    }
}

(3)消費者2:讀取隊列2的消息

package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */
public class Consumer_PubSub2 {
    //定義接收隊列的名稱
    final static String queueName = "test_fanout_queue2";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設置參數
        factory.setHost("127.0.0.1"); // ip  默認值 localhost
        factory.setPort(5672); //端口  默認值 5672
        factory.setVirtualHost("/test"); //虛擬機 默認值 /
        factory.setUsername("libai"); // 用戶名 默認 guest
        factory.setPassword("libai"); //密碼 默認值 guest
        //3. 創建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創建Channel
        Channel channel = connection.createChannel();
        //5. 創建隊列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        參數:
            1. queue:隊列名稱
            2. durable:是否持久化,當mq重啟之后,還在
            3. exclusive:
                * 是否獨占。只能有一個消費者監聽這隊列
                * 當Connection關閉時,是否刪除隊列
            4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
            5. arguments:參數。
         */
        channel.queueDeclare(queueName, true, false, false, null);
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        參數:
            1. queue:隊列名稱
            2. autoAck:是否自動確認
            3. callback:回調對象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回調方法,當收到消息后,會自動執行該方法
                1. consumerTag:標識
                2. envelope:獲取一些信息,交換機,路由key...
                3. properties:配置信息
                4. body:數據
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收隊列的數據 body: " + new String(body));
            }
        };
        channel.basicConsume(queueName,true,consumer);
        //不需要關閉資源,因為消費者需要持續監聽隊列信息
    }
}

3、測試

啟動所有消費者,然后使用生產者發送消息;在每個消費者對應的控制臺可以查看到生產者發送的所有消息;到達廣播的效果。

  • 消費者1接收到的消息:

  • 消費者2接收到的消息:

從結果來看,生產者只需要發送一條消息,其余的消費者全部收到了消息,達到了廣播的效果。

4、小結

交換機需要與隊列進行綁定,綁定之后;一個消息可以被多個消費者都收到。

發布訂閱模式與工作隊列模式的區別:

  • 工作隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機。
  • 發布/訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發送消息(底層使用默認交換機)。
  • 發布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁 定到默認的交換機 。
責任編輯:姜華 來源: 今日頭條
相關推薦

2022-08-15 09:02:22

Redis模式訂閱消息

2023-11-20 08:54:38

2025-01-09 11:15:47

2022-06-27 13:56:10

設計模式緩存分布式系統

2022-12-02 07:28:58

Event訂閱模式Spring

2009-11-05 10:07:37

WCF設計模式

2024-03-28 08:07:42

RabbitMQ訂閱模式

2021-08-02 17:21:08

設計模式訂閱

2024-07-29 08:34:18

C++訂閱者模式線程

2013-10-31 14:30:44

CloudaAPI

2024-05-31 08:53:56

2023-12-04 08:24:23

2023-01-11 08:22:22

RabbitMQ通信模型

2021-04-18 21:07:32

門面模式設計

2023-11-07 12:09:44

TopicKafka

2025-03-11 09:30:00

2012-08-30 09:07:33

設計模式

2021-04-14 09:02:22

模式 設計建造者

2023-05-17 08:16:04

RabbitMQ消息傳遞

2012-10-08 11:18:38

企業應用架構工作單元模式
點贊
收藏

51CTO技術棧公眾號

国产成人亚洲综合青青| 亚洲丝袜一区在线| 日本日本19xxxⅹhd乱影响| 日本中文字幕电影在线观看 | 国产一区二区在线不卡| 欧美天天在线| 亚洲老头同性xxxxx| 久久久久久蜜桃一区二区| 在线观看中文| 国产视频一区在线观看| 91久久极品少妇xxxxⅹ软件| 特级做a爱片免费69| 欧美丰满老妇| 日韩精品视频在线播放| 色婷婷一区二区三区av免费看| 久久免费电影| 国产精品毛片大码女人| 久草热久草热线频97精品| 国产男男gay网站| 亚洲综合二区| 欧美激情视频给我| 国产破处视频在线观看| 日本韩国欧美超级黄在线观看| 欧美猛男gaygay网站| 精品久久久久久久免费人妻| 好久没做在线观看| 亚洲丝袜美腿综合| 视频二区一区| 四虎精品成人免费网站| 国产精品一品二品| 国产成人在线精品| 日韩大片免费在线观看| 欧美韩日精品| 中文字幕日韩欧美在线 | 精品国产亚洲av麻豆| 日韩1区2区日韩1区2区| 啪一啪鲁一鲁2019在线视频| 日本三级理论片| 综合久久一区| 日韩在线观看免费av| www亚洲色图| 免费一区二区三区视频导航| 亚洲精品电影网在线观看| 亚洲精品在线网址| 国产日韩中文在线中文字幕| 欧美三级电影在线看| 日韩有码免费视频| 欧美aa在线观看| 午夜视频久久久久久| 欧美黄色免费网址| 亚洲综合影视| 一区二区三区四区精品在线视频| 在线精品亚洲一区二区| 北条麻妃在线| 国产精品网站在线| 亚洲精品无人区| av网站在线免费观看| 国产高清精品网站| 99久久99久久| 亚洲精品一区二区三区新线路| 国内一区二区在线| 国产精品无av码在线观看| 在线观看国产小视频| 久久国产精品99久久久久久老狼 | 成人噜噜噜噜| 宅男噜噜噜66一区二区66| 国内av一区二区| 欧美成人精品午夜一区二区| 日韩一区二区三区电影在线观看 | 久久精品人人做人人综合 | 91九色精品视频| 国产日韩欧美中文字幕| 国产一区二区三区免费在线观看| 3d动漫啪啪精品一区二区免费| 精品人妻一区二区三区麻豆91| 国产乱人伦精品一区二区在线观看 | www.激情网| 2001个疯子在线观看| 高跟丝袜欧美一区| 国产三级三级三级看三级| 日本a人精品| 日韩精品一区二区三区四区视频| 成人啪啪18免费游戏链接| 国产精品45p| 亚洲欧美国产另类| 婷婷社区五月天| 韩日成人av| 国产999精品视频| 6—12呦国产精品| 国产成人8x视频一区二区| 好看的日韩精品视频在线| 国产三级视频在线看| 亚洲欧美日韩国产成人精品影院| 国产精品69久久久| 唐人社导航福利精品| 欧美一区二区视频在线观看2022| www.17c.com喷水少妇| 国产精品日韩精品中文字幕| 久久精品国产亚洲7777| 亚洲国产精品成人无久久精品 | 精品久久久99| 青青操综合网| 按摩亚洲人久久| 国产三级av片| 狠狠色综合色综合网络| 久久精品人成| 日本电影在线观看| 在线观看视频一区二区| 美国黄色一级视频| 欧美r级电影| 国产91av在线| 国产情侣一区二区| 久久久精品黄色| 99久久免费观看| 国产成+人+综合+亚洲欧美| 精品国产乱码久久久久久图片| 国产视频不卡在线| 国产精品毛片| 成人在线观看91| 黄色片免费在线观看| 91黄色免费版| 欧美在线一级片| 久久久久久久久丰满| 国产91在线高潮白浆在线观看| 欧美一级淫片免费视频魅影视频| 国产精品久久久久久久久动漫| 黄色免费视频大全| 亚洲国产aⅴ精品一区二区| 色综合影院在线| 一区二区三区在线观看av| 成人午夜视频网站| 狠狠干视频网站| 成人在线中文| 亚洲最大在线视频| 成人av网站在线播放| 成人久久18免费网站麻豆 | 中文亚洲欧美| 成人综合色站| 午夜激情在线| 日韩一区二区免费在线电影| 欧美日韩色视频| 蜜桃久久精品一区二区| 日韩中文字幕一区二区| 国产精品专区免费| 亚洲精品一区中文| 国产又爽又黄的视频| 91免费观看在线| 黄色片视频在线免费观看| 欧美激情极品| 欧美在线一级va免费观看| 天堂а√在线8种子蜜桃视频 | 91免费精品国偷自产在线在线| 久久偷看各类女兵18女厕嘘嘘| 国产精品一区二区av白丝下载 | 亚洲高清在线播放| 日本肉肉一区 | 秋霞在线午夜| 精品久久久久99| 懂色av.com| ww亚洲ww在线观看国产| 午夜肉伦伦影院| 中文字幕精品影院| 国产精品久久久久久久久免费| 成年人视频免费在线观看| 欧美三级中文字幕| 日韩在线观看免| 国产成人精品1024| 国产v片免费观看| 精品一区三区| 91精品久久久久久久久久另类 | 婷婷综合五月| 国产精品一区二区av| 老司机深夜福利在线观看| 亚洲欧美999| 怡春院在线视频| 亚洲精品自拍动漫在线| 国模无码视频一区| 久久婷婷久久| 国产日韩欧美大片| 欧美日韩看看2015永久免费| 国产成人精品久久亚洲高清不卡| 日本高清视频在线播放| 欧美成人精品二区三区99精品| 日本一本高清视频| 国产视频一区二区在线| 91网址在线观看精品| 亚洲福利免费| 亚洲一区二区三区四区中文| 亚洲一区二区三区免费| 欧美有码在线观看| 成人在线app| 国产丝袜视频一区| 亚洲中文一区二区三区| 亚洲成人动漫在线观看| 永久免费毛片在线观看| 成人综合婷婷国产精品久久| 手机看片福利日韩| 国产一区清纯| 视频一区国产精品| 激情亚洲另类图片区小说区| 国产精品一区二区久久久| 欧美xxxx视频| 色yeye香蕉凹凸一区二区av| 色综合免费视频| 69堂亚洲精品首页| 欧美h在线观看| 亚洲一区二区中文在线| 欧美成人久久久免费播放| 成人午夜av在线| av中文字幕网址| 久久天堂精品| 草草久久久无码国产专区| 外国成人免费视频| 日本精品二区| 精品福利网址导航| 亚洲淫片在线视频| 激情久久一区二区| 91极品视频在线| fc2ppv国产精品久久| 一区二区三区国产视频| 欧美偷拍视频| 亚洲大胆美女视频| www.com在线观看| 欧美日本一区二区| 超碰在线观看91| 精品久久香蕉国产线看观看gif| 波多野结衣爱爱视频| 国产精品久久久久久久久免费桃花| 中文字幕 日本| 国产99精品国产| 肉色超薄丝袜脚交| 久久精品国产在热久久| 粉嫩虎白女毛片人体| 校园激情久久| 国产91在线免费| 国产日韩视频| 日韩视频第二页| 99热免费精品在线观看| a级黄色小视频| 一区在线视频观看| 国产freexxxx性播放麻豆| 午夜欧美精品久久久久久久| 免费看av软件| 亚洲精品久久久| 日韩人妻精品一区二区三区| 99国产精品免费视频观看| 亚洲欧洲三级| 91亚洲国产| 中文字幕一区二区三区5566| 久久免费大视频| 在线观看免费91| 欧美高清一区| www精品久久| 日韩图片一区| 欧美日韩在线成人| 青青草伊人久久| 九九热免费在线观看| 国产米奇在线777精品观看| www.色.com| 丁香婷婷综合色啪| 国产草草浮力影院| 久久综合久久综合久久| 波多野结衣a v在线| 国产日本一区二区| 中文字幕求饶的少妇| 综合av第一页| 国产精品成人国产乱| 欧美午夜丰满在线18影院| 自拍偷拍福利视频| 91精品国产综合久久久蜜臀粉嫩 | 亚洲欧美视频在线| 二区在线观看| 久久国产色av| 麻豆国产在线| 国产精品视频专区| 日韩精品中文字幕吗一区二区| 国产精品久久久久久久久婷婷 | 老熟妇一区二区三区| 欧美日韩国产在线观看| 超碰在线人人干| 亚洲精品日韩在线| 欧美成人三区| 韩日欧美一区二区| 国产成人免费9x9x人网站视频| 国产视频福利一区| 99香蕉久久| 日韩精品一区二区三区四区五区| 婷婷综合社区| 久久久噜噜噜www成人网| 麻豆精品新av中文字幕| 国产大学生视频| 国产精品成人一区二区艾草| 久久亚洲国产成人精品性色| 日本福利一区二区| av观看在线免费| 亚洲欧洲日本专区| 福利在线视频网站| 日韩免费视频在线观看| 亚洲精品影片| 亚洲成人18| 99精品热视频只有精品10| 亚洲高清免费在线观看| 91免费在线看| 久草国产在线视频| 欧美色网一区二区| 秋霞av在线| 欧美激情欧美激情| av日韩一区| 日韩免费一区二区三区| 国内一区二区三区| 亚洲欧美国产中文| 久久午夜国产精品| 久久久久久久久久久久久久免费看 | 波多野结衣作品集| av中文字幕一区| 日韩欧美123区| 色哟哟一区二区在线观看| 丰满肉嫩西川结衣av| 久久精品视频导航| 成人免费福利| 狼狼综合久久久久综合网| 午夜精品久久| 一级黄色片国产| 国产三级精品三级| 天天干在线播放| 日韩av在线网址| 国产嫩草在线视频| 51国偷自产一区二区三区的来源| 青青草成人影院| 黄色a级片免费| 91啦中文在线观看| 国产又色又爽又黄的| 精品1区2区在线观看| 麻豆传媒视频在线| 国产精品一区二区久久久| 精品72久久久久中文字幕| 动漫av网站免费观看| 99re8在线精品视频免费播放| 久久久久99精品成人片毛片| 欧美一区二区视频网站| 超鹏97在线| 91黄色精品| 欧美成人有码| 少妇欧美激情一区二区三区| 亚洲男人都懂的| 97人妻精品一区二区三区动漫| 中文字幕在线亚洲| 成人在线中文| 在线看成人av电影| 国产一区不卡在线| 久一区二区三区| 亚洲国产三级网| 厕沟全景美女厕沟精品| 欧美在线播放一区二区| 视频一区二区三区入口| 亚洲ⅴ国产v天堂a无码二区| 欧美三级日韩三级国产三级| 日本在线播放| 亚洲伊人成综合成人网| 亚洲视频久久| 午夜视频在线观看国产| 精品久久久一区二区| 国产亚洲依依| 国产自产女人91一区在线观看| 亚洲精品一区二区在线看| 亚洲av无码久久精品色欲| 亚洲福利一二三区| 美女做暖暖视频免费在线观看全部网址91 | 日韩av网站电影| 偷拍中文亚洲欧美动漫| 中文字幕成人一区| 福利一区在线观看| 中文字幕av影院| 日韩在线观看精品| caoporn成人| 黄色高清无遮挡| 亚洲视频一二区| 手机看片一区二区三区| 国产精品极品美女粉嫩高清在线| 手机在线一区二区三区| 又色又爽又黄18网站| 欧美性精品220| 免费人成在线观看播放视频| 99国产在线观看| 丝袜美腿亚洲综合| 日韩视频中文字幕在线观看| 日韩精品极品在线观看| 国产69精品久久| 日本a视频在线观看| 国产精品欧美一级免费| 亚洲精品久久久久久久久久| 国产不卡av在线| 午夜日韩在线| 五月天综合视频| 日韩精品一区国产麻豆| 精品欧美日韩精品| 丁香六月激情婷婷| 国产精品视频yy9299一区| 黄色一级a毛片| 成人妇女免费播放久久久|