
環境:SpringBoot2.7.9
消息丟失場景
- 生產者丟失消息
生產者發出的數據由于網絡原因沒有到底MQ Server丟失 - MQ Server丟消息
由于消息隊列沒有持久化或者是消息沒有持久化,在Server重啟后消息丟失 - 消費者丟消息
接收到消息后,業務還沒有處理完成,服務宕機(當你是自動ACK)。
生產者丟失解決方案
- 通過事務(不推薦)
- 確認機制(推薦)
這里只講如何通過確認機制保證生產者不丟失消息
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("akf.exchange", true, false) ;
}
@Bean
public Queue queue() {
return new Queue("akf.queue", true, false, false) ;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(topicExchange()).with("akf.#") ;
}
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
template:
mandatory: true
注意:spring.rabbitmq.publisher-confirm-type及spring.rabbitmq.publisher-returns 的配置值。
接下來是為RabbitTemplate配置對應的Callback,Publisher確認回調,Publisher返回回調。
- 確認回調
當消息發送到了交換機則ack=true,當消息無法發送到交換機則ack=false。 - 返回回調
當消息能夠發送到交換機,但是不能路由到隊列則會調用該return回調。
RabbitTemplate是單例的可以通過兩種方式配置對應的回調。
- 自定義RabbitTemplate。
- 通過AWare接口獲取RabbitTemplate配置。
這里只講通過AWare接口配置回調。
@Component
public class ConfigRabbitTemplate implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class) ;
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlation: " + correlationData) ;
if (ack) {
System.out.println("消息發送到交換機") ;
} else {
System.out.println("消息發送失敗 - " + ", cause" + cause) ;
}
}
});
rabbitTemplate.setReturnsCallback(new ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(returned.getExchange() + ", " + returned.getRoutingKey() + ", " + returned.getReplyCode() + ", " + returned.getMessage().toString()) ;
}
});
}
}
使用錯誤的交換機和錯誤的路由key分別測試即可以看到上面的輸出信息了。
MQ Server丟消息
在通過@Bean聲明交換機和隊列時設置持久性,在消息上設置持久化。
@Bean
public TopicExchange topicExchange() {
// 這里的第二個參數就是設置是否持久化,如果設置為false,當服務重啟交換機將丟失
// 第三個參數是否自動刪除,當不再使用該交換機時會自動刪除該交換機
return new TopicExchange("akf.exchange", true, false) ;
}
@Bean
public Queue queue() {
// 第二個參數true設置隊列是持久化的,當服務重啟隊列不會丟失
return new Queue("akf.queue", true, false, false) ;
}
設置消息持久化。
Message message = MessageBuilder.withBody("Hello".getBytes())
// 設置消息投遞模式為持久化的(默認不設置就是持久化的)
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build() ;消費者丟消息
關閉自動應答機制。
默認是自動應答,當消息監聽方法中沒有異常時則正常應答,當發生異常時,在默認情況下會重新入隊列(這樣就會出現死循環)。
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
acknowledgeMode: manual #設置為手動應答
消息監聽。
@RabbitListener(queues = {"akf.queue"})
public void onMessage(Message message, Channel channel) throws Exception {
try {
System.out.println("接收到消息: " + new String(message.getBody()));
// ... 這里處理我們的業務代碼
// 當消費者把消息消費成功,再手動應答RabbitMQ
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 如果發生了異常,我們一般的處理是直接扔掉死信隊列,一般這里出現錯誤都是消息有問題
// 如果消息出現問題,你重試再入隊列是無意義的
}
}消息重試
如果消息消費時出現錯誤,你又希望能夠通過重試來盡可能的處理掉該消息,Spring也提供了相應的重試機制。
修改配置:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
acknowledgeMode: auto
concurrency: 1
retry:
# 開啟重試
enabled: true
# 延遲1s后開始重試
initialInterval: 1000
# 每次消息重試的間隔乘數
multiplier: 3
# 2次間的重試最大間隔時間
maxInterval: 20000
maxAttempts: 4 #重試4次,1s, 3s, 9s
stateless: true #如果消息處理中存在事務則需要將其設置為false
如果只是做上面的配置,重試指定次數后消息將會被丟棄,這是默認行為。Spring提供了 MessageRecoverer接口來決定消息如何處理。默認Spring提供如下幾種實現:
- ImmediateRequeueMessageRecoverer
- RejectAndDontRequeueRecoverer
- RepublishMessageRecoverer
我們只需要定義一個Bean為MessageRecoverer即可,這里我們就用Spring提供的RepublishMessageRecoverer重新發布消息。
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error") ;
}
這里將消息重新發布一個專門的隊列(重試指定次數后)。