延迟队列的实现1:
- 将消息发送到指定了过期时间的队列。即在声明某个队列时,声明这个队列中所有消息的过期时间,这样,只要进入此队列的消息,它们的过期时间都是相同的,如以下代码,指定了过期时间为60秒,及过期的消息被路由的目标,如果不设置的话,默认过期时间为30分钟。
Map < String , Object > map =
Map . of ( “x-dead-letter-exchange” , “dead_exchange” ,
“x-dead-letter-routing-key” , “dead_routing” ,
“x-message-ttl” , 60000 );
channel .queueDeclare( “daly_queue” , true , false , false , map );
截图:
在队列上,统一设置一个过期时间的缺点是,不能根据业务需求,设置某一个消息的过期时间。
- 或声明一个没有过期时间的队列,但在发布消息时,指定消息的过期时间,如:
正常声明队列:
Map < String , Object > map =
Map . of ( “x-dead-letter-exchange” , “dead_exchange” ,
“x-dead-letter-routing-key” , “dead_routing” );
channel .queueDeclare( “daly_queue” , true , false , false , map );
发布时,设置过期时间:
prop = new AMQP .BasicProperties().builder()
.expiration( “3000” ).build();
channel .basicPublish( “daly_exchange” ,
“daly_routing” ,
prop ,
“B过期时间为3秒” .getBytes());
但上述方法,设置每一个消息不同的过期时间,依然存在以下问题:
延迟队列的问题:
1:A消息到达队列Q1并设置A消息的过期时间为10秒。
2:B消息也到达队列Q1,并设置B消息的过期时间为1秒。
3:这种情况下,因为队列中的数据是FIFO,排队执行的,所以,虽然B消息已经过期,但因A消息没有过期无法先将B消息发送给消费者,这就是延迟队列的最大问题。
4:解决方案:使用插件。
问题2示例图-演示
以下演示,用于说明后发的信息虽然已经过期,但因为前一个消息并没有过期,所以,即使后面的消息过期,也不会被消费。
开发生产者
- 声明列信交换机,死信队列及绑定关系。
- 声明正常 交换机 ,正常队列,队列中消息过期时路由目的地,及绑定关系。
- 发布消息时,设置消息的过期时间。
package wj.rabbitmq.daly ;
import com. rabbitmq .client.AMQP ;
import com.rabbitmq.client.Channel ;
import com.rabbitmq.client.Connection ;
import lombok.extern.slf4j. Slf4j ;
import wj.mq.utils.ConnUtils ;
import Java .util.Map ;
/**
* 延迟队列生产者
*/
@Slf4j
public class DalySender {
public static void main ( String [] args) throws Exception {
Connection con = ConnUtils . newConnection ();
Channel channel = con .createChannel();
//声明接收死信的的交换机和队列
channel .exchangeDeclare( “dead_exchange” , “direct” , true );
//声明接收列信交换机数据的队列
channel .queueDeclare( “dead_queue” , true , false ,
false , null );
//声明死信交换机与死信队列的绑定关系
channel .queueBind( “dead_queue” , “dead_exchange” ,
“dead_routing” );
//声明正常接收信息的交换机
channel .exchangeDeclare( “daly_exchange” , “direct” , true );
//声明正常接收数据的队列,并添加列信路由到哪儿去
Map < String , Object > map =
Map . of ( “x-dead-letter-exchange” , “dead_exchange” ,
“x-dead-letter-routing-key” , “dead_routing” );
channel .queueDeclare( “daly_queue” , true , false ,
false , map );
//声明正常的绑定关系
channel .queueBind( “daly_queue” , “daly_exchange” , “daly_routing” );
//先发送一个过期时间为30秒
AMQP . BasicProperties prop =
new AMQP .BasicProperties().builder()
.expiration( “30000” )
.build();
channel .basicPublish( “daly_exchange” ,
“daly_routing” ,
prop ,
“A过期时间为30秒” .getBytes());
log .info( “信息A过期30秒,发送完成。” );
//再发送一个过期时间为3秒
prop = new AMQP .BasicProperties().builder()
.expiration( “3000” ).build();
channel .basicPublish( “daly_exchange” ,
“daly_routing” ,
prop ,
“B过期时间为3秒” .getBytes());
log .info( “信息B过期3秒发送完成。” );
con .close();
}
}
开发消费者
开发一个消费者,只需要从死信队列中,读取过期的信息即可。
package wj.rabbitmq.daly ;
import com.rabbitmq.client. *;
import lombok.extern.slf4j. Slf4j ;
import wj.mq.utils.ConnUtils ;
/**
* 只接收死信队列里面的数据即可,不要消费正常的队列
* 以便于让正常的队列里面消息过期
*/
@Slf4j
public class DalyReceiver {
public static void main ( String [] args) throws Exception {
Connection con = ConnUtils . newConnection ();
Channel channel = con .createChannel();
System . err .println( “准备消费死信队列里面,即过期的信息” );
channel .basicConsume( “dead_queue” ,
true ,
( consumer Tag, message) -> {
log .info( “死信,即过期消息:” + new String(message.getBody()));
}, consumerTag -> {
//ignore
});
}
}
运行
运行生产者,输出以下日志:
14:07:53.138 信息A过期30秒,发送完成。
14:07:53.142 信息B过期3秒发送完成。
运行消费者,输出以下日志,可见,并没有因为B先过期,而先收到B。
14:08:23.140 死信,即过期消息:A过期时间为30秒
14:08:23.140 死信,即过期消息:B过期时间为3秒
延迟队列问题的解决方案(使用插件)
插件下载地址:
rabbitmq所有可用插件列表:
查看已经启用的插件
通过命令rabbitmq-plugins list可以查看rabbitmq的插件列表,及已经启用的插件,前面添加了*号的,为已经启用的插件。可见,启用的插件为management和web,其他插件,都没有启动。但也没有我们要使用的dalyed_message_exchange插件,所以,还需要额外的安装这个插件。
插件相关命令
官方参考地址:
rabbitmq-plugins list 用于列示所有插件
rabbitmq-plugins enable <plugin-name> 启用插件
rabbitmq-plugins disable <plugin-name> 禁用插件
rabbitmq-plugins directories -s 用于查看plugins可安装的目录 (以下示例,要进入容器执行)
安装插件
下载rabbitmq_delayed_message_exchange-<version>.ez文件,进入 docker 容器,将文件放到plugins目录下,此目录为:/opt/bitnami/rabbitmq/plugins。
然后执行安装插件的命令:
如果不是用docker容器运行的,则可以直接将*.ez文件放到rabbitmq的插件目录下为:/usr/lib/rabbitmq/rabbitmq-server-<version>/plugins。
然后就可以执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange。
验证插件
安装插件后,再次查看ui端口的交换机创建处,可以见到新的交换机类型:x-delayed-message .
测试发送延迟消息(Java项目)
开发生产者
- 声明延迟交换机,x-delay-message 。
- 声明普通队列。
- 声明延迟交换机与普通队列的绑定关系。
- 发送延迟消息,通过设置x-delay,单位毫秒。先发送一个延迟长的消息如1分钟。再发送一个延迟少的消息,如6秒。测试必须要先收到延迟少的消息,就算是延迟交换机运行成功。
package wj.rabbitmq.delayexchange ;
import com.rabbitmq.client.AMQP ;
import com.rabbitmq.client.Channel ;
import com.rabbitmq.client.Connection ;
import lombok.extern.slf4j. Slf4j ;
import wj.mq.utils.ConnUtils ;
import java.nio.charset.StandardCharsets ;
import java.util. HashMap ;
import java.util.Map ;
/**
* <pre>
* 使用延迟交换机发送延迟消息
* 发送的消息会被阻塞到延迟交换机中,先到期的消息会先被发送到队列中,并且不再设置过期时间
* 发送的队列的消息的,立刻会被消息
* </pre>
*/
@Slf4j
public class DelayExchangeSender {
public static void main ( String [] args) throws Exception {
Connection con = ConnUtils . newConnection ();
Channel channel = con .createChannel();
//声明延迟交换机,即指定个x-delayed-message的交换机的基础类型
Map < String , Object > map = new HashMap<>();
map .put( “x-delayed-type” , “direct” ); //必须写,固定值
channel .exchangeDeclare(
“my-delayed-exchange” , //指定交换机的名称,任意
“x-delayed-message” , //指定交换机类型,必须是这个类型
true , //是否持久化
false , //是否自动删除
map ); //必须传入这个参数
//声明普通队列
channel .queueDeclare( “my-queue” ,
true , false ,
false , null );
//声明绑定关系
channel .queueBind( “my-queue” ,
“my-delayed-exchange” ,
“delay-routing” );
//先发送延迟60秒的消息,其中x-delay为固定key
AMQP . BasicProperties prop = new AMQP .BasicProperties()
.builder().headers( Map . of ( “x-delay” , 1000 * 60 )) //设置延迟时间为60秒
.build();
channel .basicPublish( “my-delayed-exchange” ,
“delay-routing” ,
prop,
“Delay for 1 minutes” .getBytes( StandardCharsets . UTF_8 ));
log .info( “发送消息:[Delay for 1 minutes] 完成” );
//再发送一个延迟时间少的信息,如6秒
prop = new AMQP .BasicProperties()
.builder().headers( Map . of ( “x-delay” , 1000 * 6 ))
.build();
channel .basicPublish( “my-delayed-exchange” ,
“delay-routing” ,
prop,
“Delay for 6 seconds” .getBytes( StandardCharsets . UTF_8 ));
log .info( “发送消息: [Delay for 6 seconds] 完成” );
}
}
如果先启动生产者,并已经声明了交换机与队列的绑定关系,则消费者可以不再次声明绑定关系。所以,消费者的代码就变的比较简单了,消费者仅是消费消息,并输入消费的时间:
开发消息者
package wj.rabbitmq.delayexchange ;
import com.rabbitmq.client. *;
import lombok.extern.slf4j. Slf4j ;
import wj.mq.utils.ConnUtils ;
/**
* 延迟消息消费者
*/
@Slf4j
public class DelayExchangeReceiver {
public static void main ( String [] args) throws Exception {
Connection con = ConnUtils . newConnection ();
Channel channel = con .createChannel();
//直接消费这个队列即可
channel .basicConsume( “my-queue” , true ,
(consumerTag, message) -> {
log .info( “消费消息:{}” , new String(message.getBody()));
}, consumerTag -> {
//ignore
});
}
}
启动
先启动生产者,发送信息,输出一下时间:
12:35:03.772 发送消息:[Delay for 1 minutes] 完成
12:35:03.774 发送消息: [Delay for 6 seconds] 完成
快速启动消费者,等待,并查看收到消息:
12:35:09.782 消费消息:Delay for 6 seconds
12:36:03.776 消费消息:Delay for 1 minutes
通过上面的代码,可以看出,消费者在6秒后,先收到了先延迟到期的消息,一分钟以后,再收到延迟一分钟的消息。虽然,先发送的是延迟一分钟的消息,但此消息,并没有阻塞后面延迟时间短的消息。
查看延迟交换机
可以看到rate out,此时间将会是一个倒计时,到时间以后再给发送给队列。
在没有收以消息之前,消息会被阻塞到交换机中,所以,没有到延迟时间的消息,不会被发送的队列中。
测试发送延迟消息( spring Boot项目)
使用springboot项目,声明延迟交换机,需要使用CustumExchange自定义交换机类型。
开发配置类
- 定义延迟交换机。
- 定义普通队列。
- 定义绑定关系。
package wj.mq.config.delay;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context. annotation .Bean;
import org.springframework.context.annotation.Configuration;
/**
* 使用rabbitmq_delayed_message_exchange发送延迟消息
*/
@Configuration
public class DelayMessageExchangeConfig {
@Bean
public CustomExchange delayExchange() {
CustomExchange exchange = //
new CustomExchange( “my-delayed-exchange” , // 交换机名称
“x-delayed-message” , // 交换机的类型,必须是此值
true , // durable
false , // autoDelete
Map. of ( “x-delayed-type” , “direct” ) // 传递延迟类型
);
return exchange ;
}
@Bean
public Queue myQueue() {
Queue queue = new Queue( “my-queue” , //队列名称
true , //durable
false , //exclusive
false ); //auto delete
return queue ;
}
@Bean
public Binding myBinding(CustomExchange customExchange , Queue myQueue ) {
Binding binding = BindingBuilder. bind ( myQueue ) //Queue
.to( customExchange ) //exchange
.with( “my-routing” ) //路由routing
.noargs();
return binding ;
}
}
开发生产者代码
- 注入RabbitTemplate
- 发送,并设置过期时间。
- 发送信息,使用ApplicationRunner启动后即发送,也可以开发一个Conntroller,动态调用。
package wj.mq.rabbitmq.delay;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class DelaySender implements ApplicationRunner {
@Autowired
private RabbitTemplate rabbitTemplate ;
public void send() {
// 先发送一个延迟1分钟的消息
rabbitTemplate .convertAndSend( “my-delayed-exchange” , // 交换机名称
“my-routing” , // 路由key
“Message of delay 1 minutes” , // 消息对象
new MessagePostProcessor() { // 接收一个消息发送前的处理函数
@Override
public Message postProcessMessage(Message message ) //
throws AmqpException {
// 处理消息,设置消息的过期时间设置x-delay头,60秒钟
message .getMessageProperties().setDelay(1000 * 60);
return message ;
}
});
log .info( “发送延迟1分钟的消息-完成” );
// 再发送一个延迟6秒钟的消息
rabbitTemplate .convertAndSend( “my-delayed-exchange” , // 交换机名称
“my-routing” , // 路由key
“Message of delay 6 seconds” , // 消息对象
new MessagePostProcessor() { // 接收一个消息发送前的处理函数
@Override
public Message postProcessMessage(Message message ) //
throws AmqpException {
// 处理消息,设置消息的过期时间设置x-delay头,6秒钟
message .getMessageProperties().setDelay(1000 * 6);
return message ;
}
});
log .info( “发送延迟6秒的消息-完成” );
}
@Override
public void run(ApplicationArguments args ) throws Exception {
send(); //启动完成后就发送,也可以通过一个controller调用测试
}
}
开发消息者代码
通过@RabblitListener接收消息
package wj.mq.rabbitmq.delay;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
/**
* 消费延迟消息
*/
@Slf4j
@Component
public class DelayReceiver {
@RabbitListener (queues = “my-queue” , ackMode = “AUTO” )
public void cousumer( @Payload () Message message , Channel channel ) {
log .info( “接收到消息:{}” , new String( message .getBody()));
}
}
运行测试
通过以下输出可以看出,虽然先发送的延迟1分钟的消息,但由于后发出的消息因延迟时间更少(6秒)而被先接收到。说明这个测试已经成功运行。
2022-10-01 21:28:55.408 INFO : 发送延迟1分钟的消息-完成
2022-10-01 21:28:55.417 INFO : 发送延迟6秒的消息-完成
2022-10-01 21:29:01.433 INFO : 接收到消息:Message of delay 6 seconds
2022-10-01 21:29:55.408 INFO : 接收到消息:Message of delay 1 minutes
延迟插件的运行原理
- 延迟交换机,会将收到信息,先缓存到延迟交换机内部。
- 消息延迟到期后,才会发送给目标队列。
延迟插件窗口镜像自定义
由于每一次启动延迟插件,需要将rabbitmq_delayed_message_exchange-<version>.ez copy到bitnami/rabbitmq容器的目录: /opt/bitnami/rabbitmq/plugins/下。且如果容器删除后,必须重新copy。
可以创建一个新的镜像,将插件copy到镜像中,在启动时通过RABBITMQ_PLUGINS指定启动的插件,就可以了。
创建Dockerfile
创建新的镜像
from bitnami/rabbitmq:3.10.8
MAINTAINER WJ
COPY rabbitmq_delayed_message_exchange-3.10.2.ez /opt/bitnami/rabbitmq/plugins/
构建新的镜像
docker build -t rabbitmq:1.0 .
启动新的容器
并指定启用的插件。
#!/bin/bash
docker stop mq
docker rm mq
docker run –name mq -d
-p 5672:5672
-p 15672:15672
-e RABBITMQ_USERNAME=admin
-e RABBITMQ_PASSWORD=admin
-e TZ=Asia/Shanghai
-e RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_mqtt,rabbitmq_stream,rabbitmq_delayed_message_exchange
-v ${PWD}/data:/bitnami
mq:1.0
进入容器查看
可见,指定的插件,已经启用。
通过界面查看: