一、消息的可靠性投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。

消息的可靠性投递就包括一下场景:

  1. 保障消息能够成功发出

  2. 保障 rabbitmq(broker)能够成功接收。

    接收:生产者发送的消息被 broker 接收,由 broker 放到 exchange 中,exchange 再分发给对应的 queue,最后交付给对应的消费者这一完整过程。

  3. 发送端要收到 broker 的确认应答,确认 broker 已收到消息

  4. 完善的消息补偿机制。发送端没收到 broker 的确认应答,不知道消息是否成功投递成功,这时候就需要做一些补偿处理,比如重新投递。

1、Confirm 确认模式

RabbitMQ 为我们提供了消息的确认机制,用来控制消息投递的可靠性模式。

消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产这一个应答。

生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker,这种方式也是消息的可靠性投递的核心保障。

rabbitmq 的 server 又叫做 broker,接收客户端的连接,实现 AMQP 实体服务,包含 exchange、queue 等多种组件。

简单来说,broker 就是 rabbitmq 服务器

① 实现 Confirm 确认模式

第一步:在 channel 上开启确认模式:channel.confirmSelect()

第二步:在 channel 上添加监听:channel.addConfirmListener(ConfirmListener listener),监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或者记录日志等后续处理。

配置 Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class Producer {
public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String exchangeName = "exchange_confirm";
String routingKey = "confirm";
String body = "发送测试消息";

// 指定消息的投递模式:confirm 确认模式
channel.confirmSelect();

channel.basicPublish(exchangeName, routingKey, null, body.getBytes());
// 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
/**
* handleAck:成功监听
* 返回成功的回调函数
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息投递成功");
}
/**
* handleNack:失败监听
* 返回失败的回调函数
*/
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息投递失败");
}
});
// 释放资源
channel.close();
connection.close();
}
}

② 基于 spring 的确认模式

基于 spring 集成 rabbitmq,需要在 rabbitmq 的 xml 配置文件中开启确认模式

[1]rabbitmq 整个消息投递的路径

消息从 producer –> exchange 则会返回一个 confirmCallback

消息从 exchange –> queue 投递失败则会返回一个 returnCallback

我们将利用这两个 callback 控制消息的可靠性投递

[2]具体实现

基于 Spring 集成 Rabbitmq 的环境下,配置 ConfirmCallback 回调函数

创建项目 rabbitmq-producer-spring

修改 pom 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

在 resource 文件夹下面添加 配置文件 rabbitmq.properties

1
2
3
4
5
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/

在 resource 文件夹下面添加 配置文件 spring-rabbitmq-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory
确认模式开启:publisher-confirms="true" -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>


<!--消息可靠性投递(生产端)-->
<rabbit:queue id="queue_confirm" name="queue_confirm"></rabbit:queue>
<rabbit:direct-exchange name="exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

</beans>

创建测试类 ,添加确认模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 确认模式:
* 步骤:
* 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
*/
@Test
public void testConfirm() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("rabbitmq broker接收消息成功" + cause);
} else {
System.out.println("rabbitmq broker接收消息失败" + cause);
}
}
});
rabbitTemplate.convertAndSend("exchange_confirm", "confirm", "消息发送内容");
}
}

运行程序

修改发送到路由器名称为 exchange_confirm_1,失败并提示原因

2、Return 回退模式

return 机制的前提:当前的队列必须要有消费者存在,所以测试的时候,要先启动消费者,再启动生产者

如果直接启动生产者,则即刻就会被 return 监听视为不可达信息。

如果将 Mandatory 属性设置为 false,对于不可达的消息会被 Broker 直接删除,那么生产端就不会进行任何操作,也就是 ReturnListener 的监听回调会无效。所以,Mandatory 必须设置为 true。

return 机制可应用于,确认消费者已经成功收到消息,如果未收到,可进行消息的重新发送等操作

Return Listener 用于处理一些不可路由的消息。

我们的消息生产者,通过指定一个ExchangeRoutingkey,把消息送到某一个队列中,然后我们的消费者监听队列,进行消息处理操作。但是在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定的路由 key 路由不到,这个时候我们需要监听这种不可达的消息,就要使用return listener

