您的位置 首页 java

延迟队列-及存在的问题和解决方案

延迟队列的实现1:

  1. 将消息发送到指定了过期时间的队列。即在声明某个队列时,声明这个队列中所有消息的过期时间,这样,只要进入此队列的消息,它们的过期时间都是相同的,如以下代码,指定了过期时间为60秒,及过期的消息被路由的目标,如果不设置的话,默认过期时间为30分钟。

Map < String , Object > map =
Map . of ( “x-dead-letter-exchange” , “dead_exchange” ,
“x-dead-letter-routing-key” , “dead_routing” ,
“x-message-ttl” , 60000 );
channel .queueDeclare( “daly_queue” , true , false , false , map );

截图:

延迟队列-及存在的问题和解决方案

在队列上,统一设置一个过期时间的缺点是,不能根据业务需求,设置某一个消息的过期时间。

  1. 或声明一个没有过期时间的队列,但在发布消息时,指定消息的过期时间,如:

正常声明队列:

Map < String , Object > map =
Map . of ( “x-dead-letter-exchange” , “dead_exchange” ,
“x-dead-letter-routing-key” , “dead_routing” );
channel .queueDeclare( “daly_queue” , true , false , false , map );

发布时,设置过期时间:

prop = new AMQP .BasicProperties().builder()
.expiration(
“3000” ).build();
channel .basicPublish( “daly_exchange” ,
“daly_routing” ,
prop ,
“B过期时间为3秒” .getBytes());

但上述方法,设置每一个消息不同的过期时间,依然存在以下问题:

延迟队列的问题:

1:A消息到达队列Q1并设置A消息的过期时间为10秒。

2:B消息也到达队列Q1,并设置B消息的过期时间为1秒。

3:这种情况下,因为队列中的数据是FIFO,排队执行的,所以,虽然B消息已经过期,但因A消息没有过期无法先将B消息发送给消费者,这就是延迟队列的最大问题。

4:解决方案:使用插件。

问题2示例图-演示

以下演示,用于说明后发的信息虽然已经过期,但因为前一个消息并没有过期,所以,即使后面的消息过期,也不会被消费。

延迟队列-及存在的问题和解决方案

开发生产者

  • 声明列信交换机,死信队列及绑定关系。
  • 声明正常 交换机 ,正常队列,队列中消息过期时路由目的地,及绑定关系。
  • 发布消息时,设置消息的过期时间。

package wj.rabbitmq.daly ;
import com. rabbitmq .client.AMQP ;
import com.rabbitmq.client.Channel ;
import com.rabbitmq.client.Connection ;
import lombok.extern.slf4j. Slf4j ;
import wj.mq.utils.ConnUtils ;
import Java .util.Map ;
/**
* 延迟队列生产者
*/
@Slf4j
public class DalySender {
public static void main ( String [] args) throws Exception {
Connection con = ConnUtils . newConnection ();
Channel channel = con .createChannel();
//声明接收死信的的交换机和队列
channel .exchangeDeclare( “dead_exchange” , “direct” , true );
//声明接收列信交换机数据的队列
channel .queueDeclare( “dead_queue” , true , false ,
false , null );
//声明死信交换机与死信队列的绑定关系
channel .queueBind( “dead_queue” , “dead_exchange” ,
“dead_routing” );
//声明正常接收信息的交换机
channel .exchangeDeclare( “daly_exchange” , “direct” , true );
//声明正常接收数据的队列,并添加列信路由到哪儿去
Map < String , Object > map =
Map . of ( “x-dead-letter-exchange” , “dead_exchange” ,
“x-dead-letter-routing-key” , “dead_routing” );
channel .queueDeclare( “daly_queue” , true , false ,
false , map );
//声明正常的绑定关系
channel .queueBind( “daly_queue” , “daly_exchange” , “daly_routing” );
//先发送一个过期时间为30秒
AMQP . BasicProperties prop =
new AMQP .BasicProperties().builder()
.expiration(
“30000” )
.build();
channel .basicPublish( “daly_exchange” ,
“daly_routing” ,
prop ,
“A过期时间为30秒” .getBytes());
log .info( “信息A过期30秒,发送完成。” );
//再发送一个过期时间为3秒
prop = new AMQP .BasicProperties().builder()
.expiration(
“3000” ).build();
channel .basicPublish( “daly_exchange” ,
“daly_routing” ,
prop ,
“B过期时间为3秒” .getBytes());
log .info( “信息B过期3秒发送完成。” );
con .close();
}
}

