HelloWorld 程序
# 50.HelloWorld 程序
安装好 RabbitMQ 后,下一步就是使用了
我们将用 Java 编写两个程序:
- 发送单个消息的生产者
- 接收消息并打印出来的消费者
并介绍 Java API 中的一些细节。
# 需求
我们使用生产者,发消息给 MQ
然后消费者从队列中取出消息
# 准备环境
我们新建一个 Maven 工程,并引入依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
1
2
3
4
5
2
3
4
5
引入日志框架:
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 生产者
我们先声明一个队列的名字,将来后期用该队列存储信息:
package com.peterjxl.rabbitmq.demo;
public class Producer {
public static final String QUEUE_NAME = "hello";
}
1
2
3
4
2
3
4
然后我们写一个 main 方法来发送信息。有点类似 Mybatis,我们不直接创建连接,而是用工厂模式:
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root123");
1
2
3
4
5
6
2
3
4
5
6
然后我们创建一个连接:
Connection connection = factory.newConnection();
1
之前我们说过,一个 Connection 是客户端和 MQ 的 TCP 连接,为了避免频繁创建,我们是使用信道的。这里我们使用 connection 对象创建信道:
Channel channel = connection.createChannel();
1
下一步应该就是配置交换机和队列了,但为了方便学习,我们先不使用交换机,而是直接创建一个队列:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
1
参数说明:
- 第一个参数:队列名
- 第二个参数:是否持久化,默认 false,表示保存在内存中(不持久化)
- 第三个参数:是否独占队列,默认 false,表示不独占队列(消息共享),true 则表示只供一个消费者消费
- 第四个参数:最后一个消费者断开连接后,是否自动删除队列,默认 false,表示不自动删除
- 第五个参数:队列的其他参数,如:存活时间
下一步就是发送消息了:
String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成");
1
2
3
2
3
参数说明:
- 第一个参数:交换机名称,如果没有则指定空字符串(表示使用默认的交换机)
- 第二个参数:路由 key,简单模式中可以使用队列名称
- 第三个参数:消息其他属性
- 第四个参数:消息内容
然后我们运行该方法,然后可以在 RabbitMQ 的可视化界面中看到有消息了:
我们点击 hello,可以看到该队列的详情:ready 的意思是有一个消息,已经准备好被消费了
完整代码:
package com.peterjxl.rabbitmq.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 消费者
同样的,我们也是创建 connection 和 channel,然后获取消息,完整代码:
package com.peterjxl.rabbitmq.demo;
import com.rabbitmq.client.*;
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到消息:" + message);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("接收消息被中断");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
关键方法:
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
1
参数说明:
- 第一个参数:队列名称
- 第二个参数:是否自动确认,设置为 true 表示消息接收到自动向 MQ 回复接收到了,MQ 则会将消息从队列中删除;设置为 false 则需要手动确认
- 第三个参数:消费者未成功消费的回调函数,可以用 Lambda
- 第四个参数:消费者中断消费的回调函数,可以用 Lambda
为此,在调用该方法之前我们定义了 2 个回调函数:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到消息:" + message);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("接收消息被中断");
};
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
运行结果:
接收到消息:Hello World
1
同时在控制台也看到消息被清零了:
# 源码
本项目已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo1,读者可以通过切换分支来查看本文的示例代码
上次更新: 2024/10/3 10:01:52