精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

手把手教你 springBoot 整合 rabbitMQ,利用 MQ 實現事務補償

開發 前端
rabbitMQ 在互聯網公司有著大規模應用,本篇將實戰介紹 springboot 整合 rabbitMQ,同時也將在具體的業務場景中介紹利用 MQ 實現事務補償操作。

[[341134]]

本文轉載自微信公眾號「Java極客技術」,作者鴨血粉絲。轉載本文請聯系Java極客技術公眾號。  

rabbitMQ 在互聯網公司有著大規模應用,本篇將實戰介紹 springboot 整合 rabbitMQ,同時也將在具體的業務場景中介紹利用 MQ 實現事務補償操作。

一、介紹

本篇我們一起來實操一下SpringBoot整合rabbitMQ,為后續業務處理做鋪墊。

廢話不多說,直奔主題!

二、整合實戰

2.1、創建一個 maven 工程,引入 amqp 包

  1. <!--amqp 支持--> 
  2. <dependency> 
  3.     <groupId>org.springframework.boot</groupId> 
  4.     <artifactId>spring-boot-starter-amqp</artifactId> 
  5. </dependency> 

2.2、在全局文件中配置 rabbitMQ 服務信息

  1. spring.rabbitmq.addresses=197.168.24.206:5672 
  2. spring.rabbitmq.username=guest 
  3. spring.rabbitmq.password=guest 
  4. spring.rabbitmq.virtual-host=/ 

其中,spring.rabbitmq.addresses參數值為 rabbitmq 服務器地址

2.3、編寫 rabbitmq 配置類

  1. @Slf4j 
  2. @Configuration 
  3. public class RabbitConfig { 
  4.  
  5.     /** 
  6.      * 初始化連接工廠 
  7.      * @param addresses 
  8.      * @param userName 
  9.      * @param password 
  10.      * @param vhost 
  11.      * @return 
  12.      */ 
  13.     @Bean 
  14.     ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses, 
  15.                                         @Value("${spring.rabbitmq.username}") String userName, 
  16.                                         @Value("${spring.rabbitmq.password}") String password
  17.                                         @Value("${spring.rabbitmq.virtual-host}") String vhost) { 
  18.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 
  19.         connectionFactory.setAddresses(addresses); 
  20.         connectionFactory.setUsername(userName); 
  21.         connectionFactory.setPassword(password); 
  22.         connectionFactory.setVirtualHost(vhost); 
  23.         return connectionFactory; 
  24.     } 
  25.  
  26.     /** 
  27.      * 重新實例化 RabbitAdmin 操作類 
  28.      * @param connectionFactory 
  29.      * @return 
  30.      */ 
  31.     @Bean 
  32.     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ 
  33.         return new RabbitAdmin(connectionFactory); 
  34.     } 
  35.  
  36.     /** 
  37.      * 重新實例化 RabbitTemplate 操作類 
  38.      * @param connectionFactory 
  39.      * @return 
  40.      */ 
  41.     @Bean 
  42.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ 
  43.         RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); 
  44.         //數據轉換為json存入消息隊列 
  45.         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); 
  46.         return rabbitTemplate; 
  47.     } 
  48.  
  49.     /** 
  50.      * 將 RabbitUtil 操作工具類加入IOC容器 
  51.      * @return 
  52.      */ 
  53.     @Bean 
  54.     public RabbitUtil rabbitUtil(){ 
  55.         return new RabbitUtil(); 
  56.     } 
  57.  

