您的位置 首页 golang

高级消息队列 RabbitMQ 快速入门

1 什么是消息队列

消息队列中间件是分布式系统中重要的组件,主要解决应用 耦合 ,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。通俗的来讲,消息队列就是生产者生产消息,消费者监听到消息做各自的业务操作,也就是消费消息的过程。

2下载与安装

2.1安装 Erlang (由于 RabbitMq 是基于erlang的)

RabbitMQ和Erlang的对应关系 rabbitmq .com/which-erlang.html Erlang下载地址:

安装过程简单粗暴,一直next就行

2.2安装RabbitMQ

下载地址:

安装:点next就行

2.3配置RabbitMq

2.3.1执行以下命令

启用WEB管理插件 rabbitmq-plugins enable rabbitmq_management

2.3.2访问

2.3.3进入首页

用户名密码guest/guest

3 RabbitMQ 的工作原理

1、生产者发送/发布消息到代理; 2、消费者从代理那里接收消息。RabbitMQ扮演代理中间件的角色; 3、当生产者发送消息时,它并不是直接把消息发送到队列里的,而是使用 交换机 来发送; 4、交换机把消息分发到不同的队列里,消费者就能从监听的队列中消费消息。

4六种消息模型

4.1简单模式(simple)

1.消息产生着将消息放入队列; 2.消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被消费后,自动从队列中删除; 3.缺点:这种模式下消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。 4.应用场景:客户端服务端模式的聊天程序。

5.代码 (1)先定义一个简单的队列存储消息

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 16:25
 * @description:
 * @modified By:
 * @version: $
 */
@Configuration
public class SimpleQueue {

    /**
     * 创建一个简单的队列,叫hello
     * @return
     */
    @Bean
    public Queue queue() {
        return new Queue("hello");
    }
}
  

(2)定义消费者

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 16:25
 * @description:消费者,监听hello队列
 * @modified By:
 * @version: $
 */
@Component
public class Consumer1 {

    @RabbitListener(queues = "hello")
    @RabbitHandler
    public void receive(String msg){
        System.out.println("Consumer1收到消息:"+msg);
    }
}
  

(3)定义生产者

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 16:27
 * @description:这里直接用 Rest 接口来做生产者
 * @modified By:
 * @version: $
 */
@RestController
@RequestMapping("/")
public class RabbitRestController {

    @Autowired
    @Qualifier("fireRabbitTemplate")
     private  RabbitTemplate rabbitTemplate;

    private final Logger log =LoggerFactory.getLogger(getClass());

    @RequestMapping("/send")
    public String send() {
        String context = "hello==========" + new Date();
        log.info("发送消息 : " + context);
        //生产者,正在往hello这个路由规则中发送,由于没有交换机,所以路由规则就是队列名称
        this.rabbitTemplate.convertAndSend("hello", context);
        return "success";
    }
}
  

(4)前端发送Rest请求,看控制台效果

4.2工作模式(work)

1.消息产生者将消息放入队列。生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢; 2.消费者A,消费者B,当然可以更多,同时监听同一个队列,消费者共同争抢当前的消息队列内容,谁先拿到谁负责消费消息; 3.缺点:高并发情况下,会产生某一个消息被多个消费者共同消费。 4.应用场景:发红包

5.代码 work模式我们只需要在简单模式的基础上添加一个消费者,也监听hello这个队列 (1)添加消费者2

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 16:29
 * @description:
 * @modified By:
 * @version: $
 */
@Component
public class Consumer2 {

    @RabbitHandler
    @RabbitListener(queues = "hello")
    public void receive(String msg){
        System.out.println("Consumer2收到消息:"+msg);
    }
}
  

(2)前端发送Rest请求,看控制台效果 发送四次请求,消费者1和2分别接收到两次

4.3发布订阅模式(publish/subscribe)

1.X代表交换机rabbitMQ内部组件,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中; 2.消费者监听队列,对应消息队列的消费者拿到消息进行消费; 3.相关场景:邮件群发,群聊天。

4.代码 (1)定义交换机与队列

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 16:47
 * @description:交换机-发布订阅模式
 * @modified By:
 * @version: $
 */
@Configuration
public class QueueExchange {

    @Bean
    public Queue queueA() {
        return new Queue("queueA", true);
    }

    @Bean
    public Queue queueB() {
        return new Queue("queueB", true);
    }