开发消费者

开发一个消费者,只需要从死信队列中,读取过期的信息即可。

package wj.rabbitmq.daly ;
import com.rabbitmq.client. *;
import lombok.extern.slf4j. Slf4j ;
import wj.mq.utils.ConnUtils ;
/**
* 只接收死信队列里面的数据即可,不要消费正常的队列
* 以便于让正常的队列里面消息过期
*/
@Slf4j
public class DalyReceiver {
public static void main ( String [] args) throws Exception {
Connection con = ConnUtils . newConnection ();
Channel channel = con .createChannel();
System . err .println( “准备消费死信队列里面,即过期的信息” );
channel .basicConsume( “dead_queue” ,
true ,
( consumer Tag, message) -> {
log .info( “死信,即过期消息:” + new String(message.getBody()));
}, consumerTag -> {
//ignore
});
}
}

运行

运行生产者,输出以下日志:

14:07:53.138 信息A过期30秒,发送完成。

14:07:53.142 信息B过期3秒发送完成。

运行消费者,输出以下日志,可见,并没有因为B先过期,而先收到B。

14:08:23.140 死信,即过期消息:A过期时间为30秒

14:08:23.140 死信,即过期消息:B过期时间为3秒

延迟队列问题的解决方案(使用插件)

插件下载地址:

rabbitmq所有可用插件列表:

查看已经启用的插件

通过命令rabbitmq-plugins list可以查看rabbitmq的插件列表,及已经启用的插件,前面添加了*号的,为已经启用的插件。可见,启用的插件为management和web,其他插件,都没有启动。但也没有我们要使用的dalyed_message_exchange插件,所以,还需要额外的安装这个插件。

延迟队列-及存在的问题和解决方案

插件相关命令

官方参考地址:

rabbitmq-plugins list 用于列示所有插件

rabbitmq-plugins enable <plugin-name> 启用插件

rabbitmq-plugins disable <plugin-name> 禁用插件

rabbitmq-plugins directories -s 用于查看plugins可安装的目录 (以下示例,要进入容器执行)

延迟队列-及存在的问题和解决方案

安装插件

下载rabbitmq_delayed_message_exchange-<version>.ez文件,进入 docker 容器,将文件放到plugins目录下,此目录为:/opt/bitnami/rabbitmq/plugins。

然后执行安装插件的命令:

延迟队列-及存在的问题和解决方案

如果不是用docker容器运行的,则可以直接将*.ez文件放到rabbitmq的插件目录下为:/usr/lib/rabbitmq/rabbitmq-server-<version>/plugins。

然后就可以执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange。

验证插件

安装插件后,再次查看ui端口的交换机创建处,可以见到新的交换机类型:x-delayed-message .

延迟队列-及存在的问题和解决方案

测试发送延迟消息(Java项目)

开发生产者

  • 声明延迟交换机,x-delay-message 。
  • 声明普通队列。
  • 声明延迟交换机与普通队列的绑定关系。
  • 发送延迟消息,通过设置x-delay,单位毫秒。先发送一个延迟长的消息如1分钟。再发送一个延迟少的消息,如6秒。测试必须要先收到延迟少的消息,就算是延迟交换机运行成功。

