备份交换机
# 160.备份交换机
为了防止宕机,我们可以使用发布确认模式,来确保消息不丢失;除此之外还可以加个备份交换机
# 概述
有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。
但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。
如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理。
通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
一句话,正常交换机无法处理的,就由备份交换机来处理。备胎交换机可以再发监控和告警等来处理
使用备机后的架构图:
# 声明交换机和队列
接下来我们来实现下。由于确认交换机和确认队列,我们之前都写过了,我们只需增加备份交换机和 2 个队列即可。
在 ConfirmConfig
中添加如下代码:
//备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
//备份队列
public static final String BACKUP_QUEUE_NAME = "backup.queue";
//报警队列
public static final String WARNING_QUEUE_NAME = "warning.queue";
2
3
4
5
6
7
8
然后是声明队列:
// 备份交换机
@Bean("backupExchange")
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
// 备份队列
@Bean("backupQueue")
public Queue backupQueue() {
return new Queue(BACKUP_QUEUE_NAME);
}
// 报警队列
@Bean("warningQueue")
public Queue warningQueue() {
return new Queue(WARNING_QUEUE_NAME);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
然后是绑定:
// 绑定
@Bean
public Binding bindingBackupQueue(
@Qualifier("backupQueue") Queue backupQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
// 绑定
@Bean
public Binding bindingWarningQueue(
@Qualifier("warningQueue") Queue warningQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 设置备份关系
接下来我们设置确认交换机和备份交换机。我们在创建确认交换机的时候,同时加个参数,指明备份交换机的名字即可:
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
// 是否持久化
.durable(true)
// 是否自动删除
.autoDelete()
// 备份交换机
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
.build();
}
2
3
4
5
6
7
8
9
10
11
# 消费者
我们写一个 warning 消费者即可(因为是扇出类型的,有一个收到,另一个也能收到)
package com.peterjxl.learnrabbitmq.springbootrabbitmq.comsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class WarningComsumer {
@RabbitListener(queues = "warning.queue")
public void receiveWarning(Message msg) {
log.error("发现不可路由消息:{}", new String(msg.getBody()));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 测试
在重启测试之前,我们需要先删掉确认交换机,因为我们修改了确认交换机的配置。
测试结果:能看到备份交换机有工作,并转发到了警告队列
交换机已经收到Id为: 1 的消息
发送时间: Wed May 31 21:38:08 CST 2023, 发送内容: 大家好1, routing: key2
接收时间: Wed May 31 21:38:08 CST 2023, 接收内容: 大家好1key1
发现不可路由消息:大家好1key12
交换机已经收到Id为: 2 的消息
2
3
4
5
# 注意事项
mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。
# 源码
已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo4,读者可以通过切换分支来查看本文的示例代码