Topics 交换机
# 110.Topics 交换机
直接交换机只能路由一个队列;扇出交换机只能广播所有的,如果我们想要不想要广播所有,但也不是只要一个队列,此时就能用主题交换机。
主题交换机是功能最强大的,使用也最广的
# 之前类型的问题
在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性:比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候就只能使用 topic 类型
# Topic 的要求
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的:
*
(星号) 可以代替一个单词#
(井号)可以替代零个或多个单词
# Topic 匹配案例
假设目前有这样一个交换机和队列:
绑定关系如下
Q1--> 绑定的是中间带 orange 带 3 个单词的字符串(*.orange.*
)
Q2--> 绑定的是最后一个单词是 rabbit 的 3 个单词(*.*.rabbit
),第一个单词是 lazy 的多个单词(lazy.#
)
我们来看看他们之间数据接收情况是怎么样的:
quick.orange.rabbit
:被队列 Q1,Q2 接收到
lazy.orange.elephant
:被队列 Q1,Q2 接收到
quick.orange.fox
:被队列 Q1 接收到
lazy.brown.fox
:被队列 Q2 接收到
lazy.pink.rabbit
:虽然满足两个绑定,但只被队列 Q2 接收一次
quick.brown.fox
:不匹配任何绑定。不会被任何队列接收到。会被丢弃
quick.orange.male.rabbit
:是四个单词。不匹配任何绑定。会被丢弃
lazy.orange.male.rabbit
:是四个单词但匹配 Q2
当队列绑定关系是下列这种情况时,需要注意:
- 当一个队列绑定键是
#
,那么这个队列将接收所有数据,就有点像 fanout 了 - 如果队列绑定键当中没有
#
和*
出现,那么该队列绑定类型就是 direct 了
下面我们来实践下
# 消费者 Q1
- 声明一个主题交换机
- 声明一个队列 Q1
- 绑定 routing key
- 消费消息,并打印消息所传递的键
package com.peterjxl.rabbitmq.demo8Topic;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueDeclare("Q1", false, false, false, null);
channel.queueBind("Q1", EXCHANGE_NAME, "*.orange.*");
channel.basicConsume("Q1", true, (consumerTag, message) -> {
System.out.println("接受队列:Q1 "
+ "绑定键: " + message.getEnvelope().getExchange()
+ " 收到消息: " + new String(message.getBody()));
}, consumerTag -> {}
);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 消费者 Q2
Q2 的代码和 Q1 很类似,除了队列名和 routing key 不同:
package com.peterjxl.rabbitmq.demo8Topic;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
public class ReceiveLogsTopic02 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueDeclare("Q2", false, false, false, null);
channel.queueBind("Q2", EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind("Q2", EXCHANGE_NAME, "lazy.#");
channel.basicConsume("Q2", true, (consumerTag, message) -> {
System.out.println("接受队列:Q2 "
+ "绑定键: " + message.getEnvelope().getExchange()
+ " 收到消息: " + new String(message.getBody()));
}, consumerTag -> {}
);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 生产者
写一个生产者,发送很多个信息:
# 测试
先启动两个消费者,再启动生产者,可以看到收到的消息,和我们之前讨论过的一样:
# 源码
已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo8,读者可以通过切换分支来查看本文的示例代码