RabbitMQ 简介

RabbitMQ 官网

RabbitMQ 是由Erlang语言开发,基于 AMQP 协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006 年,AMQP 规范发布。类比 HTTP。

RabbitMQ 提供了 6 种模式:

  • 简单模式
  • work 模式
  • Publish/Subscribe 发布与订阅模式
  • Routing 路由模式
  • Topics 主题模式
  • RPC 远程调用模式(远程调用,不太算 MQ)

官网对应模式介绍:官方文档

RabbitMQ 基础架构

RabbitMQ 相关概念

  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

AMQP

AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

AMQP 是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。

RabbitMQ 是 AMQP 协议的 Erlang 的实现。

概念说明
连接 Connection一个网络连接,比如 TCP/IP 套接字连接。
会话 Session端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
信道 Channel多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
客户端 ClientAMQP 连接或者会话的发起者。AMQP 是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
服务节点 Broker消息中间件的服务节点;一般情况下可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
端点AMQP 对话的任意一方。一个 AMQP 连接包括两个端点(一个是客户端,一个是服务器)。
消费者 Consumer一个从消息队列里请求消息的客户端程序。
生产者 Producer一个向交换机发布消息的客户端应用程序。

一、Simple 简单模式

1、创建工程

创建项目:rabbitmq-producer

创建项目:rabbitmq-consumer

2、添加依赖

往两个项目的 pom.xml 文件中添加如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

3、编写生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.atguigu.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* @author: ShiGuang
* @create: 2021-06-07 20:19
* @description:
*/
public class Producer {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址;默认为 localhost
connectionFactory.setHost("localhost");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/");
//连接用户名;默认为guest
connectionFactory.setUsername("guest");
//连接密码;默认为guest
connectionFactory.setPassword("guest");

//创建连接
Connection connection = connectionFactory.newConnection();

// 创建频道
Channel channel = connection.createChannel();

// 声明(创建)队列
/**
* queue 参数1:队列名称
* durable 参数2:是否定义持久化队列,当mq重启之后,还在
* exclusive 参数3:是否独占本次连接
* ① 是否独占,只能有一个消费者监听这个队列
* ② 当connection关闭时,是否删除队列
* autoDelete 参数4:是否在不使用的时候自动删除队列,当没有consumer时,自动删除
* arguments 参数5:队列其它参数
*/

channel.queueDeclare("simple_queue", true, false, false, null);

// 要发送的信息
String message = "你好;RabbitMQ!";
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:配置信息
* 参数4:消息内容
*/
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);

// 关闭资源
channel.close();
connection.close();
}
}

运行 main 程序,浏览器访问 http://127.0.0.1:15672

登录 rabbitMQ 的管理控制台,可以发现队列

点击 simple_queue 进入后,可以获取发送的消息

4、编写消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.atguigu.rabbitmq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author: ShiGuang
* @create: 2021-06-07 20:34
* @description:
*/
public class Consumer {
public static void main(String[] args) throws Exception {

//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。

*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("simple_queue", true, false, false, null);


// 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*
回调方法,当收到消息后,会自动执行该方法

1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据

*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认 ,类似咱们发短信,发送成功会收到一个确认消息
3. callback:回调对象

*/
// 消费者类似一个监听程序,主要是用来监听消息
channel.basicConsume("simple_queue", true, consumer);
}
}

运行程序,之前生产者发送的消息全部接收到了!

5、总结

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

6、运转流程

① 生产者发送消息

  1. 生产者创建连接(Connection),开启一个信道(Channel),连接到 RabbitMQ Broker;
  2. 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
  3. 将路由键(空字符串)与队列绑定起来;
  4. 发送消息至 RabbitMQ Broker;
  5. 关闭信道;
  6. 关闭连接;

② 消费者接收消息

  1. 消费者创建连接(Connection),开启一个信道(Channel),连接到 RabbitMQ Broker
  2. 向 Broker 请求消费相应队列中的消息,设置相应的回调函数;
  3. 等待 Broker 回应闭关投递响应队列中的消息,消费者接收消息;
  4. 确认(ack,自动确认)接收到的消息;
  5. RabbitMQ 从队列中删除相应已经被确认的消息;
  6. 关闭信道;
  7. 关闭连接;

二、Work queues 工作队列模式

1、模式说明

Work Queues简单模式相比,多了一个或多个消费端,多个消费端共同消费同一个队列中的消息。

2、实现

复制多个简单模式中的消费者进行同时消费消息的测试

先启动两个消费者,然后再启动生产者发送消息;到 IDEA 的两个消费者对应的控制台查看是否竞争性的接收到消息

生产者这里可以修改一下实现批量发送消息,取代之前的单一发送

1
2
3
4
5
for (int i = 1; i <= 10; i++) {
String body = "这是发送出去的第" + i + "条消息";
// 发送消息
channel.basicPublish("", "simple_queue", null, body.getBytes());
}

3、总结

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

三、Publish/Subscribe 发布与订阅模式

1、模式说明

前面 2 个模式中,只有 3 个角色:

  1. P:生产者,也就是要发送消息的程序
  2. C:消费者:消息的接受者,会一直等待消息到来。
  3. queue:消息队列,图中红色部分