package wj.rabbitmq.delayexchange ;
import com.rabbitmq.client.AMQP ;
import com.rabbitmq.client.Channel ;
import com.rabbitmq.client.Connection ;
import lombok.extern.slf4j. Slf4j ;
import wj.mq.utils.ConnUtils ;
import java.nio.charset.StandardCharsets ;
import java.util. HashMap ;
import java.util.Map ;
/**
* <pre>
* 使用延迟交换机发送延迟消息
* 发送的消息会被阻塞到延迟交换机中,先到期的消息会先被发送到队列中,并且不再设置过期时间
* 发送的队列的消息的,立刻会被消息
* </pre>
*/
@Slf4j
public class DelayExchangeSender {
public static void main ( String [] args) throws Exception {
Connection con = ConnUtils . newConnection ();
Channel channel = con .createChannel();
//声明延迟交换机,即指定个x-delayed-message的交换机的基础类型
Map < String , Object > map = new HashMap<>();
map .put( “x-delayed-type” , “direct” ); //必须写,固定值
channel .exchangeDeclare(
“my-delayed-exchange” , //指定交换机的名称,任意
“x-delayed-message” , //指定交换机类型,必须是这个类型
true , //是否持久化
false , //是否自动删除
map ); //必须传入这个参数
//声明普通队列
channel .queueDeclare( “my-queue” ,
true , false ,
false , null );
//声明绑定关系
channel .queueBind( “my-queue” ,
“my-delayed-exchange” ,
“delay-routing” );
//先发送延迟60秒的消息,其中x-delay为固定key
AMQP . BasicProperties prop = new AMQP .BasicProperties()
.builder().headers(
Map . of ( “x-delay” , 1000 * 60 )) //设置延迟时间为60秒
.build();
channel .basicPublish( “my-delayed-exchange” ,
“delay-routing” ,
prop,
“Delay for 1 minutes” .getBytes( StandardCharsets . UTF_8 ));
log .info( “发送消息:[Delay for 1 minutes] 完成” );
//再发送一个延迟时间少的信息,如6秒
prop = new AMQP .BasicProperties()
.builder().headers(
Map . of ( “x-delay” , 1000 * 6 ))
.build();
channel .basicPublish( “my-delayed-exchange” ,
“delay-routing” ,
prop,
“Delay for 6 seconds” .getBytes( StandardCharsets . UTF_8 ));
log .info( “发送消息: [Delay for 6 seconds] 完成” );
}
}

如果先启动生产者,并已经声明了交换机与队列的绑定关系,则消费者可以不再次声明绑定关系。所以,消费者的代码就变的比较简单了,消费者仅是消费消息,并输入消费的时间:

开发消息者

package wj.rabbitmq.delayexchange ;
import com.rabbitmq.client. *;
import lombok.extern.slf4j. Slf4j ;
import wj.mq.utils.ConnUtils ;
/**
* 延迟消息消费者
*/
@Slf4j
public class DelayExchangeReceiver {
public static void main ( String [] args) throws Exception {
Connection con = ConnUtils . newConnection ();
Channel channel = con .createChannel();
//直接消费这个队列即可
channel .basicConsume( “my-queue” , true ,
(consumerTag, message) -> {
log .info( “消费消息:{}” , new String(message.getBody()));
}, consumerTag -> {
//ignore
});
}
}

启动

先启动生产者,发送信息,输出一下时间:

12:35:03.772 发送消息:[Delay for 1 minutes] 完成

12:35:03.774 发送消息: [Delay for 6 seconds] 完成

快速启动消费者,等待,并查看收到消息:

12:35:09.782 消费消息:Delay for 6 seconds

12:36:03.776 消费消息:Delay for 1 minutes

通过上面的代码,可以看出,消费者在6秒后,先收到了先延迟到期的消息,一分钟以后,再收到延迟一分钟的消息。虽然,先发送的是延迟一分钟的消息,但此消息,并没有阻塞后面延迟时间短的消息。

查看延迟交换机

可以看到rate out,此时间将会是一个倒计时,到时间以后再给发送给队列。

在没有收以消息之前,消息会被阻塞到交换机中,所以,没有到延迟时间的消息,不会被发送的队列中。

测试发送延迟消息( spring Boot项目)

使用springboot项目,声明延迟交换机,需要使用CustumExchange自定义交换机类型。

开发配置类

  • 定义延迟交换机。
  • 定义普通队列。
  • 定义绑定关系。

package wj.mq.config.delay;

import java.util.Map;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.CustomExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context. annotation .Bean;

import org.springframework.context.annotation.Configuration;

/**

* 使用rabbitmq_delayed_message_exchange发送延迟消息

*/

@Configuration

public class DelayMessageExchangeConfig {

@Bean

public CustomExchange delayExchange() {

CustomExchange exchange = //

new CustomExchange( “my-delayed-exchange” , // 交换机名称

“x-delayed-message” , // 交换机的类型,必须是此值

true , // durable

false , // autoDelete

Map. of ( “x-delayed-type” , “direct” ) // 传递延迟类型

);

return exchange ;

}

@Bean

public Queue myQueue() {

Queue queue = new Queue( “my-queue” , //队列名称

true , //durable

false , //exclusive

false ); //auto delete

return queue ;

}

@Bean

public Binding myBinding(CustomExchange customExchange , Queue myQueue ) {

Binding binding = BindingBuilder. bind ( myQueue ) //Queue

.to( customExchange ) //exchange

.with( “my-routing” ) //路由routing

.noargs();

return binding ;

}

}

