RabbitMQ 简介
RabbitMQ 官网
RabbitMQ 是由Erlang
语言开发,基于 AMQP 协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006 年,AMQP 规范发布。类比 HTTP。
RabbitMQ 提供了 6 种模式:
- 简单模式
- work 模式
- Publish/Subscribe 发布与订阅模式
- Routing 路由模式
- Topics 主题模式
- RPC 远程调用模式(远程调用,不太算 MQ)
官网对应模式介绍:官方文档
RabbitMQ 基础架构
RabbitMQ 相关概念
- Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之间的 TCP 连接
- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最终被送到这里等待 consumer 取走
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
AMQP
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP 是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。
RabbitMQ 是 AMQP 协议的 Erlang 的实现。
概念 | 说明 |
---|
连接 Connection | 一个网络连接,比如 TCP/IP 套接字连接。 |
会话 Session | 端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。 |
信道 Channel | 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。 |
客户端 Client | AMQP 连接或者会话的发起者。AMQP 是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。 |
服务节点 Broker | 消息中间件的服务节点;一般情况下可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。 |
端点 | AMQP 对话的任意一方。一个 AMQP 连接包括两个端点(一个是客户端,一个是服务器)。 |
消费者 Consumer | 一个从消息队列里请求消息的客户端程序。 |
生产者 Producer | 一个向交换机发布消息的客户端应用程序。 |
一、Simple 简单模式
1、创建工程
创建项目:rabbitmq-producer
创建项目:rabbitmq-consumer
2、添加依赖
往两个项目的 pom.xml 文件中添加如下依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
|
3、编写生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package com.atguigu.rabbitmq.simple;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("simple_queue", true, false, false, null);
String message = "你好;RabbitMQ!";
channel.basicPublish("", "simple_queue", null, message.getBytes()); System.out.println("已发送消息:" + message);
channel.close(); connection.close(); } }
|
运行 main 程序,浏览器访问 http://127.0.0.1:15672
登录 rabbitMQ 的管理控制台,可以发现队列
点击 simple_queue 进入后,可以获取发送的消息
4、编写消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| package com.atguigu.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer { public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("simple_queue", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:" + consumerTag); System.out.println("Exchange:" + envelope.getExchange()); System.out.println("RoutingKey:" + envelope.getRoutingKey()); System.out.println("properties:" + properties); System.out.println("body:" + new String(body)); } };
channel.basicConsume("simple_queue", true, consumer); } }
|
运行程序,之前生产者发送的消息全部接收到了!
5、总结
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
6、运转流程
① 生产者发送消息
- 生产者创建连接(Connection),开启一个信道(Channel),连接到 RabbitMQ Broker;
- 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
- 将路由键(空字符串)与队列绑定起来;
- 发送消息至 RabbitMQ Broker;
- 关闭信道;
- 关闭连接;
② 消费者接收消息
- 消费者创建连接(Connection),开启一个信道(Channel),连接到 RabbitMQ Broker
- 向 Broker 请求消费相应队列中的消息,设置相应的回调函数;
- 等待 Broker 回应闭关投递响应队列中的消息,消费者接收消息;
- 确认(ack,自动确认)接收到的消息;
- RabbitMQ 从队列中删除相应已经被确认的消息;
- 关闭信道;
- 关闭连接;
二、Work queues 工作队列模式
1、模式说明
Work Queues
与简单模式
相比,多了一个或多个消费端,多个消费端共同消费同一个队列中的消息。
2、实现
复制多个简单模式中的消费者进行同时消费消息的测试
先启动两个消费者,然后再启动生产者发送消息;到 IDEA 的两个消费者对应的控制台查看是否竞争性的接收到消息
生产者这里可以修改一下实现批量发送消息,取代之前的单一发送
1 2 3 4 5
| for (int i = 1; i <= 10; i++) { String body = "这是发送出去的第" + i + "条消息"; channel.basicPublish("", "simple_queue", null, body.getBytes()); }
|
3、总结
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
三、Publish/Subscribe 发布与订阅模式
1、模式说明
前面 2 个模式中,只有 3 个角色:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分
而在订阅模型中,多了一个 exchange 角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的 X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。
Exchange 有常见以下 3 种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定 routing key 的队列
- Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
2、实现
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
① 编写生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| package com.atguigu.rabbitmq.publish;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class Producer { public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_fanout"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null); String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null);
channel.queueBind(queue1Name, exchangeName, ""); channel.queueBind(queue2Name, exchangeName, "");
String body = "广播消息:今天是2021年6月8日"; channel.basicPublish(exchangeName, "", null, body.getBytes());
channel.close(); connection.close(); } }
|
运行生产者查看
交换机页面:
② 编写多个消费者
消费者 1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.atguigu.rabbitmq.publish;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1";
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); } }; channel.basicConsume(queue1Name, true, consumer); } }
|
消费者 2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package com.atguigu.rabbitmq.publish;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_fanout_queue2";
Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
} };
channel.basicConsume(queue2Name, true, consumer); } }
|
3、测试
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
在执行完测试代码后,其实到 RabbitMQ 的管理后台找到Exchanges
选项卡,点击 fanout_exchange
的交换机,可以查看到如下的绑定:
4、总结
交换机需要与队列进行绑定,绑定之后,一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的**生产方是面向队列发送消息(底层使用默认交换机)**。
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。
四、Routing 路由模式
1、模式说明
路由模式特点:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由 key) - 消息的发送方在向 Exchange 发送消息时,也必须指定消息的
RoutingKey
。 - Exchange 不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息
图解:
- P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 匹配的队列
- C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
2、实现
生产者在代码上与 Publish/Subscribe发布与订阅模式
的区别是交换机的类型为:Direct
,还有队列绑定交换机的时候需要指定routing key
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package com.atguigu.rabbitmq.routing;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class Producer { public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String exchangeName = "test_direct"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null); String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null);
channel.queueBind(queue1Name, exchangeName, "error"); channel.queueBind(queue2Name, exchangeName, "info"); channel.queueBind(queue2Name, exchangeName, "error"); channel.queueBind(queue2Name, exchangeName, "warning");
String body = "广播消息:今天是2021年6月8日"; channel.basicPublish(exchangeName, "warning", null, body.getBytes());
channel.close(); connection.close(); } }
|
运行
消费者只需要修改队列名称
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package com.atguigu.rabbitmq.routing;
import com.rabbitmq.client.Consumer; import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1";
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); } }; channel.basicConsume(queue1Name, true, consumer); } }
|
3、测试
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key
对应队列的消息,到达按照需要接收的效果。
其中Consumer1
绑定的queue1Name
的routing key
设置的是error
,Consumer2
绑定的queue2Name
的routing key
设置的是info
,error
和warning
因此当生产者设置只有routing key
为warning
的才可以接收消息时
1
| channel.basicPublish(exchangeName, "warning", null, body.getBytes());
|
Consumer1 接收不到,Consumer2 可以接收到消息。而当生产者设置routing key
为error
时,两者都可以接收到消息。
4、总结
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key的队列。
五、Topics 通配符模式
1、模式说明
Topic
类型与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配一个词
举例:
item.#
:能够匹配item.insert.abc
或者 item.insert item.*
:只能匹配item.insert
举个例子
- 红色 Queue:绑定的是
usa.#
,因此凡是以 usa.
开头的routing key
都会被匹配到 - 黄色 Queue:绑定的是
#.news
,因此凡是以 .news
结尾的 routing key
都会被匹配
2、实现
① 编写生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| package com.atguigu.rabbitmq.topics;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null);
channel.queueBind(queue1Name, exchangeName, "#.error"); channel.queueBind(queue1Name, exchangeName, "order.*"); channel.queueBind(queue2Name, exchangeName, "*.*"); String body = "日志信息:xxxxxxxxxxxxxxxx"; channel.basicPublish(exchangeName, "order.info", null, body.getBytes()); channel.close(); connection.close(); } }
|
运行
② 编写消费者
消费者只需要修改队列名称
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.atguigu.rabbitmq.topics;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_topic_queue1";
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); } }; channel.basicConsume(queue1Name, true, consumer); } }
|
3、测试
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key
对应队列的消息;到达按照需要接收的效果;并且这些routing key
可以使用通配符。
4、总结
Topic 主题模式可以实现 Publish/Subscribe发布与订阅模式
和 Routing路由模式
的功能;只是 Topic 在配置 routing key 的时候可以使用通配符,显得更加灵活。
六、RabbitMQ 工作模式总结
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe
需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing
需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
5、通配符模式 Topic
需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列