镜像队列
# 190.镜像队列
即使在主节点创建了一个队列,该队列也不会同步在其余节点中报错;一旦主节点挂了,那么消息就丢失了。为此可以用镜像队列
# 如果主节点宕机
如果只有主节点有队列,那么宕机后就会丢失信息。虽然可以将所有消息都设置为持久化,并且对应队列的 durable 属性也设置为 true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后,在被写入磁盘之前有一个短暂的间隔,这时候宕机也会丢失。
虽然通过 publisherconfirm 机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
我们目前创建一个队列:
package com.peterjxl.rabbitmq.demo11Mirror;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.101");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
运行,然后可以在后台看到:
然后我们在 node1 上关闭服务:
rabbitmqctl stop_app
此时我们后台是连不上了:
但我们可以连 node2,然后可以看到 node1 是宕机的:
并且队列也是宕机状态,有多少个消息(ready、unacked 和 total 都不显示了)。
此时我们获取消息也是不行的,即使连的是其他节点:
package com.peterjxl.rabbitmq.demo11Mirror;
import com.rabbitmq.client.*;
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.103");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到消息:" + message);
}, consumerTag -> {
System.out.println("接收消息被中断");
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
运行结果:队列 hello 已经被停止了(stopped),因为消息是在磁盘上
queue 'hello' in vhost '/' process is stopped by supervisor, class-id=60, method-id=20)
我们重启下 node1:
rabbitmqctl start_app
过了一阵子,就可以看到状态正常了:
但是消息已经丢失了
# 创建协议
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
当然,镜像队列也有缺点,那就是重复了,比如有 100w 个消息,那么其他节点也会存,有点浪费空间。
随便找一个节点添加 policy
然后我们开始填写协议的信息:
ha-params 是指 2 份(主机+备机,一共 2 份),ha-sync-mode 同步模式(自动)
# 创建队列
在 node1 上创建一个队列发送一条消息,集群会帮我们创建一个镜像队列(可能在 node2 或 node3)
我们创建一个新的队列
package com.peterjxl.rabbitmq.demo11Mirror;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static final String QUEUE_NAME = "mirrior_hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.101");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
运行,此时可以看到队列有个“+1”的字眼
点进去可以看到详情:镜像队列在 node2 上。
# 停止 node1
我们停止 node1:
[root@node1 ~]# rabbitmqctl stop_app
此时 node1 宕机:
然后此时 node3 也会有个镜像队列:
然后我们获取消息:
package com.peterjxl.rabbitmq.demo11Mirror;
import com.rabbitmq.client.*;
public class Consumer {
public static final String QUEUE_NAME = "mirrior_hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.103");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到消息:" + message);
}, consumerTag -> {
System.out.println("接收消息被中断");
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
运行结果:正常获取到消息
接收到消息:Hello World
# 源码
本项目已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo11,读者可以通过切换分支来查看本文的示例代码