    /**
     * 创建一个fanoutExchange交换机
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 将queueA队列绑定到fanoutExchange交换机上面
     */
    @Bean
    Binding bindingExchangeMessageFanoutA(Queue queueA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueA).to(fanoutExchange);
    }

    /**
     * 将queueB队列绑定到fanoutExchange交换机上面
     */
    @Bean
    Binding bindingExchangeMessageFanoutB(Queue queueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueB).to(fanoutExchange);
    }

}
  

(2)定义消费者

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 16:15
 * @description:消费者3
 * @modified By:
 * @version: $
 */
@Component
public class Consumer3 {

    @RabbitHandler
    @RabbitListener(queues = "queueA")
    public void receive(String msg){
        System.out.println("Consumer3收到消息:"+msg);
    }
}
  
 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 16:15
 * @description:消费者4
 * @modified By:
 * @version: $
 */
@Component
public class Consumer4 {

    @RabbitHandler
    @RabbitListener(queues = "queueB")
    public void receive(String msg){
        System.out.println("Consumer4收到消息:"+msg);
    }
}
  

(3)定义生产者

 @RequestMapping("/sendExchange")
public String sendToExchange(){
    String context = "exchange=======" + new Date();
    log.info("发送消息 : " + context);
    //生产者,正在往交换机发送消息,交换机会根据绑定的队列来发送(如果多个客户端监听同一个队列,只有一个能收到消息)
    this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    return "success";
}
  

(4)前端发送Rest请求,看控制台效果 这里消费者3和4都收到了消息,因为他们分别监听不同的两个队列

4.4 路由模式 (routing)

1.消息生产者将消息发送给交换机按照路由判断,路由是字符串,交换机根据路由的key去匹配,只有匹配上路由key对应的消息队列,对应的消费者才能消费消息; 2.根据业务功能定义路由字符串; 3.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中; 4.业务场景:统一门户和子系统交互,每个子系统对应不同的业务处理,通过路由key分发不同的消息到对应子系统队列,完成消息消费。 5.代码 (1)定义交换机与队列,绑定路由key

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 17:06
 * @description:路由模式
 * @modified By:
 * @version: $
 */
@Configuration
public class QueueRouter {

    public static final String DIRECT_EXCHANGE = "directExchange";
    public static final String QUEUE_DIRECT_A = "direct.A";

    public static final String QUEUE_DIRECT_B = "direct.B";

    /**
     * 创建一个direct交换机
     * @return
     */
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    @Bean
    Queue queueDirectNameA() {
        return new Queue(QUEUE_DIRECT_A);
    }


    /**
     * 创建队列
     * @return
     */
    @Bean
    Queue queueDirectNameB() {
        return new Queue(QUEUE_DIRECT_B);
    }


    /**
     * 将direct.A队列绑定到directExchange交换机中,使用a.key作为路由规则
     * @param queueDirectNameA
     * @param directExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessageDirectA(Queue queueDirectNameA, DirectExchange directExchange) {
        return BindingBuilder.bind(queueDirectNameA).to(directExchange).with("a.key");
    }

    /**
     * 将direct.B队列绑定到directExchange交换机中,使用b.key作为路由规则
     * @param queueDirectNameB
     * @param directExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessageDirectB(Queue queueDirectNameB, DirectExchange directExchange) {
        return BindingBuilder.bind(queueDirectNameB).to(directExchange).with("b.key");
    }
  

(2)定义消费者

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 16:15
 * @description:
 * @modified By:
 * @version: $
 */
@Component
public class Consumer5 {

    @RabbitListener(queues = QueueRouter.QUEUE_DIRECT_A)
    @RabbitHandler
    public void receiveA(String msg){
        System.out.println("Consumer5-direct-A收到路由消息:"+msg);
    }

    @RabbitListener(queues = QueueRouter.QUEUE_DIRECT_B)
    @RabbitHandler
    public void receiveB(String msg){
        System.out.println("Consumer5-direct-B收到路由消息:"+msg);
    }
}
  

(3)定义生产者

 @RequestMapping("/sendRouter")
public String sendToExchangeByRouter(){
    String context = "exchange=======" + new Date();
    log.info("发送路由消息 : " + context);
    //生产者,正在往交换机发送消息,队列绑定了不同路由规则,交换机会使用a.key作为路由规则来发送
    this.rabbitTemplate.convertAndSend(QueueRouter.DIRECT_EXCHANGE,"a.key", context);
    return "success";
}
  

(4)发送前端请求,看效果 可以看到只有监听了QUEUE DIRECT A的消费者能收到消息,因为队列A使用的路由key为a.key

4.5主题模式(topic)

注意:与路由模式的区别就是路由key可以是通配符,模糊匹配。交换机类型为topic 1.星号井号代表通配符; 2.星号代表多个单词, 井号 代表一个单词; 3.路由功能添加模糊匹配; 4.消息产生者产生消息,把消息交给交换机; 5.交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。 5.代码 (1)定义队列与topic交换机

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 17:06
 * @description:主题模式
 * @modified By:
 * @version: $
 */
@Configuration
public class QueueTopic {