在基础 API 中有一个关键的配置项:Mandatory

  • 如果为 true,则监听会接收到路由不可达的消息,然后进行后续处理,
  • 如果为 false,那么 broker 端自动删除该消息。(默认 false)

① 实现 Return 回退模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Producer {
public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String exchangeName = "exchange_confirm";
String routingKey = "confirm";
String errorRoutingKey = "confirm1";
String body = "发送测试消息";

// void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
channel.basicPublish(exchangeName, routingKey, true,null, body.getBytes());
channel.basicPublish(exchangeName, errorRoutingKey, true,null, body.getBytes());
/**
* param1:replyCode 队列响应给浏览器的状态码
* param2:replyText 状态码对应的文本信息
* param3:exchange 交换机的名称
* param4:routingKey 路由的key
* param5:properties 消息的相关属性
* param6:body 消息体的内容
*/
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("监听到不可达消息...");
}
});
}
}

② 基于 spring 的回退模式

基于 spring 集成 rabbitmq,需要在 rabbitmq 的 xml 配置文件中开启回退模式

spring-rabbitmq-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- 定义rabbitmq connectionFactory
确认模式开启:publisher-confirms="true"
回退模式开始:publisher-returns="true"
-->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"
/>

ProducerTest.java下新增测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时才会执行 ReturnCallBack
* 步骤:
* 1. 开启回退模式:publisher-returns="true"
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式:
* 1. 如果消息没有路由到Queue,则丢弃消息(默认)
* 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
*/

@Test
public void testReturn() {

// 交换机设置强制处理失败消息的模式
rabbitTemplate.setMandatory(true);


// 设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");

System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);

//处理
}
});

// 发送消息
rabbitTemplate.convertAndSend("exchange_confirm", "confirm", "message confirm....");
}

3、消息的可靠投递小结

  • 设置 ConnectionFactorypublisher-confirms="true" 开启 确认模式。
  • 使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
  • 设置 ConnectionFactorypublisher-returns="true" 开启 退回模式。
  • 使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从exchange 路由到 queue 失败后,如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer并执行回调函数returnedMessage

4、Consumer Ack

ack 指 Acknowledge,确认。 表示消费端收到消息后的确认方式。

有二种确认方式:

  • 自动确认:acknowledge=“none”
  • 手动确认:acknowledge=“manual”

其中自动确认是指,当消息一旦被 Consumer 接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。

但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck(),手动签收,如果出现异常,则调用 channel.basicNack()方法,让其自动重新发送消息。

① 实现

创建项目 rabbitmq-consumer-spring

修改 pom 文件,和上面生产者的一样

resource 文件夹下面新建 rabbitmq.properties 文件 和 spring-rabbitmq-consumer.xml 文件

rabbitmq.properties 文件和上面生产者的一样

spring-rabbitmq-consumer.xml 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>


<context:component-scan base-package="com.atguigu.listener"/>

<!--定义监听器容器:acknowledge="manual":手动签收 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener ref="ackListener" queue-names="queue_confirm"></rabbit:listener>
</rabbit:listener-container>

</beans>

② 自动确认

添加监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.atguigu.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

/**
* @author: ShiGuang
* @create: 2021-06-10 10:31
* @description:
*/
@Component
public class AckListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println(new String(message.getBody()));
}
}

③ 手动确认