2.4、編寫 RabbitUtil 工具類

  1. public class RabbitUtil { 
  2.  
  3.     private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class); 
  4.  
  5.     @Autowired 
  6.     private RabbitAdmin rabbitAdmin; 
  7.  
  8.     @Autowired 
  9.     private RabbitTemplate rabbitTemplate; 
  10.  
  11.     /** 
  12.      * 創建Exchange 
  13.      * @param exchangeName 
  14.      */ 
  15.     public void addExchange(String exchangeType, String exchangeName){ 
  16.         Exchange exchange = createExchange(exchangeType, exchangeName); 
  17.         rabbitAdmin.declareExchange(exchange); 
  18.     } 
  19.  
  20.     /** 
  21.      * 刪除一個Exchange 
  22.      * @param exchangeName 
  23.      */ 
  24.     public boolean deleteExchange(String exchangeName){ 
  25.         return rabbitAdmin.deleteExchange(exchangeName); 
  26.     } 
  27.  
  28.     /** 
  29.      * 創建一個指定的Queue 
  30.      * @param queueName 
  31.      * @return queueName 
  32.      */ 
  33.     public void addQueue(String queueName){ 
  34.         Queue queue = createQueue(queueName); 
  35.         rabbitAdmin.declareQueue(queue); 
  36.     } 
  37.  
  38.     /** 
  39.      * 刪除一個queue 
  40.      * @return queueName 
  41.      * @param queueName 
  42.      */ 
  43.     public boolean deleteQueue(String queueName){ 
  44.         return rabbitAdmin.deleteQueue(queueName); 
  45.     } 
  46.  
  47.     /** 
  48.      * 按照篩選條件,刪除隊列 
  49.      * @param queueName 
  50.      * @param unused 是否被使用 
  51.      * @param empty 內容是否為空 
  52.      */ 
  53.     public void deleteQueue(String queueName, boolean unused, boolean empty){ 
  54.         rabbitAdmin.deleteQueue(queueName,unused,empty); 
  55.     } 
  56.  
  57.     /** 
  58.      * 清空某個隊列中的消息,注意,清空的消息并沒有被消費 
  59.      * @return queueName 
  60.      * @param queueName 
  61.      */ 
  62.     public void purgeQueue(String queueName){ 
  63.         rabbitAdmin.purgeQueue(queueName, false); 
  64.     } 
  65.  
  66.     /** 
  67.      * 判斷指定的隊列是否存在 
  68.      * @param queueName 
  69.      * @return 
  70.      */ 
  71.     public boolean existQueue(String queueName){ 
  72.         return rabbitAdmin.getQueueProperties(queueName) == null ? false : true
  73.     } 
  74.  
  75.     /** 
  76.      * 綁定一個隊列到一個匹配型交換器使用一個routingKey 
  77.      * @param exchangeType 
  78.      * @param exchangeName 
  79.      * @param queueName 
  80.      * @param routingKey 
  81.      * @param isWhereAll 
  82.      * @param headers EADERS模式類型設置,其他模式類型傳空 
  83.      */ 
  84.     public void addBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  85.         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); 
  86.         rabbitAdmin.declareBinding(binding); 
  87.     } 
  88.  
  89.     /** 
  90.      * 聲明綁定 
  91.      * @param binding 
  92.      */ 
  93.     public void addBinding(Binding binding){ 
  94.         rabbitAdmin.declareBinding(binding); 
  95.     } 
  96.  
  97.     /** 
  98.      * 解除交換器與隊列的綁定 
  99.      * @param exchangeType 
  100.      * @param exchangeName 
  101.      * @param queueName 
  102.      * @param routingKey 
  103.      * @param isWhereAll 
  104.      * @param headers 
  105.      */ 
  106.     public void removeBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  107.         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); 
  108.         removeBinding(binding); 
  109.     } 
  110.  
  111.     /** 
  112.      * 解除交換器與隊列的綁定 
  113.      * @param binding 
  114.      */ 
  115.     public void removeBinding(Binding binding){ 
  116.         rabbitAdmin.removeBinding(binding); 
  117.     } 
  118.  
  119.     /** 
  120.      * 創建一個交換器、隊列,并綁定隊列 
  121.      * @param exchangeType 
  122.      * @param exchangeName 
  123.      * @param queueName 
  124.      * @param routingKey 
  125.      * @param isWhereAll 
  126.      * @param headers 
  127.      */ 
  128.     public void andExchangeBindingQueue(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  129.         //聲明交換器 
  130.         addExchange(exchangeType, exchangeName); 
  131.         //聲明隊列 
  132.         addQueue(queueName); 
  133.         //聲明綁定關系 
  134.         addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); 
  135.     } 
  136.  
  137.     /** 
  138.      * 發送消息 
  139.      * @param exchange 
  140.      * @param routingKey 
  141.      * @param object 
  142.      */ 
  143.     public void convertAndSend(String exchange, String routingKey, final Object object){ 
  144.         rabbitTemplate.convertAndSend(exchange, routingKey, object); 
  145.     } 
  146.  
  147.     /** 
  148.      * 轉換Message對象 
  149.      * @param messageType 
  150.      * @param msg 
  151.      * @return 
  152.      */ 
  153.     public Message getMessage(String messageType, Object msg){ 
  154.         MessageProperties messageProperties = new MessageProperties(); 
  155.         messageProperties.setContentType(messageType); 
  156.         Message message = new Message(msg.toString().getBytes(),messageProperties); 
  157.         return message; 
  158.     } 
  159.  
  160.     /** 
  161.      * 聲明交換機 
  162.      * @param exchangeType 
  163.      * @param exchangeName 
  164.      * @return 
  165.      */ 
  166.     private Exchange createExchange(String exchangeType, String exchangeName){ 
  167.         if(ExchangeType.DIRECT.equals(exchangeType)){ 
  168.             return new DirectExchange(exchangeName); 
  169.         } 
  170.         if(ExchangeType.TOPIC.equals(exchangeType)){ 
  171.             return new TopicExchange(exchangeName); 
  172.         } 
  173.         if(ExchangeType.HEADERS.equals(exchangeType)){ 
  174.             return new HeadersExchange(exchangeName); 
  175.         } 
  176.         if(ExchangeType.FANOUT.equals(exchangeType)){ 
  177.             return new FanoutExchange(exchangeName); 
  178.         } 
  179.         return null
  180.     } 
  181.  
  182.     /** 
  183.      * 聲明綁定關系 
  184.      * @param exchangeType 
  185.      * @param exchangeName 
  186.      * @param queueName 
  187.      * @param routingKey 
  188.      * @param isWhereAll 
  189.      * @param headers 
  190.      * @return 
  191.      */ 
  192.     private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  193.         if(ExchangeType.DIRECT.equals(exchangeType)){ 
  194.             return BindingBuilder.bind(new Queue(queueName)).to(new DirectExchange(exchangeName)).with(routingKey); 
  195.         } 
  196.         if(ExchangeType.TOPIC.equals(exchangeType)){ 
  197.             return BindingBuilder.bind(new Queue(queueName)).to(new TopicExchange(exchangeName)).with(routingKey); 
  198.         } 
  199.         if(ExchangeType.HEADERS.equals(exchangeType)){ 
  200.             if(isWhereAll){ 
  201.                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAll(headers).match(); 
  202.             }else
  203.                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAny(headers).match(); 
  204.             } 
  205.         } 
  206.         if(ExchangeType.FANOUT.equals(exchangeType)){ 
  207.             return BindingBuilder.bind(new Queue(queueName)).to(new FanoutExchange(exchangeName)); 
  208.         } 
  209.         return null
  210.     } 
  211.  
  212.     /** 
  213.      * 聲明隊列 
  214.      * @param queueName 
  215.      * @return 
  216.      */ 
  217.     private Queue createQueue(String queueName){ 
  218.         return new Queue(queueName); 
  219.     } 
  220.  
  221.  
  222.     /** 
  223.      * 交換器類型 
  224.      */ 
  225.     public final static class ExchangeType { 
  226.  
  227.         /** 
  228.          * 直連交換機(全文匹配) 
  229.          */ 
  230.         public final static String DIRECT = "DIRECT"
  231.  
  232.         /** 
  233.          * 通配符交換機(兩種通配符:*只能匹配一個單詞,#可以匹配零個或多個) 
  234.          */ 
  235.         public final static String TOPIC = "TOPIC"
  236.  
  237.         /** 
  238.          * 頭交換機(自定義鍵值對匹配,根據發送消息內容中的headers屬性進行匹配) 
  239.          */ 
  240.         public final static String HEADERS = "HEADERS"
  241.  
  242.         /** 
  243.          * 扇形(廣播)交換機 (將消息轉發到所有與該交互機綁定的隊列上) 
  244.          */ 
  245.         public final static String FANOUT = "FANOUT"
  246.     } 