开发生产者代码

  • 注入RabbitTemplate
  • 发送,并设置过期时间。
  • 发送信息,使用ApplicationRunner启动后即发送,也可以开发一个Conntroller,动态调用。

package wj.mq.rabbitmq.delay;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.ApplicationArguments;

import org.springframework.boot.ApplicationRunner;

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j

@Component

public class DelaySender implements ApplicationRunner {

@Autowired

private RabbitTemplate rabbitTemplate ;

public void send() {

// 先发送一个延迟1分钟的消息

rabbitTemplate .convertAndSend( “my-delayed-exchange” , // 交换机名称

“my-routing” , // 路由key

“Message of delay 1 minutes” , // 消息对象

new MessagePostProcessor() { // 接收一个消息发送前的处理函数

@Override

public Message postProcessMessage(Message message ) //

throws AmqpException {

// 处理消息,设置消息的过期时间设置x-delay头,60秒钟

message .getMessageProperties().setDelay(1000 * 60);

return message ;

}

});

log .info( “发送延迟1分钟的消息-完成” );

// 再发送一个延迟6秒钟的消息

rabbitTemplate .convertAndSend( “my-delayed-exchange” , // 交换机名称

“my-routing” , // 路由key

“Message of delay 6 seconds” , // 消息对象

new MessagePostProcessor() { // 接收一个消息发送前的处理函数

@Override

public Message postProcessMessage(Message message ) //

throws AmqpException {

// 处理消息,设置消息的过期时间设置x-delay头,6秒钟

message .getMessageProperties().setDelay(1000 * 6);

return message ;

}

});

log .info( “发送延迟6秒的消息-完成” );

}

@Override

public void run(ApplicationArguments args ) throws Exception {

send(); //启动完成后就发送,也可以通过一个controller调用测试

}

}

开发消息者代码

通过@RabblitListener接收消息

package wj.mq.rabbitmq.delay;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

/**

* 消费延迟消息

*/

@Slf4j

@Component

public class DelayReceiver {

@RabbitListener (queues = “my-queue” , ackMode = “AUTO” )

public void cousumer( @Payload () Message message , Channel channel ) {

log .info( “接收到消息:{}” , new String( message .getBody()));

}

}

运行测试

通过以下输出可以看出,虽然先发送的延迟1分钟的消息,但由于后发出的消息因延迟时间更少(6秒)而被先接收到。说明这个测试已经成功运行。

2022-10-01 21:28:55.408 INFO : 发送延迟1分钟的消息-完成

2022-10-01 21:28:55.417 INFO : 发送延迟6秒的消息-完成

2022-10-01 21:29:01.433 INFO : 接收到消息:Message of delay 6 seconds

2022-10-01 21:29:55.408 INFO : 接收到消息:Message of delay 1 minutes

延迟插件的运行原理

  • 延迟交换机,会将收到信息,先缓存到延迟交换机内部。
  • 消息延迟到期后,才会发送给目标队列。

延迟插件窗口镜像自定义

由于每一次启动延迟插件,需要将rabbitmq_delayed_message_exchange-<version>.ez copy到bitnami/rabbitmq容器的目录: /opt/bitnami/rabbitmq/plugins/下。且如果容器删除后,必须重新copy。

可以创建一个新的镜像,将插件copy到镜像中,在启动时通过RABBITMQ_PLUGINS指定启动的插件,就可以了。

创建Dockerfile

创建新的镜像

from bitnami/rabbitmq:3.10.8

MAINTAINER WJ

COPY rabbitmq_delayed_message_exchange-3.10.2.ez /opt/bitnami/rabbitmq/plugins/

构建新的镜像

docker build -t rabbitmq:1.0 .

启动新的容器

并指定启用的插件。

#!/bin/bash

docker stop mq

docker rm mq

docker run –name mq -d

-p 5672:5672

-p 15672:15672

-e RABBITMQ_USERNAME=admin

-e RABBITMQ_PASSWORD=admin

-e TZ=Asia/Shanghai

-e RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_mqtt,rabbitmq_stream,rabbitmq_delayed_message_exchange

-v ${PWD}/data:/bitnami

mq:1.0

进入容器查看

可见,指定的插件,已经启用。

通过界面查看:

文章来源:智云一二三科技

文章标题:延迟队列-及存在的问题和解决方案

文章地址:https://www.zhihuclub.com/194967.shtml

关于作者: 智云科技

热门文章

网站地图