添加监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
// 获取消息传递标记
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 接收消息
System.out.println(new String(message.getBody()));
// 处理业务逻辑
System.out.println("处理业务逻辑");
// 出现错误
int i = 3/0;
// ③ 手动签收
/**
* 第一个参数:表示收到的标签
* 第二个参数:如果为true表示可以签收所有的消息
*/
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
e.printStackTrace();
// 拒绝签收
/*
第三个参数:requeue:重回队列。
设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
}
}
}

④ 添加测试类

1
2
3
4
5
6
7
8
9
10
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test(){
while (true){

}
}
}

如果在消费端没有出现异常,则调用 channel.basicAck(deliveryTag,true);方法确认签收消息

如果出现异常,则在 catch 中调用 basicNack,拒绝消息,让 MQ 重新发送消息。

二、消费端限流

1、概念

假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据。

当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。

2、实现

①QosListener

在项目 rabbitmq-consumer-spring ,新建 QosListener.java

Consumer 限流机制需要确保 ack 机制为手动确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.atguigu.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// 获取消息
System.out.println(new String(message.getBody()));
}
}

② 修改配置文件

修改spring-rabbitmq-consumer.xml 配置文件

listener-container 配置属性:perfetch = 1,表示消费端每次从 mq 拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息

1
2
3
4
5
<!--定义监听器容器:acknowledge="manual":手动签收 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--延迟队列效果实现: 一定要监听的是 死信队列!!!-->
<rabbit:listener ref="qosListener" queue-names="queue_confirm"></rabbit:listener>
</rabbit:listener-container>

③ 测试

在项目 spring-rabbitmq-producers , ProducerTest新增测试方法并运行

1
2
3
4
5
6
7
@Test
public void testSend() {

for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("exchange_confirm","confirm","message confirm");
}
}

最后运行消费者测试类 ConsumerTest

查看后台 ,有 9 条消息待消费 ,有 1 条消息未确认

修改 QosListener , 添加手动签收方法 ,这样就可以确认消费限流

1
2
3
4
5
6
7
8
9
10
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// 获取消息
System.out.println(new String(message.getBody()));
// 签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
}

三、TTL

TTL 全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ 可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

1、控制后台实现

① 增加队列

“x-message-ttl” 参数表示过期时间,单位毫秒

② 增加交换机并绑定队列

② 发送消息

在 Queues 可以查看消息,但五秒之后,消息自动过期消失

2、基于代码实现

① 队列统一过期

新建项目rabbitmq-ttl

pom.xml 配置文件和 rabbitmq.properties 文件同上

新建配置文件 spring-rabbitmq-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>


<!--TTL 队列-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--
设置x-message-ttl队列的过期时间
默认情况下value-type的类型是String类型,但时间的类型是number类型,所以需要设置成integer类型
-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>

<!--设置交换机-->
<rabbit:topic-exchange name="test_exchange_ttl">
<!--交换机绑定队列-->
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

</beans>

新建测试类 ProducerTest,添加测试方法,发送消息 ,十秒之后自动过期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hello","message ttl");
}
}
}

② 消息过期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void testTtl() {

// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置message的信息
// 消息的过期时间 ,5秒之后过期
message.getMessageProperties().setExpiration("5000");
// 返回该消息
return message;
}
};

//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hello", "message ttl",messagePostProcessor);
}

如果同时设置了消息的过期时间和队列的过期时间,以时间短的为准。

四、死信队列

1、什么是死信队列

由于某些特殊原因,当一条消息初次消费失败,消息队列 RocketMQ 版会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,消息队列 RocketMQ 版不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

在消息队列 RocketMQ 版中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

死信消息:Dead-Letter Message

死信队列:Dead-Letter Queue

死信交换机:DeadLetter Exchange

2、消息成为死信的三种情况

  1. 队列消息长度到达限制;比如给队列最大只能存储 10 条消息,当第 11 条消息进来的时候存不下了,第 11 条消息就被称为死信
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

3、死信的处理方式

死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,

  • 丢弃,如果不是很重要,可以选择丢弃
  • 记录死信入库,然后做后续的业务分析或处理
  • 通过死信队列,由负责监听死信的应用程序进行处理

综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理,

4、队列绑定死信交换机

给队列设置参数

x-dead-letter-exchange :死信交换机名称

x-dead-letter-routing-key:发送给死信交换机的 routingkey

① 过期时间代码实现

修改rabbitmq-ttl项目的配置文件 spring-rabbitmq-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>


<!-- 声明正常的队列(normal_queue_dlx) -->
<rabbit:queue name="normal_queue_dlx" id="normal_queue_dlx">
<!-- 正常队列设置参数绑定死信交换机 -->
<rabbit:queue-arguments>
<!-- x-dead-letter-exchange:死信交换机名称 -->
<entry key="x-dead-letter-exchange" value="exchange_dlx"/>
<!-- x-dead-letter-routing-key:发送给死信交换机的routingkey -->
<entry key="x-dead-letter-routing-key" value="dlx.hello"></entry>
<!-- 设置队列的过期时间 -->
<!-- 时间到了没还有被消费就发送到绑定的死信交换机(exchange_dlx)上去 -->
<!-- 再被死信交换机发送到死信队列(queue_dlx) -->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
<!-- 设置队列的长度限制 -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>

<!-- 正常交换机(normal_exchange_dlx)和正常队列(normal_queue_dlx)绑定 -->
<rabbit:topic-exchange name="normal_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="normal.dlx.#" queue="normal_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

<!-- 死信队列(queue_dlx)和死信交换机(exchange_dlx)绑定 -->
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

</beans>

在测试类 ProducerTest,添加方法进行测试

1
2
3
4
5
@Test
public void testDlx(){
// 测试过期时间,死信消息
rabbitTemplate.convertAndSend("normal_exchange_dlx","normal.dlx.hello","我是一条正常消息");
}

② 长度限制代码实现

1
2
3
4
5
6
7
@Test
public void testDlxByLength() {
// 测试长度限制后,消息死信
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("normal_exchange_dlx", "normal.dlx.hello", "我是一条正常消息");
}
}

③ 消息拒收

在消费者工程创建 DlxListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Component
public class DlxListener implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
// 接收转换消息
System.out.println(new String(message.getBody()));

// 处理业务逻辑
System.out.println("处理业务逻辑...");
// 出现错误
int i = 3 / 0;
// 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出现异常,拒绝接受");
// 拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag, true, false);
}
}
}

修改消费者配置文件 spring-rabbitmq-consumer.xml

1
2
3
4
5
<!--定义监听器容器:acknowledge="manual":手动签收 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--定义监听器,监听正常队列-->
<rabbit:listener ref="dlxListener" queue-names="normal_queue_dlx"></rabbit:listener>
</rabbit:listener-container>

运行消费者测试类 ConsumerTest,在使用生产者发送一条消息进行测试

④ 小结

  1. 死信交换机和死信队列和普通的没有区别
  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
  3. 消息成为死信的三种情况:
    • 队列消息长度到达限制;
    • 消费者拒接消费消息,并且不重回队列;
    • 原队列存在消息过期设置,消息到达超时时间未被消费;

五、延迟队列

延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

场景一:在订单系统中,一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备。

使用 TTL+死信队列 组合实现延迟队列的效果。

1、生产者

创建新项目mabbitmq-delay-producer

创建生产者配置文件 spring-rabbitmq-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>

<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

<!-- 声明正常的队列 -->
<rabbit:queue name="order_queue" id="order_queue">
<!-- 正常队列设置参数绑定死信交换机 -->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel"></entry>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>

<!-- 正常交换机和正常队列绑定 -->
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

<!-- 死信队列和死信交换机绑定 -->
<rabbit:queue name="order_queue_dlx" id="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

</beans>

routing-key:生产者的order.msg符合正常订单队列的order.#

消息过期后 routing-key 变为dlx.order.cancel到了死信交换机(order_exchange_dlx)

dlx.order.cancel符合死信队列的dlx.order.#,消息被发送到死信队列

创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.atguigu;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testDelay() throws InterruptedException {
// 发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend("order_exchange", "order.msg", "订单信息:id=1,time=2021年6月10日16:41:47");
// 打印倒计时10秒
for (int i = 10; i > 0; i--) {
System.out.println(i + "...");
Thread.sleep(1000);
}
}
}

2、消费者

修改消费者项目,创建 OrderListener.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.atguigu.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


@Component
public class OrderListener implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
// 接收转换消息
System.out.println(new String(message.getBody()));

// 处理业务逻辑
System.out.println("处理业务逻辑...");
System.out.println("根据订单id查询其状态...");
System.out.println("判断状态是否为支付成功");
System.out.println("取消订单,回滚库存....");
// 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出现异常,拒绝接受");
// 拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag, true, false);
}
}
}

修改消费者配置文件 spring-rabbitmq-consumer.xml,监听死信队列

1
2
3
4
5
<!--定义监听器容器:acknowledge="manual":手动签收 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--延迟队列效果实现:需要监听死信队列 -->
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
</rabbit:listener-container>

运行消费者测试类ConsumerTest查看效果

3、过程

生产者发送订单消息后到了正常的订单队列,等待 10s,正常的订单队列里面的订单消息没有被消费者消费而过期

过期的订单消息被发送到死信交换机,接着被死信交换机发送到了死信队列

监听着死信队列的消费者就收到了消息