此致, rabbitMQ 核心操作功能操作已經開發完畢!

2.5、編寫隊列監聽類(靜態)

  1. @Slf4j 
  2. @Configuration 
  3. public class DirectConsumeListener { 
  4.  
  5.     /** 
  6.      * 監聽指定隊列,名稱:mq.direct.1 
  7.      * @param message 
  8.      * @param channel 
  9.      * @throws IOException 
  10.      */ 
  11.     @RabbitListener(queues = "mq.direct.1"
  12.     public void consume(Message message, Channel channel) throws IOException { 
  13.         log.info("DirectConsumeListener,收到消息: {}", message.toString()); 
  14.     } 

如果你需要監聽指定的隊列,只需要方法上加上@RabbitListener(queues = "")即可,同時填寫對應的隊列名稱。

但是,如果你想動態監聽隊列,而不是通過寫死在方法上呢?

請看下面介紹!

2.6、編寫隊列監聽類(動態)

重新實例化一個SimpleMessageListenerContainer對象,這個對象就是監聽容器。

  1. @Slf4j 
  2. @Configuration 
  3. public class DynamicConsumeListener { 
  4.  
  5.     /** 
  6.      * 使用SimpleMessageListenerContainer實現動態監聽 
  7.      * @param connectionFactory 
  8.      * @return 
  9.      */ 
  10.     @Bean 
  11.     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ 
  12.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); 
  13.         container.setMessageListener((MessageListener) message -> { 
  14.             log.info("ConsumerMessageListen,收到消息: {}", message.toString()); 
  15.         }); 
  16.         return container; 
  17.     } 

如果想向SimpleMessageListenerContainer添加監聽隊列或者移除隊列,只需通過如下方式即可操作。

  1. @Slf4j 
  2. @RestController 
  3. @RequestMapping("/consumer"
  4. public class ConsumerController { 
  5.  
  6.     @Autowired 
  7.     private SimpleMessageListenerContainer container; 
  8.  
  9.     @Autowired 
  10.     private RabbitUtil rabbitUtil; 
  11.  
  12.     /** 
  13.      * 添加隊列到監聽器 
  14.      * @param consumerInfo 
  15.      */ 
  16.     @PostMapping("addQueue"
  17.     public void addQueue(@RequestBody ConsumerInfo consumerInfo) { 
  18.         boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName()); 
  19.         if(!existQueue){ 
  20.             throw new CommonExecption("當前隊列不存在"); 
  21.         } 
  22.         //消費mq消息的類 
  23.         container.addQueueNames(consumerInfo.getQueueName()); 
  24.         //打印監聽容器中正在監聽到隊列 
  25.         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); 
  26.     } 
  27.  
  28.     /** 
  29.      * 移除正在監聽的隊列 
  30.      * @param consumerInfo 
  31.      */ 
  32.     @PostMapping("removeQueue"
  33.     public void removeQueue(@RequestBody ConsumerInfo consumerInfo) { 
  34.         //消費mq消息的類 
  35.         container.removeQueueNames(consumerInfo.getQueueName()); 
  36.         //打印監聽容器中正在監聽到隊列 
  37.         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); 
  38.     } 
  39.  
  40.     /** 
  41.      * 查詢監聽容器中正在監聽到隊列 
  42.      */ 
  43.     @PostMapping("queryListenerQueue"
  44.     public void queryListenerQueue() { 
  45.         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); 
  46.     } 

2.7、發送消息到交換器

發送消息到交換器,非常簡單,只需要通過如下方式即可!

  • 先編寫一個請求參數實體類
  1. @Data 
  2. public class ProduceInfo implements Serializable { 
  3.  
  4.     private static final long serialVersionUID = 1l; 
  5.  
  6.     /** 
  7.      * 交換器名稱 
  8.      */ 
  9.     private String exchangeName; 
  10.  
  11.     /** 
  12.      * 路由鍵key 
  13.      */ 
  14.     private String routingKey; 
  15.  
  16.     /** 
  17.      * 消息內容 
  18.      */ 
  19.     public String msg; 
  • 編寫接口api
  1. @RestController 
  2. @RequestMapping("/produce"
  3. public class ProduceController { 
  4.  
  5.     @Autowired 
  6.     private RabbitUtil rabbitUtil; 
  7.  
  8.     /** 
  9.      * 發送消息到交換器 
  10.      * @param produceInfo 
  11.      */ 
  12.     @PostMapping("sendMessage"
  13.     public void sendMessage(@RequestBody ProduceInfo produceInfo) { 
  14.         rabbitUtil.convertAndSend(produceInfo.getExchangeName(), produceInfo.getRoutingKey(), produceInfo); 
  15.     } 
  16.  

當然,你也可以直接使用rabbitTemplate操作類,來實現發送消息。

  1. rabbitTemplate.convertAndSend(exchange, routingKey, message); 

參數內容解釋:

  • exchange:表示交換器名稱
  • routingKey:表示路由鍵key
  • message:表示消息

2.8、交換器、隊列維護操作

如果想通過接口對 rabbitMQ 中的交換器、隊列以及綁定關系進行維護,通過如下方式接口操作,即可實現!

先編寫一個請求參數實體類

  1. @Data 
  2. public class QueueConfig implements Serializable
  3.  
  4.     private static final long serialVersionUID = 1l; 
  5.  
  6.     /** 
  7.      * 交換器類型 
  8.      */ 
  9.     private String exchangeType; 
  10.  
  11.     /** 
  12.      * 交換器名稱 
  13.      */ 
  14.     private String exchangeName; 
  15.  
  16.     /** 
  17.      * 隊列名稱 
  18.      */ 
  19.     private String queueName; 
  20.  
  21.     /** 
  22.      * 路由鍵key 
  23.      */ 
  24.     private String routingKey; 

編寫接口api

  1. /** 
  2.  * rabbitMQ管理操作控制層 
  3.  */ 
  4. @RestController 
  5. @RequestMapping("/config"
  6. public class RabbitController { 
  7.  
  8.  
  9.     @Autowired 
  10.     private RabbitUtil rabbitUtil; 
  11.  
  12.     /** 
  13.      * 創建交換器 
  14.      * @param config 
  15.      */ 
  16.     @PostMapping("addExchange"
  17.     public void addExchange(@RequestBody QueueConfig config) { 
  18.         rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName()); 
  19.     } 
  20.  
  21.     /** 
  22.      * 刪除交換器 
  23.      * @param config 
  24.      */ 
  25.     @PostMapping("deleteExchange"
  26.     public void deleteExchange(@RequestBody QueueConfig config) { 
  27.         rabbitUtil.deleteExchange(config.getExchangeName()); 
  28.     } 
  29.  
  30.     /** 
  31.      * 添加隊列 
  32.      * @param config 
  33.      */ 
  34.     @PostMapping("addQueue"
  35.     public void addQueue(@RequestBody QueueConfig config) { 
  36.         rabbitUtil.addQueue(config.getQueueName()); 
  37.     } 
  38.  
  39.     /** 
  40.      * 刪除隊列 
  41.      * @param config 
  42.      */ 
  43.     @PostMapping("deleteQueue"
  44.     public void deleteQueue(@RequestBody QueueConfig config) { 
  45.         rabbitUtil.deleteQueue(config.getQueueName()); 
  46.     } 
  47.  
  48.     /** 
  49.      * 清空隊列數據 
  50.      * @param config 
  51.      */ 
  52.     @PostMapping("purgeQueue"
  53.     public void purgeQueue(@RequestBody QueueConfig config) { 
  54.         rabbitUtil.purgeQueue(config.getQueueName()); 
  55.     } 
  56.  
  57.     /** 
  58.      * 添加綁定 
  59.      * @param config 
  60.      */ 
  61.     @PostMapping("addBinding"
  62.     public void addBinding(@RequestBody QueueConfig config) { 
  63.         rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), falsenull); 
  64.     } 
  65.  
  66.     /** 
  67.      * 解除綁定 
  68.      * @param config 
  69.      */ 
  70.     @PostMapping("removeBinding"
  71.     public void removeBinding(@RequestBody QueueConfig config) { 
  72.         rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), falsenull); 
  73.     } 
  74.  
  75.     /** 
  76.      * 創建頭部類型的交換器 
  77.      * 判斷條件是所有的鍵值對都匹配成功才發送到隊列 
  78.      * @param config 
  79.      */ 
  80.     @PostMapping("andExchangeBindingQueueOfHeaderAll"
  81.     public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) { 
  82.         HashMap<String, Object> header = new HashMap<>(); 
  83.         header.put("queue""queue"); 
  84.         header.put("bindType""whereAll"); 
  85.         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), nulltrue, header); 
  86.     } 
  87.  
  88.     /** 
  89.      * 創建頭部類型的交換器 
  90.      * 判斷條件是只要有一個鍵值對匹配成功就發送到隊列 
  91.      * @param config 
  92.      */ 
  93.     @PostMapping("andExchangeBindingQueueOfHeaderAny"
  94.     public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) { 
  95.         HashMap<String, Object> header = new HashMap<>(); 
  96.         header.put("queue""queue"); 
  97.         header.put("bindType""whereAny"); 
  98.         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), nullfalse, header); 
  99.     } 

至此,rabbitMQ 管理器基本的 crud 全部開發完成!

三、利用 MQ 實現事務補償

當然,我們花了這么大的力氣,絕不僅僅是為了將 rabbitMQ 通過 web 項目將其管理起來,最重要的是能投入業務使用中去!

上面的操作只是告訴我們怎么使用 rabbitMQ!

  • 當你仔細回想整個過程的時候,其實還是回到最初那個問題,什么時候使用 MQ ?

以常見的訂單系統為例,用戶點擊【下單】按鈕之后的業務邏輯可能包括:支付訂單、扣減庫存、生成相應單據、發紅包、發短信通知等等。

在業務發展初期這些邏輯可能放在一起同步執行,隨著業務的發展訂單量增長,需要提升系統服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執行,比如發放紅包、發短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取 MQ 的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。

這種是利用 MQ 實現業務解耦,其它的場景包括最終一致性、廣播、錯峰流控等等。

利用 MQ 實現業務解耦的過程其實也很簡單。

  • 當主流程結束之后,將消息推送到發紅包、發短信交換器中即可
  1. @Service 
  2. public class OrderService { 
  3.  
  4.     @Autowired 
  5.     private RabbitUtil rabbitUtil; 
  6.  
  7.     /** 
  8.      * 創建訂單 
  9.      * @param order 
  10.      */ 
  11.     @Transactional 
  12.     public void createOrder(Order order){ 
  13.         //1、創建訂單 
  14.         //2、調用庫存接口,減庫存 
  15.         //3、向客戶發放紅包 
  16.         rabbitUtil.convertAndSend("exchange.send.bonus"nullorder); 
  17.         //4、發短信通知 
  18.         rabbitUtil.convertAndSend("exchange.sms.message"nullorder); 
  19.     } 
  20.  
  • 監聽發紅包操作
  1. /** 
  2.  * 監聽發紅包 
  3.  * @param message 
  4.  * @param channel 
  5.  * @throws IOException 
  6.  */ 
  7. @RabbitListener(queues = "exchange.send.bonus"
  8. public void consume(Message message, Channel channel) throws IOException { 
  9.     String msgJson = new String(message.getBody(),"UTF-8"); 
  10.     log.info("收到消息: {}", message.toString()); 
  11.  
  12.     //調用發紅包接口 

監聽發短信操作

  1. /** 
  2.  * 監聽發短信 
  3.  * @param message 
  4.  * @param channel 
  5.  * @throws IOException 
  6.  */ 
  7. @RabbitListener(queues = "exchange.sms.message"
  8. public void consume(Message message, Channel channel) throws IOException { 
  9.     String msgJson = new String(message.getBody(),"UTF-8"); 
  10.     log.info("收到消息: {}", message.toString()); 
  11.  
  12.     //調用發短信接口 

既然 MQ 這么好用,那是不是完全可以將以前的業務也按照整個模型進行拆分呢?

答案顯然不是!

當引入 MQ 之后業務的確是解耦了,但是當 MQ 一旦掛了,所有的服務基本都掛了,是不是很可怕!

但是沒關系,俗話說,兵來將擋、水來土掩,這句話同樣適用于 IT 開發者,有坑填坑!

在下篇文章中,我們會詳細介紹 rabbitMQ 的集群搭建和部署,保證消息幾乎 100% 的投遞和消費。

四、總結

本篇主要圍繞SpringBoot整合rabbitMQ做內容介紹,可能也有理解不到位的地方,歡迎網友批評指出!

 

責任編輯:武曉燕 來源: Java極客技術
相關推薦

2023-04-26 12:46:43

DockerSpringKubernetes

2009-11-09 14:57:37

WCF上傳文件

2011-01-06 10:39:25

.NET程序打包

2021-07-14 09:00:00

JavaFX開發應用

2011-05-03 15:59:00

黑盒打印機

2011-01-10 14:41:26

2025-05-07 00:31:30

2011-04-21 10:32:44

MySQL雙機同步

2021-03-12 10:01:24

JavaScript 前端表單驗證

2020-05-15 08:07:33

JWT登錄單點

2025-08-26 01:32:00

2022-08-04 10:39:23

Jenkins集成CD

2022-12-07 08:42:35

2022-03-14 14:47:21

HarmonyOS操作系統鴻蒙

2022-07-27 08:16:22

搜索引擎Lucene

2022-01-08 20:04:20

攔截系統調用

2011-02-22 13:46:27

微軟SQL.NET

2021-12-28 08:38:26

Linux 中斷喚醒系統Linux 系統

2021-02-26 11:54:38

MyBatis 插件接口

2011-02-22 14:36:40

ASP.NETmsdnC#
點贊
收藏

51CTO技術棧公眾號

精品亚洲一区二区| 亚洲永久精品国产| 国产精品欧美激情| 老熟妇高潮一区二区三区| 成人污污视频| 亚洲成av人在线观看| 日韩av电影免费在线观看| 国产又黄又猛又爽| 国产亚洲高清视频| 日韩视频精品在线| 国产a级黄色片| jizz欧美| 午夜国产精品一区| 一区二区视频在线观看| 人妻中文字幕一区| 精品一区二区三区蜜桃| 欧美性做爰毛片| 无码黑人精品一区二区| 亚洲动漫在线观看| 日韩欧美一二区| 国产三级三级三级看三级| 亚洲七七久久综合桃花剧情介绍| 91视频在线看| 3d动漫啪啪精品一区二区免费| 无码人妻精品一区二区三区蜜桃91| 雨宫琴音一区二区三区| 亚洲免费精彩视频| 女性生殖扒开酷刑vk| 日韩一区二区三区免费视频| 午夜精品久久久久久久| 中文字幕一区二区三区最新| 黄色在线小视频| www.亚洲人| 成人国产一区二区| 国产又大又粗又硬| 久久狠狠亚洲综合| 国产精品电影网站| 久久久蜜桃一区二区| 日韩午夜在线| 午夜精品久久久久久久99热| 欧美日韩精品亚洲精品| 国产精品福利在线观看播放| 国产午夜一区二区| 蜜桃av免费看| 亚洲自拍都市欧美小说| 亚洲黄色成人网| 中文字幕一区二区三区乱码不卡| 国产一区一区| 日韩午夜激情视频| 欧美性猛交xxxx乱大交91| 黄页免费欧美| 欧美日韩国产色站一区二区三区| 亚洲无吗一区二区三区| 日韩免费小视频| 在线这里只有精品| 久久久精品麻豆| 成人午夜sm精品久久久久久久| 日本韩国欧美在线| 免费一级特黄录像| 国产精品天堂蜜av在线播放| 欧美日韩国产综合草草| 免费一区二区三区在线观看| 日韩一级特黄| 6080国产精品一区二区| 欧美日韩理论片| 亚洲经典视频| 亚洲精品suv精品一区二区| 超碰男人的天堂| 九九久久电影| 色妞欧美日韩在线| 欧美日韩黄色网| 禁久久精品乱码| 51视频国产精品一区二区| 亚洲欧美自拍视频| 欧美aaaaa成人免费观看视频| 国产精品影片在线观看| 国产精品伦理一区| 成人听书哪个软件好| 九九99玖玖| 成人三级黄色免费网站| 中文字幕制服丝袜一区二区三区| 国产在线拍揄自揄拍无码| 羞羞污视频在线观看| 午夜精品久久久久久久久久久| 日本三区在线观看| a一区二区三区亚洲| 精品国产一区二区三区忘忧草| 亚洲男人在线天堂| 成人羞羞视频播放网站| 米奇精品一区二区三区在线观看| 在线免费观看毛片| 免费在线观看不卡| 国产成人亚洲欧美| 国产黄在线看| 亚洲一线二线三线久久久| 亚洲色欲综合一区二区三区| 欧美黄页在线免费观看| 精品99一区二区三区| av女人的天堂| 亚洲一级二级| 国产精品视频导航| 免费激情视频网站| 国产精品嫩草影院av蜜臀| 免费高清一区二区三区| 日本综合视频| 精品日韩成人av| 亚洲一二三精品| 亚洲二区精品| 国产日产欧美a一级在线| 天天av天天翘| 亚洲三级免费观看| 精品视频无码一区二区三区| 一区二区日韩| 日日狠狠久久偷偷四色综合免费| 日本熟妇色xxxxx日本免费看| 免费成人在线观看视频| 精品一卡二卡三卡四卡日本乱码| 日韩在线免费电影| 色综合久久久久网| 日韩女优在线视频| 五月激情综合| 国产精品aaa| 亚州av在线播放| 亚洲综合免费观看高清完整版| 日本激情综合网| 在线日本制服中文欧美| 久久久久久久一区二区| 97人妻人人澡人人爽人人精品| 久久色在线视频| 亚洲 自拍 另类小说综合图区| 国产一区二区三区免费在线 | 高清国产mv在线观看| 亚洲国产激情av| 欧美成人黑人猛交| 神马久久影院| 91黑丝高跟在线| 狠狠综合久久av一区二区| 亚洲婷婷在线视频| 亚欧激情乱码久久久久久久久| 免费av一区| 日本精品视频在线观看| 偷拍自拍在线| 精品久久久久久久久中文字幕| 精品人妻二区中文字幕| 欧美成人首页| 99在线热播| 牛牛精品在线视频| 欧美mv日韩mv| 国产91av视频| 99精品一区二区三区| 成人性生活视频免费看| 国产精品午夜av| 91爱视频在线| 欧美高清电影在线| 色88888久久久久久影院野外| 久久国产精品影院| 久久久天天操| 亚洲高清123| 91麻豆精品| 久久国产天堂福利天堂| 亚洲精品综合久久| 香蕉乱码成人久久天堂爱免费| 偷偷色噜狠狠狠狠的777米奇| 99国产精品私拍| 免费久久99精品国产自| 久久野战av| 日韩在线一区二区三区免费视频| 波多野结衣一区二区三区四区| 国产日韩av一区| а 天堂 在线| 伊人成人网在线看| 女女同性女同一区二区三区91| 奇米777日韩| 日韩中文在线观看| 亚洲精品97久久中文字幕无码| 亚洲成人自拍网| 久久久久久久久久久国产精品| 日韩影院在线观看| av磁力番号网| 日本国产精品| 国产精品久久久久久久av大片 | 亚洲精品97久久中文字幕| 午夜精品福利久久久| 成人在线一级片| 国产自产高清不卡| 成人在线免费观看av| 成人在线免费观看视频| av噜噜色噜噜久久| 九九热线视频只有这里最精品| 按摩亚洲人久久| 熟妇人妻av无码一区二区三区| 在线免费一区三区| 欧美日韩免费做爰视频| 久久久影院官网| 中文字幕国产高清| 国产精品永久| 蜜臀av.com| 国产精品嫩模av在线| 亚洲iv一区二区三区| 手机看片久久| 欧美黄色性视频| 97超碰人人在线| 亚洲国产精品电影| 国产农村老头老太视频| 色综合久久久久| 久草视频在线资源站| 国产精品污www在线观看| 免费在线观看日韩av| 日本视频中文字幕一区二区三区| www.国产亚洲| 国模吧精品视频| 国产视频不卡| 粉嫩av国产一区二区三区| 青青精品视频播放| 国产99re66在线视频| 中文字幕日韩av| 日韩午夜影院| 亚洲成av人乱码色午夜| 国产精品欧美综合亚洲| 色av成人天堂桃色av| 日韩高清免费av| 亚洲欧美日韩成人高清在线一区| 国产精品密蕾丝袜| 91免费观看在线| 丰满熟女人妻一区二区三区| 激情综合色综合久久综合| 狠狠热免费视频| 久久aⅴ乱码一区二区三区| av日韩在线看| 欧美国产激情| 国产手机视频在线观看| 97色伦图片97综合影院| 午夜精品一区二区三区在线观看| 一区二区三区日本久久久| 极品尤物一区二区三区| 国产主播性色av福利精品一区| 亚洲字幕一区二区| 成人97精品毛片免费看| 国产日韩在线一区| 欧美韩国日本| 91精品国产综合久久香蕉最新版| a∨色狠狠一区二区三区| 国产99视频在线观看| 成人av三级| 欧美一级黑人aaaaaaa做受| 激情黄产视频在线免费观看| 性亚洲最疯狂xxxx高清| 免费毛片b在线观看| 国产69精品久久久久久| 性欧美18xxxhd| 欧美影院在线播放| 自由日本语热亚洲人| 日本老师69xxx| 欧美影视资讯| 国产精品一区二区久久国产| 黄页免费欧美| 91在线免费观看网站| 国产视频一区二| caoporen国产精品| 国产精品qvod| 欧美极品一区二区| 欧美理论视频| 视频一区二区视频| 欧美亚韩一区| 成人免费在线小视频| 日韩中文字幕91| 在线视频日韩欧美| 风间由美性色一区二区三区 | 国偷自产av一区二区三区| 国产亚洲情侣一区二区无| 一区二区美女| 特级毛片在线免费观看| 黑人一区二区| 黄色片视频在线播放| 蜜臀av一区二区| 免费黄色在线播放| 久久综合九色综合欧美亚洲| 免费在线观看a视频| 亚洲免费伊人电影| 国产精品美女久久久久av爽| 在线视频欧美精品| 国产成人久久精品77777综合 | 亚洲精品韩国| 91激情视频在线| 国产传媒欧美日韩成人| 波多野结衣 在线| 亚洲人吸女人奶水| 免费看日韩毛片| 欧美日韩精品一区二区三区| www.日韩高清| 亚洲人成电影网站色| 午夜羞羞小视频在线观看| 热久久免费视频精品| www.久久草.com| 欧美性天天影院| 伊人久久大香线蕉精品组织观看| 欧美激情1区2区| 国产精品日日摸夜夜添夜夜av| 欧美大片网站| 国产一区免费在线| 色一区二区三区四区| 久久久久久www| 久久精品av麻豆的观看方式| 91丝袜在线观看| 自拍偷拍国产精品| 天堂中文字幕在线观看| 日韩视频在线观看一区二区| 国产一二三区在线| 午夜精品在线观看| 成人免费91| 日产精品久久久一区二区| 亚洲激情婷婷| 女王人厕视频2ⅴk| 中文字幕不卡三区| 亚洲男人第一av| 日韩精品中午字幕| 日本中文字幕视频在线| 国产99在线|中文| xxxxxhd亚洲人hd| 97超碰免费观看| 日韩av高清在线观看| 菠萝菠萝蜜网站| 亚洲在线视频免费观看| 97成人在线观看| 中文字幕日韩高清| 蜜桃视频成人m3u8| 久久艹中文字幕| 日韩视频免费| 日本一级大毛片a一| 亚洲人成亚洲人成在线观看图片 | 欧美三级黄网| 欧美一区二三区| 青青视频一区二区| 国产在线播放观看| av在线不卡电影| 日韩成人高清视频| 日韩av一区二区在线| 啦啦啦中文在线观看日本| 亚洲综合在线中文字幕| 久久久久国产| 91精品国产三级| 亚洲男女毛片无遮挡| 国产又粗又长又黄| 美女视频久久黄| 日韩欧美中文字幕一区二区三区 | 中文亚洲字幕| 最近中文字幕无免费| 欧美色图在线视频| 天天干天天爱天天操| 97精品国产97久久久久久| 卡通动漫精品一区二区三区| 九色自拍视频在线观看| 99热这里都是精品| 日日噜噜噜噜人人爽亚洲精品| 亚洲另类图片色| 欧洲一级精品| 日韩欧美亚洲日产国| 免费人成网站在线观看欧美高清| 美国黄色特级片| 91精品免费观看| 九色91在线| 久久久水蜜桃| 青青草91视频| 成熟的女同志hd| 亚洲第一免费网站| 一区一区三区| 一区二区三区我不卡| 国产精品一二三在| 日韩视频免费观看高清| 亚洲欧美一区二区三区情侣bbw| 日产精品一区| 在线免费观看成人| 国产精品一级二级三级| 久久国产精品免费看| 伊是香蕉大人久久| 9999精品免费视频| 欧美在线一区视频| 国产欧美日本一区二区三区| 国产免费的av| 国语自产偷拍精品视频偷| 国产麻豆精品久久| 国产资源中文字幕| 精品欧美aⅴ在线网站| 91在线不卡| 国产欧美亚洲日本| 蜜桃av一区二区三区电影| 久草视频手机在线观看| 亚洲欧洲视频在线| 亚洲va欧美va人人爽成人影院| 中国丰满人妻videoshd| 国产精品免费视频一区| 秋霞av鲁丝片一区二区| 国产999精品久久久| 午夜久久美女| 亚洲第一页av| 日韩一级免费观看| 3d欧美精品动漫xxxx无尽| 少妇久久久久久被弄到高潮| 国产亚洲欧美日韩俺去了| 成人黄色免费视频|