Federation
# 210.Federation
Federation 是联邦的意思,接下来我们讲讲 Federation Exchange 和 Federation Queue。
# 为什么要用 Federation Exchange
假设我们目前有 2 个 broker,一个在北京,一个在深圳(异地灾备),彼此之间相距甚远,网络延迟是一个不得不面对的问题。两个节点之间需要同步数据,将数据同步给另一个节点的我们称为上游,另一个则称为下游。
有一个在北京的业务(Client 北京) 需要连接(broker 北京),向其中的交换器 exchangeA 发送消息,此时的网络延迟很小,(Client 北京)可以迅速将消息发送至 exchangeA 中,就算在开启了 publisherconfirm 机制或者事务机制的情况下,也可以迅速收到确认信息。
此时又有个在深圳的业务(Client 深圳)需要向 exchangeA 发送消息, 那么(Client 深圳) (broker 北京)之间有很大的网络延迟,(Client 深圳) 将发送消息至 exchangeA 会经历一定的延迟,尤其是在开启了 publisherconfirm 机制或者事务机制的情况下,(Client 深圳) 会等待很长的延迟时间来接收(broker 北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。
将业务(Client 深圳)部署到北京的机房可以解决这个问题,但是如果(Client 深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房。
使用 Federation 插件就可以很好地解决这个问题。
# 启用插件
首先需要保证每台节点单独运行。在每台机器上开启 federation 相关插件
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
2
然后我们登录后台,可以看到多了两个选项:
如果登录后没有看到,则可以试下重启 RabbitMQ。实在不行就重启 Linux,然后重新配置集群
# 原理
我们假设 node1 是上游,node2 是下游,上游的数据会同步给下游
既然是联邦交换机,那么就是上游的交换机,同步数据给下游的交换机,同步的过程就交给我们刚刚配置的联邦插件。配置过程:
- 在 node2 创建联邦交换机,队列和绑定
- 在 node2 上配置 node1 的地址(upstream)
- 添加 policy
# 创建交换机和队列
新写一个类,创建交换机、队列和绑定关系:
package com.peterjxl.rabbitmq.demo12Federation;
import com.rabbitmq.client.*;
public class Consumer {
public static final String QUEUE_NAME = "mirror_hello";
public static final String EXCHANGE_NAME = "fed_exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.103");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("node2_queue", true, false, false, null);
channel.queueBind("node2_queue", EXCHANGE_NAME, "routeKey");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
运行,此时后台可以看到有创建成功
# 配置 upstream
然后我们在 node2 后台配置:
(https://image.peterjxl.com/blog/image-20230603215117-4e51vyu.png)
然后配置上游:
# 添加 policy
然后配置规则:
然后点击 Add / update policy
然后我们可以查看下联邦的状态,此时可以看到是正常的:
# Federation Queue
之前我们是按交换机来同步数据的,其实也可以通过队列来同步。联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。
一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求,原理图:
我们将 node1 结点的数据同步给 node2,因此先将 node2 节点的队列联邦到 node1 上,这样 2 个消费者可以消费相同的数据(消费者 1 可能在北京,而消费者 2 可能在深圳)。
配置步骤和我们之前配置联邦交换机类似:
- 创建交换机、队列
- 配置 upstream(之前我们已经配过了)
- 添加 policy