一、消息的可靠性投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。
消息的可靠性投递就包括一下场景:
保障消息能够成功发出
保障 rabbitmq(broker)能够成功接收。
接收:生产者发送的消息被 broker 接收,由 broker 放到 exchange 中,exchange 再分发给对应的 queue,最后交付给对应的消费者这一完整过程。
发送端要收到 broker 的确认应答,确认 broker 已收到消息
完善的消息补偿机制。发送端没收到 broker 的确认应答,不知道消息是否成功投递成功,这时候就需要做一些补偿处理,比如重新投递。
1、Confirm 确认模式
RabbitMQ 为我们提供了消息的确认机制,用来控制消息投递的可靠性模式。
消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产这一个应答。
生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker,这种方式也是消息的可靠性投递的核心保障。
rabbitmq 的 server 又叫做 broker,接收客户端的连接,实现 AMQP 实体服务,包含 exchange、queue 等多种组件。
简单来说,broker 就是 rabbitmq 服务器
① 实现 Confirm 确认模式
第一步:在 channel 上开启确认模式:channel.confirmSelect()
第二步:在 channel 上添加监听:channel.addConfirmListener(ConfirmListener listener),监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或者记录日志等后续处理。
配置 Producer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class Producer { public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String exchangeName = "exchange_confirm"; String routingKey = "confirm"; String body = "发送测试消息";
channel.confirmSelect();
channel.basicPublish(exchangeName, routingKey, null, body.getBytes()); channel.addConfirmListener(new ConfirmListener() {
@Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息投递成功"); }
@Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息投递失败"); } }); channel.close(); connection.close(); } }
|
② 基于 spring 的确认模式
基于 spring 集成 rabbitmq,需要在 rabbitmq 的 xml 配置文件中开启确认模式
[1]rabbitmq 整个消息投递的路径
消息从 producer –> exchange
则会返回一个 confirmCallback
消息从 exchange –> queue
投递失败则会返回一个 returnCallback
我们将利用这两个 callback 控制消息的可靠性投递
[2]具体实现
基于 Spring 集成 Rabbitmq 的环境下,配置 ConfirmCallback 回调函数
创建项目 rabbitmq-producer-spring
修改 pom 文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency>
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency>
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency>
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
|
在 resource 文件夹下面添加 配置文件 rabbitmq.properties
1 2 3 4 5
| rabbitmq.host=localhost rabbitmq.port=5672 rabbitmq.username=guest rabbitmq.password=guest rabbitmq.virtual-host=/
|
在 resource 文件夹下面添加 配置文件 spring-rabbitmq-producer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" /> <rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue id="queue_confirm" name="queue_confirm"></rabbit:queue> <rabbit:direct-exchange name="exchange_confirm"> <rabbit:bindings> <rabbit:binding queue="queue_confirm" key="confirm"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange>
</beans>
|
创建测试类 ,添加确认模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testConfirm() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("rabbitmq broker接收消息成功" + cause); } else { System.out.println("rabbitmq broker接收消息失败" + cause); } } }); rabbitTemplate.convertAndSend("exchange_confirm", "confirm", "消息发送内容"); } }
|
运行程序
修改发送到路由器名称为 exchange_confirm_1,失败并提示原因
2、Return 回退模式
return 机制的前提:当前的队列必须要有消费者存在,所以测试的时候,要先启动消费者,再启动生产者
如果直接启动生产者,则即刻就会被 return 监听视为不可达信息。
如果将 Mandatory 属性设置为 false,对于不可达的消息会被 Broker 直接删除,那么生产端就不会进行任何操作,也就是 ReturnListener 的监听回调会无效。所以,Mandatory 必须设置为 true。
return 机制可应用于,确认消费者已经成功收到消息,如果未收到,可进行消息的重新发送等操作
Return Listener 用于处理一些不可路由的消息。
我们的消息生产者,通过指定一个Exchange
和Routingkey
,把消息送到某一个队列中,然后我们的消费者监听队列,进行消息处理操作。但是在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定的路由 key 路由不到,这个时候我们需要监听这种不可达的消息,就要使用return listener
在基础 API 中有一个关键的配置项:Mandatory
- 如果为 true,则监听会接收到路由不可达的消息,然后进行后续处理,
- 如果为 false,那么 broker 端自动删除该消息。(默认 false)
① 实现 Return 回退模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class Producer { public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String exchangeName = "exchange_confirm"; String routingKey = "confirm"; String errorRoutingKey = "confirm1"; String body = "发送测试消息";
channel.basicPublish(exchangeName, routingKey, true,null, body.getBytes()); channel.basicPublish(exchangeName, errorRoutingKey, true,null, body.getBytes());
channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("监听到不可达消息..."); } }); } }
|
② 基于 spring 的回退模式
基于 spring 集成 rabbitmq,需要在 rabbitmq 的 xml 配置文件中开启回退模式
spring-rabbitmq-producer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13
|
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" />
|
在ProducerTest.java
下新增测试方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
@Test public void testReturn() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return 执行了....");
System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey);
} });
rabbitTemplate.convertAndSend("exchange_confirm", "confirm", "message confirm...."); }
|
3、消息的可靠投递小结
- 设置
ConnectionFactory
的publisher-confirms="true"
开启 确认模式。 - 使用
rabbitTemplate.setConfirmCallback
设置回调函数。当消息发送到 exchange
后回调 confirm
方法。在方法中判断 ack
,如果为true
,则发送成功,如果为false
,则发送失败,需要处理。 - 设置
ConnectionFactory
的 publisher-returns="true"
开启 退回模式。 - 使用
rabbitTemplate.setReturnCallback
设置退回函数,当消息从exchange
路由到 queue
失败后,如果设置了 rabbitTemplate.setMandatory(true)
参数,则会将消息退回给 producer
并执行回调函数returnedMessage
4、Consumer Ack
ack 指 Acknowledge,确认。 表示消费端收到消息后的确认方式。
有二种确认方式:
- 自动确认:acknowledge=“none”
- 手动确认:acknowledge=“manual”
其中自动确认是指,当消息一旦被 Consumer 接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck(),手动签收,如果出现异常,则调用 channel.basicNack()方法,让其自动重新发送消息。
① 实现
创建项目 rabbitmq-consumer-spring
修改 pom 文件,和上面生产者的一样
在 resource
文件夹下面新建 rabbitmq.properties
文件 和 spring-rabbitmq-consumer.xml
文件
rabbitmq.properties
文件和上面生产者的一样
spring-rabbitmq-consumer.xml
文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/>
<context:component-scan base-package="com.atguigu.listener"/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener ref="ackListener" queue-names="queue_confirm"></rabbit:listener> </rabbit:listener-container>
</beans>
|
② 自动确认
添加监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.atguigu.listener;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Component;
@Component public class AckListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(new String(message.getBody())); } }
|
③ 手动确认
添加监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Component public class AckListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(1000); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println(new String(message.getBody())); System.out.println("处理业务逻辑"); int i = 3/0;
channel.basicAck(deliveryTag,true); } catch (Exception e) { e.printStackTrace();
channel.basicNack(deliveryTag,true,true); } } }
|
④ 添加测试类
1 2 3 4 5 6 7 8 9 10
| @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest { @Test public void test(){ while (true){
} } }
|
如果在消费端没有出现异常,则调用 channel.basicAck(deliveryTag,true);方法确认签收消息
如果出现异常,则在 catch 中调用 basicNack,拒绝消息,让 MQ 重新发送消息。
二、消费端限流
1、概念
假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据。
当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。
2、实现
①QosListener
在项目 rabbitmq-consumer-spring
,新建 QosListener.java
Consumer 限流机制需要确保 ack 机制为手动确认
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.atguigu.listener;
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component;
@Component public class QosListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { System.out.println(new String(message.getBody())); } }
|
② 修改配置文件
修改spring-rabbitmq-consumer.xml
配置文件
listener-container 配置属性:perfetch = 1,表示消费端每次从 mq 拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息
1 2 3 4 5
| <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <rabbit:listener ref="qosListener" queue-names="queue_confirm"></rabbit:listener> </rabbit:listener-container>
|
③ 测试
在项目 spring-rabbitmq-producers
, ProducerTest
新增测试方法并运行
1 2 3 4 5 6 7
| @Test public void testSend() {
for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("exchange_confirm","confirm","message confirm"); } }
|
最后运行消费者测试类 ConsumerTest
查看后台 ,有 9 条消息待消费 ,有 1 条消息未确认
修改 QosListener
, 添加手动签收方法 ,这样就可以确认消费限流
1 2 3 4 5 6 7 8 9 10
| @Component public class QosListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { System.out.println(new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } }
|
三、TTL
TTL 全称 Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ 可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
1、控制后台实现
① 增加队列
“x-message-ttl” 参数表示过期时间,单位毫秒
② 增加交换机并绑定队列
② 发送消息
在 Queues 可以查看消息,但五秒之后,消息自动过期消失
2、基于代码实现
① 队列统一过期
新建项目rabbitmq-ttl
pom.xml 配置文件和 rabbitmq.properties 文件同上
新建配置文件 spring-rabbitmq-producer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl"> <rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl"> <rabbit:bindings> <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
</beans>
|
新建测试类 ProducerTest,添加测试方法,发送消息 ,十秒之后自动过期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSend() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hello","message ttl"); } } }
|
② 消息过期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Test public void testTtl() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); return message; } };
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hello", "message ttl",messagePostProcessor); }
|
如果同时设置了消息的过期时间和队列的过期时间,以时间短的为准。
四、死信队列
1、什么是死信队列
由于某些特殊原因,当一条消息初次消费失败,消息队列 RocketMQ 版会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,消息队列 RocketMQ 版不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 版中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
死信消息:Dead-Letter Message
死信队列:Dead-Letter Queue
死信交换机:DeadLetter Exchange
2、消息成为死信的三种情况
- 队列消息长度到达限制;比如给队列最大只能存储 10 条消息,当第 11 条消息进来的时候存不下了,第 11 条消息就被称为死信
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
3、死信的处理方式
死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,
- 丢弃,如果不是很重要,可以选择丢弃
- 记录死信入库,然后做后续的业务分析或处理
- 通过死信队列,由负责监听死信的应用程序进行处理
综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理,
4、队列绑定死信交换机
给队列设置参数
x-dead-letter-exchange
:死信交换机名称
x-dead-letter-routing-key
:发送给死信交换机的 routingkey
① 过期时间代码实现
修改rabbitmq-ttl
项目的配置文件 spring-rabbitmq-producer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue name="normal_queue_dlx" id="normal_queue_dlx"> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="exchange_dlx"/> <entry key="x-dead-letter-routing-key" value="dlx.hello"></entry> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> <entry key="x-max-length" value="10" value-type="java.lang.Integer"/> </rabbit:queue-arguments> </rabbit:queue>
<rabbit:topic-exchange name="normal_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="normal.dlx.#" queue="normal_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
</beans>
|
在测试类 ProducerTest,添加方法进行测试
1 2 3 4 5
| @Test public void testDlx(){ rabbitTemplate.convertAndSend("normal_exchange_dlx","normal.dlx.hello","我是一条正常消息"); }
|
② 长度限制代码实现
1 2 3 4 5 6 7
| @Test public void testDlxByLength() { for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("normal_exchange_dlx", "normal.dlx.hello", "我是一条正常消息"); } }
|
③ 消息拒收
在消费者工程创建 DlxListener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Component public class DlxListener implements ChannelAwareMessageListener {
@Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag();
try { System.out.println(new String(message.getBody()));
System.out.println("处理业务逻辑..."); int i = 3 / 0; channel.basicAck(deliveryTag, true); } catch (Exception e) { System.out.println("出现异常,拒绝接受"); channel.basicNack(deliveryTag, true, false); } } }
|
修改消费者配置文件 spring-rabbitmq-consumer.xml
1 2 3 4 5
| <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <rabbit:listener ref="dlxListener" queue-names="normal_queue_dlx"></rabbit:listener> </rabbit:listener-container>
|
运行消费者测试类 ConsumerTest,在使用生产者发送一条消息进行测试
④ 小结
- 死信交换机和死信队列和普通的没有区别
- 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
- 消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,并且不重回队列;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
五、延迟队列
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
场景一:在订单系统中,一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。
场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备。
使用 TTL+死信队列 组合实现延迟队列的效果。
1、生产者
创建新项目mabbitmq-delay-producer
创建生产者配置文件 spring-rabbitmq-producer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue name="order_queue" id="order_queue"> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="order_exchange_dlx"/> <entry key="x-dead-letter-routing-key" value="dlx.order.cancel"></entry> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> </rabbit:queue-arguments> </rabbit:queue>
<rabbit:topic-exchange name="order_exchange"> <rabbit:bindings> <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
<rabbit:queue name="order_queue_dlx" id="order_queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="order_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
</beans>
|
routing-key:生产者的order.msg
符合正常订单队列的order.#
消息过期后 routing-key 变为dlx.order.cancel
到了死信交换机(order_exchange_dlx)
dlx.order.cancel
符合死信队列的dlx.order.#
,消息被发送到死信队列
创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package com.atguigu;
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testDelay() throws InterruptedException { rabbitTemplate.convertAndSend("order_exchange", "order.msg", "订单信息:id=1,time=2021年6月10日16:41:47"); for (int i = 10; i > 0; i--) { System.out.println(i + "..."); Thread.sleep(1000); } } }
|
2、消费者
修改消费者项目,创建 OrderListener.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.atguigu.listener;
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component;
@Component public class OrderListener implements ChannelAwareMessageListener {
@Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag();
try { System.out.println(new String(message.getBody()));
System.out.println("处理业务逻辑..."); System.out.println("根据订单id查询其状态..."); System.out.println("判断状态是否为支付成功"); System.out.println("取消订单,回滚库存...."); channel.basicAck(deliveryTag, true); } catch (Exception e) { System.out.println("出现异常,拒绝接受"); channel.basicNack(deliveryTag, true, false); } } }
|
修改消费者配置文件 spring-rabbitmq-consumer.xml
,监听死信队列
1 2 3 4 5
| <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener> </rabbit:listener-container>
|
运行消费者测试类ConsumerTest
查看效果
3、过程
生产者发送订单消息后到了正常的订单队列,等待 10s,正常的订单队列里面的订单消息没有被消费者消费而过期
过期的订单消息被发送到死信交换机,接着被死信交换机发送到了死信队列
监听着死信队列的消费者就收到了消息