您的位置 首页 java

Java消息队列之RabbitMQ消息可靠性

那些失败的情况

网络问题有很多原因出发失败。防火墙也可能会中断 idle 连接,网络失败不是很快确定的。 硬件和软件也会导致系统崩溃。客户端软件保持运行,而逻辑错误也可能会导致channel和connection错误。这就要求我们可以恢复new channel或者connection从这个问题中。

Connection 失败

Connection失败后,客户端需要重新构建一个新的Connection。任何之前打开的Channel会重新打开。会有特定的异常抛出,例如 Java 提供了ShutdownListener和DotNet提供的Iconnection.ConnectionShutdown时间。

确认和提交(Acknowledgements and confirm s)

当连接失败,消息可能在客户端和服务器端之间进行传输。 确认可以让服务器和客户端之间知道彼此的状态。 在 RabbitMQ 中,Consumer与服务器之间确认行为叫做”Acknowledgements”,而Producer与服务器之间确认行为叫做“Confirms”。

Acknowledgements的场景:消费软件不确认这个消息,直到它使用接受到的消息,完成自己的工作。一旦Broker接受到这个消息确认信息,将清除这个消息。

Confirms的场景:当Broker接受到不能路由的消息,并且消息被设置为mandatory,消息basic.return会在basic.ack之前返回。对于路由之后的消息,basic.ack会让所有接受到这个消息的数据,存储到磁盘中。对于镜像队列来说(Mirrored Queue),会让所有镜像都统一确认这些消息。

心跳

部分网络问题,包的丢失可能是由于操作系统破坏了TCP链接。AMQP使用心跳的特征在软件层次检测链接是否被破坏了。心跳也会阻止一些网络设备阻止当前“idle”tcp链接。 broker会协商连接的heartbeat的频率。

对于Producer(消息生产者)

当使用confirm时候,如果channel或者connection失败,Producer应该重新发送所有没有来得及提交的数据。

这里可能会造成数据重复,因为服务器broker可能已经发送确认数据到Producer了。因此consumer应用可以处理重复数据,保持一个 幂等 的状态。(幂等状态是指,无论consumer处理了几次相同的数据,结果都是一样的。例如:x=2是幂等。x=x+2就不是幂等)

确定消息是被路由的

producer应该保证发送的消息被路由( Router )到队列中(Queues). 为了保证producer发送消息可以被最少一条队列接受(Queue),我们在发送消息的时候,必须设置mandatory标示,在发送消息时候(AMQP的协议对应basic.publish,Java的版本对应Channel.basicPublish方法)。当消息没有被队列接受时,我们可以通过返回值(AMQP协议的basic.return)获得一个返回码和其他文本数据。

Producer也应该认识到发送消息到集群中,集群之间都需要复制消息。也增加了网络失败造成的延时情况。

对于Consumer(对于消费者)

如果一个消息发送到Consumer,而Consumer没有确认并且Connection中断,那么这个消息重新进入队列,再次发送出去之后,会有一个redelivedred标签。帮助Consumer的应用判断当前数据是否重复发送。

Consumer取消

Consumer取消是服务器端发起,告诉Consumer当前消费被终止了。Consumer取消发起的原因:当前的Queue被删除,或者其他失败情况。这是Rabbitmq的一个扩展,部分客户端才能支持。(Java和DotNet都支持)。

消息不能被处理

consumer可以发送消息给服务端,告知消息被拒绝(AMQP的basic.reject和basic.nack),让服务器重新发送他们,或者根据dead-letter参数参数。(dead-letter是配置Exchange和Queue时候配置的参数,我们可以进行相关的设置)。

分布式下的RabbitMQ

当网络不稳定的时候,使用federation和shovel来实现功能。我们也可以配置他们实现confirm和acknowledgement功能。

文章来源:智云一二三科技

文章标题:Java消息队列之RabbitMQ消息可靠性

文章地址:https://www.zhihuclub.com/190944.shtml

关于作者: 智云科技

热门文章

网站地图