搜索 K
Appearance
博客正在加载中...
Appearance
死信,顾名思义就是无法被消费的消息。
一般来说,producer 发送消息,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列
当放到死信队列后,后续还有机会能取出来消费
basic.reject 或 basic.nack) 并且 requeue=false. 下面我们来实践下。需求如下
我们先声明几个交换机和队列的名字,然后写一个消费消息的代码:
package com.peterjxl.rabbitmq.demo9;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
public class Consumer01 {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
channel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> {
System.out.println("Consumer01接收到消息:" + new String(message.getBody()));
}, consumerTag -> {});
}
} 然后我们声明交换机:
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); 声明队列:注意死信队列和我们之前的定义方式不一样,因为要正常队列中的信息,通过转换后才能转发给死信队列,需要设置一些参数。
// 要传入队列配置对象
Map<String, Object> arguments = new HashMap<>();
// 过期时间,单位毫秒,这里设置10秒。
arguments.put("x-message-ttl", 10000);
// 正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信routing-key
arguments.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null); 其实我们还可以设置队列的过期时间:
// 过期时间,单位毫秒,这里设置10秒。
arguments.put("x-message-ttl", 10000);不仅仅是队列可以设置过期时间 TTL,生产者也可以设置 TTL,例如可以每次都不同的 TTL;而队列不能修改 TTL,所以通常是生产者设置 TTL
绑定交换机和队列:
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); 此时我们发送消息的时候,就需要指定参数了。先设置下:
// 设置消息的TTL时间, 单位是ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); 发送消息的时候指定:
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());完整代码:我们发送十个信息:
package com.peterjxl.rabbitmq.demo9;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
public class ProducerDemo9 {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
// 设置消息的TTL时间, 单位是ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
// 发送死信消息,设置TTL
channel.basicPublish(NORMAL_EXCHANGE, "lisi", null, "这是一条消息".getBytes());
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
}
}
}我们先启动下消费者,创建好交换机和队列,然后停止;
停止消费者后,可以看到后台有这两个队列:

可以看到有 DLX 和 DLK:
DLX:x-dead-letter-exchange 说明该队列配置了死信交换机,
DLK:全称 x-dead-letter-routing-key,也就是有设置死信消息的 routing key 也可以看到有交换机:
正常队列和死信队列的绑定状态也正常:
然后启动生产者,由于没有消费者,因此过了 TTL 后,就会有死信。我们看到的现象应该是这样:
PS:由于我们 10 个消息都是很快发送完的,所以可能看不到正常队列逐个减少、死信队列逐个增多的情况
消费者 2 就是消费死信队列的消息即可,非常简单
package com.peterjxl.rabbitmq.demo9;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
public class Consumer02 {
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
channel.basicConsume(DEAD_QUEUE, true, (consumerTag, message) -> {
System.out.println("Consumer02接收到消息:" + new String(message.getBody()));
}, consumerTag -> {});
}
} 运行结果:
Consumer02接收到消息:info1
Consumer02接收到消息:info2
Consumer02接收到消息:info3
Consumer02接收到消息:info4
Consumer02接收到消息:info5
Consumer02接收到消息:info6
Consumer02接收到消息:info7
Consumer02接收到消息:info8
Consumer02接收到消息:info9
Consumer02接收到消息:info10消费者 1 的代码最复杂,定义了交换机、队列和绑定关系等,而生产者只需发送消息,消费者 2 只需消费死信队列的消息。
之前我们仅仅演示了 TTL 的情况导致死信,接下来我们演示其他两种情况,首先是队列达到最大长度
我们在 Consumer01 里添加如下配置:
// 设置正常队列的长度限制
arguments.put("x-max-length", 6); 注意:由于我们修改了队列的参数,因此得先删除,然后再重新运行。此时能看到有个 Lim 的字眼,这是 Limit 的缩写。

然后我们开始发送消息,这里我们就不设置 TTL 了,修改 ProducerDemo9 的发送消息代码,改为传入 null:
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); 然后我们先停止消费者,然后启动生产者,这样消息就会积压在正常队列中,然后剩下 4 个消息就会被转发到死信队列:

还剩下一个场景:消费者拒绝消息,此时信息也会变成死信
为了不让之前的消息影响到接下来的实验,我们先启动消费者 1 和 2,处理掉队列中的信息,处理完后队列是空的:
然后我们将限制队列长度的代码注释掉,然后我们在后台删掉这个队列。 假设我们要拒绝 info5 这个消息:
channel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> {
String msg = new String(message.getBody());
if(msg.equals("info5")){
System.out.println("Consumer01接收到消息:" + msg + ",此消息被拒绝");
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01接收到消息:" + msg);
}
}, consumerTag -> {}); 然后我们修改为开启手动应答(不批量):
channel.basicConsume(NORMAL_QUEUE, false, (consumerTag, message) -> {
String msg = new String(message.getBody());
if(msg.equals("info5")){
System.out.println("Consumer01接收到消息:" + msg + ",此消息被拒绝");
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01接收到消息:" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
}, consumerTag -> {}); 此时我们启动消费者 1,然后再启动生产者:能看到死信队列有一个消息

我们点进去 dead_quque,然后可以获取信息:确实是 info5 被拒绝后放到死信队列了

消费者 1 的输出:
Consumer01接收到消息:info1
Consumer01接收到消息:info2
Consumer01接收到消息:info3
Consumer01接收到消息:info4
Consumer01接收到消息:info5,此消息被拒绝
Consumer01接收到消息:info6
Consumer01接收到消息:info7
Consumer01接收到消息:info8
Consumer01接收到消息:info9
Consumer01接收到消息:info10启动消费者 2 的输出:
Consumer02接收到消息:info5
已将源码上传到 Gitee 或 GitHub 上。并且创建了分支 demo9,读者可以通过切换分支来查看本文的示例代码