您的位置 首页 golang

RabbitMQ功能实现1- 红包未领取退回


生产者代码:

package mainimport (    uuid "github.com/satori/go.uuid"    "github.com/streadway/amqp"    "github.com/wonderivan/logger"    "rmq/db/rmq"    "time")const (    DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机    DeadLettersQueueName    = "dlx_queue_packet"    // 死信队列    QueueName    = "queue_packet"    // 目标队列    ExchangeName = "exchange_packet" // 目标交换机)var (    ch       *amqp.Channel    err      error    conn     *amqp.Connection    queue    amqp.Queue    dlxQueue amqp.Queue)func main() {    if conn, err = rmq.GetConn(); err != nil {        logger.Error("连接RabbitMQ服务器失败:%s", err.Error())        return    }    defer conn.Close()    if ch, err = conn.Channel(); err != nil {        logger.Error("获取Channel失败:%s", err.Error())        return    }    defer ch.Close()    // 声明队列交换机    if err = ch.ExchangeDeclare(ExchangeName, amqp.ExchangeFanout, true, false, false, false, nil); err != nil {        logger.Error("声明业务交换机失败:%s", err.Error())        return    }    // 创建死信交换机    if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {        logger.Error("创建死信交换机:%s", err.Error())        return    }    // 创建死信队列    if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {        logger.Error("创建死信队列失败:%s", err.Error())        return    }    // 创建业务队列    if queue, err = ch.QueueDeclare(QueueName, true, false, false, false, amqp.Table{        "x-message-ttl":          6000,                    // 消息过期时间 毫秒        "x-dead-letter-exchange": DeadLettersExchangeName, // 死信交换机        // "x-dead-letter-routing-key": "dlxKey",       // 死信路由key    }); err != nil {        logger.Warn("创建业务队列失败:%s", err.Error())        return    }    // 业务队列绑定交换机    if err = ch.QueueBind(queue.Name, "", ExchangeName, false, nil); err != nil {        logger.Error("绑定业务交换机失败:%s", err.Error())        return    }    // 死信队列绑定死信交换机    if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {        logger.Error("绑定死信交换机失败:%s", err.Error())    }    for i := 1; i <= 10; i++ {        msg := amqp.Publishing{            MessageId:   uuid.NewV4().String(),            ContentType: "text/plain",            Body:        []byte("红包退回"),        }        // 发布消息        err = ch.Publish(            ExchangeName,            "",            false,            false,            msg,        )        if err != nil {            logger.Error("发送失败: %s", err.Error())            return        } else {            logger.Info("发送成功:%s", msg.MessageId)        }    }}

消费者代码

package mainimport (    uuid "github.com/satori/go.uuid"    "github.com/streadway/amqp"    "github.com/wonderivan/logger"    "rmq/db/rmq"    "time")const (    DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机    DeadLettersQueueName    = "dlx_queue_packet"    // 死信队列    QueueName    = "queue_packet"    // 目标队列    ExchangeName = "exchange_packet" // 目标交换机)var (    ch       *amqp.Channel    err      error    conn     *amqp.Connection    queue    amqp.Queue    dlxQueue amqp.Queue)func main() {    if conn, err = rmq.GetConn(); err != nil {        logger.Error("连接RabbitMQ服务器失败:%s", err.Error())        return    }    defer conn.Close()    if ch, err = conn.Channel(); err != nil {        logger.Error("获取Channel失败:%s", err.Error())        return    }    defer ch.Close()    // 创建死信交换机    if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {        logger.Error("创建死信交换机:%s", err.Error())        return    }    // 创建死信队列    if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {        logger.Error("创建死信队列失败:%s", err.Error())        return    }    // 死信队列绑定死信交换机    if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {        logger.Error("绑定死信交换机失败:%s", err.Error())    }    msgList, err := ch.Consume(dlxQueue.Name, "", false, false, false, false, nil)    if err != nil {        logger.Error("消费者监听失败:%s", err.Error())        return    }    for {        select {        case message, ok := <-msgList:            if !ok {                continue            }            go func(msg amqp.Delivery) {                logger.Info("messageID: %s", msg.MessageId)                logger.Info("messageBody: %s", msg.Body)                if err = msg.Ack(false); err != nil {                    logger.Error("确认消息失败")                }            }(message)        case <-time.After(time.Second):        }    }}

文章来源:智云一二三科技

文章标题:RabbitMQ功能实现1- 红包未领取退回

文章地址:https://www.zhihuclub.com/7462.shtml

关于作者: 智云科技

热门文章

网站地图