    public static final String TOPIC_EXCHANGE = "topicExchange";

    public static final String DIRECT_REGXA = "nr.topic.#";
    public static final String DIRECT_REGXB = "nr.topic.b";
    public static final String DIRECT_REGXC = "nr.topic.c";

    public static final String QUEUE_TOPIC_A = "topic.A";

    public static final String QUEUE_TOPIC_B = "topic.B";

    public static final String QUEUE_TOPIC_C = "topic.C";

    /**
     * 创建一个topic交换机
     * @return
     */
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }



    /**
     * 创建队列
     * @return
     */
    @Bean
    Queue queueTopicNameA() {
        return new Queue(QUEUE_TOPIC_A);
    }


    @Bean
    Queue queueTopicNameB() {
        return new Queue(QUEUE_TOPIC_B);
    }

    @Bean
    Queue queueTopicNameC() {
        return new Queue(QUEUE_TOPIC_C);
    }


    /**
     * 将direct.A队列绑定到topicExchange交换机中,使用nr.topic.#作为路由规则
     * @param queueTopicNameA
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessageTopicA(Queue queueTopicNameA, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueTopicNameA).to(topicExchange).with(DIRECT_REGXA);
    }

    /**
     * 将direct.B队列绑定到topicExchange交换机中,使用nr.topic.b作为路由规则
     * @param queueTopicNameB
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessageTopicB(Queue queueTopicNameB, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueTopicNameB).to(topicExchange).with(DIRECT_REGXB);
    }

    /**
     * 将direct.B队列绑定到topicExchange交换机中,使用nr.topic.c作为路由规则
     * @param queueTopicNameC
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessageTopicC(Queue queueTopicNameC, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueTopicNameC).to(topicExchange).with(DIRECT_REGXC);
    }
}
  

(2)定义消费者

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 16:15
 * @description:
 * @modified By:
 * @version: $
 */
@Component
public class Consumer6 {

    @RabbitListener(queues = QueueTopic.QUEUE_TOPIC_A)
    @RabbitHandler
    public void receiveA(String msg){
        System.out.println("Consumer6-topic-A收到路由消息:"+msg);
    }

    @RabbitListener(queues = QueueTopic.QUEUE_TOPIC_B)
    @RabbitHandler
    public void receiveB(String msg){
        System.out.println("Consumer6-topic-B收到路由消息:"+msg);
    }

    @RabbitListener(queues = QueueTopic.QUEUE_TOPIC_C)
    @RabbitHandler
    public void receiveC(String msg){
        System.out.println("Consumer6-topic-C收到路由消息:"+msg);
    }

}
  

(3)定义生产者

 @RequestMapping("/sendTopic")
public String sendToExchangeByTopic(){
    String context = "topic=======" + new Date();
    log.info("发送topic消息 : " + context);
    //生产者,正在往topic交换机发送消息,队列绑定了不同路由规则,交换机会使用nr.topic.b作为路由规则来发送
    // 用nr.topic.b和nr.topic.#作为路由key的队列都能收到消息
    this.rabbitTemplate.convertAndSend(QueueTopic.TOPIC_EXCHANGE,"nr.topic.b", context);
    return "success";
}
  

(4)前端发送Rest请求,看效果 和预期一样,A和B都收到了消息

4.6 RPC模式

注:图片来源于官网

  1. 客户端启动时,它将创建一个匿名排他回调队列。
  2. 对于RPC请求,客户端发送一条消息,该消息具有两个属性: reply to(设置为回 调队列)和correlation id(设置为每个请求的唯一值)。
  3. 求被发送到rpc_queue队列。
  4. RPC工作程序(又名:服务器)正在等待该队列上的请求。出现请求时,它将使用reply_to字段中的队列来完成工作,并将消息和结果发送回客户端。
  5. 客户端等待回调队列上的数据。当出现一条消息时,它将检查correlation_id属性。如果它与请求中的值匹配,则将响应返回给应用程序。 注:此处来源于官网,这里只做简单介绍,详情可看官网

5消息确认机制(ACK)

业务系统中,消息丢了怎么办,消息发送到哪了?我们通常需要一些消息补偿机制去处理这些问题。

消息确认分为两种,发送确认和接收确认

5.1消息发送确认

确认生产者将消息发送给交换机,交换机传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换机,二是确认是否到达队列 (1)通过实现ConfirmCallBack接口确认消息发送到交换机 代码:

         /**
         * 如果消息到达交换机, 则 confirm 回调, ack = true
         * 如果消息不到达交换机, 则 confirm 回调, ack = false
         * 需要设置spring.rabbitmq.publisher-confirm-type=correlated
         */
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
            log.info("收到回调:{}", ack == true ? "消息成功到达交换机" : "消息到达交换机失败");
            if (!ack) {
                log.info("correlationData:{}", correlationData.getId());
                log.info("消息到达交换机失败原因:{}", cause);
                // 根据业务逻辑实现消息补偿机制
            }
        });
  

(2)通过实现ReturnCallback接口确认消息从交换机发送到队列 代码:

          /**
         * 消息从交换机到达队列成功, 则 returnedMessage 不回调
         * 消息从交换机到达队列失败, 则 returnedMessage 回调
         * 需要设置spring.rabbitmq.publisher-returns=true
         */
        rabbitTemplate.setReturnsCallback(returnedMessage->{
            log.info("消息未到达队列,setReturnsCallback回调");
            log.info("消息报文:{}", new String(returnedMessage.getMessage().getBody()));
            log.info("消息编号:{}", returnedMessage.getReplyCode());
            log.info("描述:{}", returnedMessage.getReplyText());
            log.info("交换机名称:{}", returnedMessage.getExchange());
            log.info("路由名称:{}", returnedMessage.getRoutingKey());
            // 根据业务逻辑实现消息补偿机制
        });
  

5.1消息接收确认

(1)确认模式

AcknowledgeMode.NONE:不确认 AcknowledgeMode.AUTO:自动确认 AcknowledgeMode.MANUAL:手动确认 需要配置:

 spring.rabbitmq.listener.simple.acknowledge-mode = manual
  

(2)代码: 消费者确认

 /**
 * @author :Mr.Fire
 * @date :Created in 2021/4/25 16:15
 * @description:
 * @modified By:
 * @version: $
 */
@Component
public class Consumer8 {

    @RabbitListener(queues = QueueRouter.QUEUE_DIRECT_A)
    @RabbitHandler
    public void receiveA(Message msg, Channel channel) throws IOException {
        try {
            //消息确认机制还可以起到限流作用,比如在接收到此条消息时休眠几秒钟
            Thread.sleep(3000);
            // 确认收到消息,消息将被队列移除
            // false只确认当前consumer一个消息收到,true确认所有consumer获得的消息。
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Consumer8-direct-A收到确认消息:"+msg);
    }
 }
  

生产者

      @RequestMapping("/sendAck")
    @ResponseBody
    public String sendAck() {
        String context = "exchange=======" + new Date();
        log.info("发送确认消息 : " + context);
        //生产者,正在往交换机发送消息,队列绑定了不同路由规则,交换机会使用a.key作为路由规则来发送
        this.rabbitTemplate.convertAndSend(QueueRouter.DIRECT_EXCHANGE,"a.key", context);
        return "success";
    }
  

前端请求看效果:

当然还有其他,比如失败确认basicNack,拒绝basicReject,basicPublish重新发布等,以后有机会在讲解…

6 Spring Boot 整合 RabbitMQ

1.新建一个Maven工程

勾选Spring Web和RabbitMq的依赖,也可以建好工程自己添加

创建完成,来看看POM文件依赖

 <dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  

2.配置yml或者properties,我这里使用properties

     server.port= 8090
    spring.rabbitmq.addresses=localhost:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    #消息确认需要配置
    spring.rabbitmq.publisher-confirm-type=correlated
    spring.rabbitmq.publisher-returns=true
    #手动确认消息
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
  

3.启动项目

4.到此,已经可以开始开发你自己的业务逻辑了

7扩展面试思考题

1.消息基于什么传输? 2.如何避免消息重复投递或重复消费? 3.如何保证消息不丢失? 4.手动确认模式中,消息手动拒绝中如果requeue为true会重新放入队列,消费者处理过程中一直有异常情况下会导致入队-拒绝-入队的死循环,怎么处理?

文章来源:智云一二三科技

文章标题:高级消息队列 RabbitMQ 快速入门

文章地址:https://www.zhihuclub.com/100466.shtml

关于作者: 智云科技

热门文章

网站地图