而在订阅模型中,多了一个 exchange 角色,而且过程略有变化:

  1. P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X(交换机)
  2. C:消费者,消息的接受者,会一直等待消息到来。
  3. Queue:消息队列,接收消息、缓存消息。
  4. Exchange:交换机,图中的 X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。

Exchange 有常见以下 3 种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定 routing key 的队列
  • Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

2、实现

发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

① 编写生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.atguigu.rabbitmq.publish;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* @author: ShiGuang
* @create: 2021-06-08 16:43
* @description:
*/
public class Producer {
public static void main(String[] args) throws Exception {

//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();

//2. 设置参数
// 设置ip 默认值 localhost
factory.setHost("localhost");
// 设置端口 默认值 5672
factory.setPort(5672);
// 设置虚拟机 默认值/
factory.setVirtualHost("/");
// 设置用户名 默认值 guest
factory.setUsername("guest");
// 设置密码 默认值 guest
factory.setPassword("guest");

//3. 创建连接 Connection
Connection connection = factory.newConnection();

//4. 创建频道
Channel channel = connection.createChannel();


/*

exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配

3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数

*/

String exchangeName = "test_fanout";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
//6. 创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);

//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name, exchangeName, "");
channel.queueBind(queue2Name, exchangeName, "");

String body = "广播消息:今天是2021年6月8日";
//8. 发送消息
channel.basicPublish(exchangeName, "", null, body.getBytes());


//9. 释放资源
channel.close();
connection.close();
}
}

运行生产者查看

交换机页面:

② 编写多个消费者

消费者 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.atguigu.rabbitmq.publish;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author: ShiGuang
* @create: 2021-06-08 18:27
* @description:
*/
public class Consumer1 {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(queue1Name, true, consumer);
}
}

消费者 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.atguigu.rabbitmq.publish;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author: ShiGuang
* @create: 2021-06-08 18:32
* @description:
*/
public class Consumer2 {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";

// 接收消息
Consumer consumer = new DefaultConsumer(channel) {
/*
回调方法,当收到消息后,会自动执行该方法

1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据

*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("body:" + new String(body));

}
};

/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象

*/
channel.basicConsume(queue2Name, true, consumer);
}
}

3、测试

启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。

在执行完测试代码后,其实到 RabbitMQ 的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:

4、总结

交换机需要与队列进行绑定,绑定之后,一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别

  1. 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
  2. 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的**生产方是面向队列发送消息(底层使用默认交换机)**。
  3. 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。

四、Routing 路由模式

1、模式说明

路由模式特点:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由 key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

图解:

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

2、实现

生产者在代码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.atguigu.rabbitmq.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* @author: ShiGuang
* @create: 2021-06-08 19:29
* @description:
*/
public class Producer {
public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String exchangeName = "test_direct";
// 创建Direct类型的交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);

// 队列绑定交换机
// 队列1绑定error
channel.queueBind(queue1Name, exchangeName, "error");
// 队列2绑定info error warning
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "warning");

String body = "广播消息:今天是2021年6月8日";
// 发送消息
channel.basicPublish(exchangeName, "warning", null, body.getBytes());


// 释放资源
channel.close();
connection.close();
}
}

运行

消费者只需要修改队列名称

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.atguigu.rabbitmq.routing;

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author: ShiGuang
* @create: 2021-06-08 19:36
* @description:
*/
public class Consumer1 {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(queue1Name, true, consumer);
}
}

3、测试

启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息,到达按照需要接收的效果。

其中Consumer1绑定的queue1Namerouting key设置的是errorConsumer2绑定的queue2Namerouting key设置的是infoerrorwarning

因此当生产者设置只有routing keywarning的才可以接收消息时

1
channel.basicPublish(exchangeName, "warning", null, body.getBytes());

Consumer1 接收不到,Consumer2 可以接收到消息。而当生产者设置routing keyerror时,两者都可以接收到消息。

4、总结

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key的队列。

五、Topics 通配符模式

1、模式说明

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配一个词

举例:

item.#:能够匹配item.insert.abc 或者 item.insert item.*:只能匹配item.insert

举个例子

  • 红色 Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
  • 黄色 Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配

2、实现

① 编写生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.atguigu.rabbitmq.topics;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author: ShiGuang
* @create: 2021-06-08 20:26
* @description:
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String exchangeName = "test_topic";
// 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
// 创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);

// 绑定队列和交换机
/**
* 参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/

channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue1Name, exchangeName, "order.*");
channel.queueBind(queue2Name, exchangeName, "*.*");
String body = "日志信息:xxxxxxxxxxxxxxxx";
//8. 发送消息goods.info,goods.error
channel.basicPublish(exchangeName, "order.info", null, body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}

运行

② 编写消费者

消费者只需要修改队列名称

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.atguigu.rabbitmq.topics;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author: ShiGuang
* @create: 2021-06-08 20:34
* @description:
*/
public class Consumer1 {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(queue1Name, true, consumer);
}
}

3、测试

启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。

4、总结

Topic 主题模式可以实现 Publish/Subscribe发布与订阅模式Routing路由模式 的功能;只是 Topic 在配置 routing key 的时候可以使用通配符,显得更加灵活。

六、RabbitMQ 工作模式总结

1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

3、发布订阅模式 Publish/subscribe
需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

4、路由模式 Routing
需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列

5、通配符模式 Topic
需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列