生产者代码:
package mainimport ( uuid "github.com/satori/go.uuid" "github.com/streadway/amqp" "github.com/wonderivan/logger" "rmq/db/rmq" "time")const ( DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机 DeadLettersQueueName = "dlx_queue_packet" // 死信队列 QueueName = "queue_packet" // 目标队列 ExchangeName = "exchange_packet" // 目标交换机)var ( ch *amqp.Channel err error conn *amqp.Connection queue amqp.Queue dlxQueue amqp.Queue)func main() { if conn, err = rmq.GetConn(); err != nil { logger.Error("连接RabbitMQ服务器失败:%s", err.Error()) return } defer conn.Close() if ch, err = conn.Channel(); err != nil { logger.Error("获取Channel失败:%s", err.Error()) return } defer ch.Close() // 声明队列交换机 if err = ch.ExchangeDeclare(ExchangeName, amqp.ExchangeFanout, true, false, false, false, nil); err != nil { logger.Error("声明业务交换机失败:%s", err.Error()) return } // 创建死信交换机 if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil { logger.Error("创建死信交换机:%s", err.Error()) return } // 创建死信队列 if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil { logger.Error("创建死信队列失败:%s", err.Error()) return } // 创建业务队列 if queue, err = ch.QueueDeclare(QueueName, true, false, false, false, amqp.Table{ "x-message-ttl": 6000, // 消息过期时间 毫秒 "x-dead-letter-exchange": DeadLettersExchangeName, // 死信交换机 // "x-dead-letter-routing-key": "dlxKey", // 死信路由key }); err != nil { logger.Warn("创建业务队列失败:%s", err.Error()) return } // 业务队列绑定交换机 if err = ch.QueueBind(queue.Name, "", ExchangeName, false, nil); err != nil { logger.Error("绑定业务交换机失败:%s", err.Error()) return } // 死信队列绑定死信交换机 if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil { logger.Error("绑定死信交换机失败:%s", err.Error()) } for i := 1; i <= 10; i++ { msg := amqp.Publishing{ MessageId: uuid.NewV4().String(), ContentType: "text/plain", Body: []byte("红包退回"), } // 发布消息 err = ch.Publish( ExchangeName, "", false, false, msg, ) if err != nil { logger.Error("发送失败: %s", err.Error()) return } else { logger.Info("发送成功:%s", msg.MessageId) } }}
消费者代码
package mainimport ( uuid "github.com/satori/go.uuid" "github.com/streadway/amqp" "github.com/wonderivan/logger" "rmq/db/rmq" "time")const ( DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机 DeadLettersQueueName = "dlx_queue_packet" // 死信队列 QueueName = "queue_packet" // 目标队列 ExchangeName = "exchange_packet" // 目标交换机)var ( ch *amqp.Channel err error conn *amqp.Connection queue amqp.Queue dlxQueue amqp.Queue)func main() { if conn, err = rmq.GetConn(); err != nil { logger.Error("连接RabbitMQ服务器失败:%s", err.Error()) return } defer conn.Close() if ch, err = conn.Channel(); err != nil { logger.Error("获取Channel失败:%s", err.Error()) return } defer ch.Close() // 创建死信交换机 if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil { logger.Error("创建死信交换机:%s", err.Error()) return } // 创建死信队列 if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil { logger.Error("创建死信队列失败:%s", err.Error()) return } // 死信队列绑定死信交换机 if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil { logger.Error("绑定死信交换机失败:%s", err.Error()) } msgList, err := ch.Consume(dlxQueue.Name, "", false, false, false, false, nil) if err != nil { logger.Error("消费者监听失败:%s", err.Error()) return } for { select { case message, ok := <-msgList: if !ok { continue } go func(msg amqp.Delivery) { logger.Info("messageID: %s", msg.MessageId) logger.Info("messageBody: %s", msg.Body) if err = msg.Ack(false); err != nil { logger.Error("确认消息失败") } }(message) case <-time.After(time.Second): } }}
文章来源:智云一二三科技
文章标题:RabbitMQ功能实现1- 红包未领取退回
文章地址:https://www.zhihuclub.com/7462.shtml