其他知识点
# 170.其他知识点
讲讲 RabbitMQ 的其他的小知识
# 幂等性
# 概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
# 消息重复消费
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
# 解决思路
MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。
# 消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:
- 唯一 ID+指纹码机制, 利用数据库主键去重
- 利用 redis 的原子性去实现
# 唯一 ID+指纹码机制
指纹码: 我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中
优势就是实现简单就一个拼接,然后查询判断是否重复;
劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
# Redis 原子性
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费,建议用这个
# 优先级队列
# 使用场景
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能;
但是,商家是分大客户和小客户的,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以我们可以优先处理他们的订单,而曾经我们的后端系统是使用 Redis 来存放的定时轮询,大家都知道 Redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景
所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。
原理:每个消息带有一个优先级(一个数字),优先级越大,越先被处理。一般来说,数字范围是 0~255
# 如何添加
有好几种方式声明优先级队列:
a.控制台页面添加
我们可以点击 Maximum priority,就回自动帮我们填写参数 x-max-priority,然后我们可以设置优先级最大的数字为 10
一般来说,优先级不会设置的很大,因为是要排序的,太大会影响性能,可以设置为 10。
队列中代码添加优先级:
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
2
3
然后我们就可以在消息中声明优先级了:
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
注意事项
要让队列实现优先级需要做的:队列需要设置为优先级队列,消息需要设置消息的优先级。
消费者需要等待消息已经发送到队列后才能去消费,因为这样才有机会对消息进行排序
# 测试
为了方便,我们就继续使用 SpringBoot 工程,而是我们第一个案例的工程来实现。
新建一个生产者类:
package com.peterjxl.rabbitmq.demo10Priority;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
/**
* 生产者:发送消息
*/
public class Producer {
// 队列名称
public static final String QUEUE_NAME = "priority_queue";
// 发送消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-priority", 10); // 设置队列的最大优先级为10, 数字越大优先级越高, 不要设置太大, 会影响性能
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
for (int i = 1; i < 11; i++) {
String message = "Hello World" + i;
if (i == 5) { //设置第五条消息的优先级为5
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
}else {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
System.out.println("发送消息完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
然后我们运行该 main 方法,可以看到有一个新的队列了,并且有个 pri 的字眼,表明是优先级队列
新建消费者:
package com.peterjxl.rabbitmq.demo10Priority;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.*;
/**
* 消费者:接收消息
*/
public class Consumer {
// 队列名称
public static final String QUEUE_NAME = "priority_queue";
// 接收消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
// 消费者接收消息
// 消息接收失败后执行的方法
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
// 消息接收成功后执行的方法
@Override
public void handle(String consumerTag, Delivery message) {
System.out.println("接收到消息:" + new String(message.getBody()));
}
}, consumerTag -> System.out.println("接收消息失败"));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
运行该消费者,结果可以看到优先处理 5 的消息:
接收到消息:Hello World5
接收到消息:Hello World1
接收到消息:Hello World2
接收到消息:Hello World3
接收到消息:Hello World4
接收到消息:Hello World6
接收到消息:Hello World7
接收到消息:Hello World8
接收到消息:Hello World9
接收到消息:Hello World10
2
3
4
5
6
7
8
9
10
# 惰性队列
# 使用场景
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。
当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。
当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候
# 两种模式
队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。
如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。
示例:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
2
3
# 内存开销对比
官网 (opens new window) 提供的数据:
可以看到惰性队列占用的内存是很小的
# 源码
本项目已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo10,读者可以通过切换分支来查看本文的示例代码