RabbitMQ功能实现1- 红包未领取退回

生产者代码:

package main

import (
    uuid "github.com/satori/go.uuid"
    "github.com/streadway/amqp"
    "github.com/wonderivan/logger"
    "rmq/db/rmq"
    "time"
)

const (
    DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机
    DeadLettersQueueName    = "dlx_queue_packet"    // 死信队列

    QueueName    = "queue_packet"    // 目标队列
    ExchangeName = "exchange_packet" // 目标交换机
)

var (
    ch       *amqp.Channel
    err      error
    conn     *amqp.Connection
    queue    amqp.Queue
    dlxQueue amqp.Queue
)

func main() {

    if conn, err = rmq.GetConn(); err != nil {
        logger.Error("连接RabbitMQ服务器失败:%s", err.Error())
        return
    }

    defer conn.Close()

    if ch, err = conn.Channel(); err != nil {
        logger.Error("获取Channel失败:%s", err.Error())
        return
    }

    defer ch.Close()

    // 声明队列交换机
    if err = ch.ExchangeDeclare(ExchangeName, amqp.ExchangeFanout, true, false, false, false, nil); err != nil {
        logger.Error("声明业务交换机失败:%s", err.Error())
        return
    }

    // 创建死信交换机
    if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {
        logger.Error("创建死信交换机:%s", err.Error())
        return
    }

    // 创建死信队列
    if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {
        logger.Error("创建死信队列失败:%s", err.Error())
        return
    }

    // 创建业务队列
    if queue, err = ch.QueueDeclare(QueueName, true, false, false, false, amqp.Table{
        "x-message-ttl":          6000,                    // 消息过期时间 毫秒
        "x-dead-letter-exchange": DeadLettersExchangeName, // 死信交换机
        // "x-dead-letter-routing-key": "dlxKey",       // 死信路由key
    }); err != nil {
        logger.Warn("创建业务队列失败:%s", err.Error())
        return
    }

    // 业务队列绑定交换机
    if err = ch.QueueBind(queue.Name, "", ExchangeName, false, nil); err != nil {
        logger.Error("绑定业务交换机失败:%s", err.Error())
        return
    }

    // 死信队列绑定死信交换机
    if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {
        logger.Error("绑定死信交换机失败:%s", err.Error())
    }

    for i := 1; i <= 10; i++ {
        msg := amqp.Publishing{
            MessageId:   uuid.NewV4().String(),
            ContentType: "text/plain",
            Body:        []byte("红包退回"),
        }

        // 发布消息
        err = ch.Publish(
            ExchangeName,
            "",
            false,
            false,
            msg,
        )

        if err != nil {
            logger.Error("发送失败: %s", err.Error())
            return
        } else {
            logger.Info("发送成功:%s", msg.MessageId)
        }
    }
}

消费者代码

package main

import (
    uuid "github.com/satori/go.uuid"
    "github.com/streadway/amqp"
    "github.com/wonderivan/logger"
    "rmq/db/rmq"
    "time"
)

const (
    DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机
    DeadLettersQueueName    = "dlx_queue_packet"    // 死信队列

    QueueName    = "queue_packet"    // 目标队列
    ExchangeName = "exchange_packet" // 目标交换机
)

var (
    ch       *amqp.Channel
    err      error
    conn     *amqp.Connection
    queue    amqp.Queue
    dlxQueue amqp.Queue
)


func main() {
    if conn, err = rmq.GetConn(); err != nil {
        logger.Error("连接RabbitMQ服务器失败:%s", err.Error())
        return
    }

    defer conn.Close()

    if ch, err = conn.Channel(); err != nil {
        logger.Error("获取Channel失败:%s", err.Error())
        return
    }

    defer ch.Close()

    // 创建死信交换机
    if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {
        logger.Error("创建死信交换机:%s", err.Error())
        return
    }

    // 创建死信队列
    if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {
        logger.Error("创建死信队列失败:%s", err.Error())
        return
    }

    // 死信队列绑定死信交换机
    if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {
        logger.Error("绑定死信交换机失败:%s", err.Error())
    }

    msgList, err := ch.Consume(dlxQueue.Name, "", false, false, false, false, nil)
    if err != nil {
        logger.Error("消费者监听失败:%s", err.Error())
        return
    }

    for {
        select {
        case message, ok := <-msgList:
            if !ok {
                continue
            }

            go func(msg amqp.Delivery) {
                logger.Info("messageID: %s", msg.MessageId)
                logger.Info("messageBody: %s", msg.Body)
                if err = msg.Ack(false); err != nil {
                    logger.Error("确认消息失败")
                }
            }(message)
        case <-time.After(time.Second):

        }
    }
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注