延迟队列
# 130.延迟队列
之前讲安装 RabbitMQ 可视化插件的时候讲过
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
# 使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。
但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下
其实延迟队列是死信队列的一种,当消息一直没被处理,达到了 TTL 后,就会被放到死信队列中。
例如这是用户下单后,要在 30 分钟内完成付款的流程图:
# 整合 SpringBoot
为了方便演示,我们先整合下 SpringBoot。在 IDEA 中新建一个:
这里我们不使用 SpringBoot3
为了不让版本造成影响,我们改为使用低版本的 SpringBoot
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.11.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
2
3
4
5
6
# 添加依赖
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.67_noneautotype2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 修改配置文件
我们修改 src/main/resources/application.properties,添加如下内容:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root123
2
3
4
# 配置 Swagger
Swagger 是一个文档框架,这里我们只需会用就行,新建一个 config 包并新增代码:
package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig
{@Bean
public Docket webApiConfig(){
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo(){
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("peterjxl", "https:www.peterjxl.com","perterjxl@qq.com"))
.build();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
如果使用高版本的 SpringBoot 话,启动时可能会遇到报错:
Failed to start bean ‘documentationPluginsBootstrapper‘; nested exception is java.lang.NullPointerEx
# 需求
在继续演示之前,我们说下需求。
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y(用作延迟交换机),它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
也就是为 QA 和 QB 都绑定一个死信交换机 Y。
# 配置文件类
之前我们整合 SpringBoot 之前,都是在消费者中声明队列和交换机的;但有了 SpringBoot,我们就可以通过配置文件来声明队列和交换机。消费者和生产者各司其职,不用在消费者或生产者中声明了,使得代码更加简单明了
首先新建一个 Spring 的配置文件类:
package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TTLQueueConfig {
}
2
3
4
5
6
7
然后我们定义各个交换机和队列的名称:
//普通交换机
private static final String X_EXCHANGE = "X";
//死信交换机
private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列
private static final String QUEUE_A = "QA";
private static final String QUEUE_B = "QB";
//死信队列
private static final String DEAD_LETTER_QUEUE = "QD";
2
3
4
5
6
7
8
9
10
11
12
然后我们声明两个交换机:
// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
// 声明yExchange
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
2
3
4
5
6
7
8
9
10
11
声明 QA、QB 和死信队列 QD
// 声明普通队列A ttl为10s
@Bean("queueA")
public Queue queueA(){
Map<String, Object> args = new HashMap<>(3);
// 统一设置队列中的所有消息的过期时间,单位毫秒
args.put("x-message-ttl", 10000);
// 统一设置队列的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 统一设置队列的死信routingKey
args.put("x-dead-letter-routing-key", "YD");
return (Queue) QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明普通队列A ttl为40s
@Bean("queueB")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
// 统一设置队列中的所有消息的过期时间,单位毫秒
args.put("x-message-ttl", 40000);
// 统一设置队列的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 统一设置队列的死信routingKey
args.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
// 声明死信队列QD
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
然后是声明交换机和队列的绑定关系:
// 声明QA绑定关系
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// 声明QB绑定关系
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
// 声明QD绑定关系
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 生产者
我们使用发送 HTTP 请求的方式,来发送消息。当访问指定的链接,就发送指定的消息。
新建一个 Controller 类:
package com.peterjxl.learnrabbitmq.springbootrabbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
}
2
3
4
5
6
7
8
9
10
11
12
Slf4j 注解是日志的注解
然后我们发送消息:
@Autowired
private RabbitTemplate rabbitTemplate;
// 开始发消息
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{}, 发送一条信息给两个TTL队列:{}" , new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);
}
2
3
4
5
6
7
8
9
10
首先我们使用 RabbitMQ 提供的模板对象来发送,因此定义了一个 RabbitTemplate 对象。
然后 log.info,记录了日志;然后使用 convertAndSend 方法发送消息(第一个参数是交换机的名字,第二个是 routing key,第三个是消息)
# 消费者
package com.peterjxl.learnrabbitmq.springbootrabbitmq.comsumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列的消息:{}", new Date(), msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 测试
我们访问 localhost:8080/ttl/sendMsg/嘻嘻嘻
,此时页面是空白的(这很正常,我们没写返回值),然后我们可以看到控制台有输出
当前时间:Tue May 30 07:15:47 CST 2023, 发送一条信息给两个TTL队列:嘻嘻嘻
当前时间:Tue May 30 07:15:57 CST 2023,收到死信队列的消息:消息来自ttl为10s的队列:嘻嘻嘻
当前时间:Tue May 30 07:16:27 CST 2023,收到死信队列的消息:消息来自ttl为40s的队列:嘻嘻嘻
2
3
4
从时间来看,确实是发了消息后 10 秒,队列 QA 消费消息;而 QB 也是 40 秒后消费消息
# 优化
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
解决方法:由生产者发送消息的时候指定 TTL。
我们新增一个队列 QC,绑定关系如下,该队列不设置 TTL 时间:
代码:在 TTLQueueConfig
中添加如下代码
private static final String QUEUE_C = "QC";
// 声明普通队列C
@Bean("queueC")
public Queue queueC(){
Map<String, Object> args = new HashMap<>(3);
// 统一设置队列的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 统一设置队列的死信routingKey
args.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
// 声明QC绑定关系
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
在 SendMsgController
中添加:
// 开始发消息, 带过期时间
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
log.info("当前时间:{}, 发送一条时长{}毫秒TTL信息给队列QC:{}" , new Date(), ttlTime, message);
rabbitTemplate.convertAndSend("X", "XC", "消息来自ttl为" + ttlTime + "毫秒的队列:" + message, msg -> {
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
2
3
4
5
6
7
8
9
10
测试:我们重启,然后分别访问如下两个链接
http://localhost: 8080/ttl/sendExpirationMsg/你好 1/20000,这是 20 秒
http://localhost: 8080/ttl/sendExpirationMsg/你好 2/2000,这是 2 秒
结果:
当前时间:Tue May 30 07:54:49 CST 2023, 发送一条时长20000毫秒TTL信息给队列QC:你好1
当前时间:Tue May 30 07:54:54 CST 2023, 发送一条时长2000毫秒TTL信息给队列QC:你好2
当前时间:Tue May 30 07:55:09 CST 2023,收到死信队列的消息:消息来自ttl为20000毫秒的队列:你好1
当前时间:Tue May 30 07:55:09 CST 2023,收到死信队列的消息:消息来自ttl为2000毫秒的队列:你好2
2
3
4
可以看到消息 1,确实是 20 秒后才被消费;但为什么消息 2,明明应该是 2 秒的,也是 20 秒后被消费呢?
我们之前就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡”,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
# 源码
已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo1,读者可以通过切换分支来查看本文的示例代码