1. 消息队列解决了什么问题
消息中间件是目前比较流行的一个中间件,其中RabbitMQ更是占有一定的市场份额,主要用来做 异步处理、应用解耦、流量削峰、日志处理 等等方面。
1. 异步处理
一个用户登陆网址注册,然后系统发短信跟邮件告知注册成功,一般有三种解决方法。
- 串行到依次执行,问题是用户注册后就可以使用了,没必要等验证码跟邮件。
- 注册成功后,邮件跟验证码用并行等方式执行,问题是邮件跟验证码是非重要的任务,系统注册还要等这俩完成么?
- 基于异步MQ的处理,用户注册成功后直接把信息异步发送到MQ中,然后邮件系统跟验证码系统主动去拉取数据。
2. 应用解耦
比如我们有一个订单系统,还要一个库存系统,用户下订单了就要调用下库存系统来处理,直接调用到话库存系统出现问题咋办呢?
3. 流量削峰
举办一个 秒杀活动,如何较好到设计?服务层直接接受瞬间搞密度访问绝对不可以起码要加入一个MQ。
4. 日志处理
用户通过WebUI访问发送请求到时候后端如何接受跟处理呢一般?
2. RabbitMQ 安装跟配置
官网:
开发语言:
正式到安装跟允许需要Erlang跟RabbitMQ俩版本之间相互兼容!我这里图省事直接用Docker 拉取镜像了。下载:
开启:管理页面 默认账号:guest 默认密码:guest 。Docker启动时候可以指定账号密码对外端口以及
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
启动:
用户添加:
vitrual hosts 相当于mysql中的DB。创建一个virtual hosts,一般以/ 开头。
对用户进行授权,点击/vhost_mmr,
至于WebUI多点点即可了解。
3. 实战
RabbitMQ 官网支持任务模式:
l创建Maven项目导入必要依赖:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>
0. 获取MQ连接
package com.sowhat.mq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtils {
/**
* 连接器
* @return
* @throws IOException
* @throws TimeoutException
*/ public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/vhost_mmr");
factory.setUsername("user_mmr");
factory.setPassword("sowhat");
Connection connection = factory.newConnection();
return connection;
}
}
1. 简单队列
P:Producer 消息的生产者 中间:Queue消息队列 C:Consumer 消息的消费者
package com.sowhat.mq.simple;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取一个连接
Connection connection = ConnectionUtils.getConnection();
// 从连接获取一个通道
Channel channel = connection.createChannel();
// 创建队列声明
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello Simple";
// exchange,队列,参数,消息字节体
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("--send msg:" + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.simple;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者获取消息
*/public class Recv {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
newApi();
oldApi();
}
private static void newApi() throws IOException, TimeoutException {
// 创建连接
Connection connection = ConnectionUtils.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 队列声明 队列名,是否持久化,是否独占模式,无消息后是否自动删除,消息携带参数
channel.queueDeclare(Send.QUEUE_NAME,false,false,false,null);
// 定义消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override // 事件模型,消息来了会触发该函数
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("---new api recv:" + s);
}
};
// 监听队列
channel.basicConsume(Send.QUEUE_NAME,true,defaultConsumer);
}
// 老方法 消费者 MQ 在3。4以下 用次方法,
private static void oldApi() throws IOException, TimeoutException, InterruptedException {
// 创建连接
Connection connection = ConnectionUtils.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 定义队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(Send.QUEUE_NAME, true, consumer);
while (true) {
// 发货体
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
byte[] body = delivery.getBody();
String s = new String(body);
System.out.println("---Recv:" + s);
}
}
}
右上角有可以设置页面刷新频率,然后可以在UI界面直接手动消费掉,如下图:
简单队列的不足 :耦合性过高,生产者一一对应消费者,如果有多个消费者想消费队列中信息就无法实现了。
2. WorkQueue 工作队列
Simple队列中只能一一对应的生产消费,实际开发中生产者发消息很简单,而消费者要跟业务结合,消费者接受到消息后要处理从而会耗时。 「可能会出现队列中出现消息积压」 。所以如果多个消费者可以加速消费。
1. round robin 轮询分发
代码编程一个生产者两个消费者:
package com.sowhat.mq.work;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取 channel
Channel channel = connection.createChannel();
// 声明队列
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i <50 ; i++) {
String msg = "hello-" + i;
System.out.println("WQ send " + msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
}
}
};
boolean autoAck = true;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000 );
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
}
}
};
boolean autoAck = true;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
现象:消费者1 跟消费者2 处理的数据量完全一样的个数:消费者1:处理偶数 消费者2:处理奇数 这种方式叫 轮询分发(round-robin) 结果就是不管两个消费者谁忙, 「数据总是你一个我一个」 ,MQ 给两个消费发数据的时候是不知道消费者性能的,默认就是雨露均沾。此时 autoAck = true。
2. 公平分发 fair dipatch
如果要实现 公平分发 ,要让消费者消费完毕一条数据后就告知MQ,再让MQ发数据即可。自动应答要关闭!
package com.sowhat.mq.work;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取 channel
Channel channel = connection.createChannel();
// s声明队列
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只发送一个消息
// 从而限制一次性发送给消费者到消息不得超过1个。
int perfetchCount = 1;
channel.basicQos(perfetchCount);
for (int i = 0; i <50 ; i++) {
String msg = "hello-" + i;
System.out.println("WQ send " + msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 保证一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 保证一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
结果 :实现了公平分发,消费者2 是消费者1消费数量的2倍。
3. publish/subscribe 发布订阅模式
类似公众号的订阅跟发布,无需指定routingKey:
解读:
- 一个生产者多个消费者
- 每一个消费者都有一个自己的队列
- 生产者没有把消息直接发送到队列而是发送到了 交换机转化器(exchange) 。
- 每一个队列都要绑定到交换机上。
- 生产者发送的消息经过交换机到达队列,从而实现一个消息被多个消费者消费。
生产者:
package com.sowhat.mq.ps;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 分发= fanout
// 发送消息
String msg = "hello ps ";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("Send:" + msg);
channel.close();
connection.close();
}
}
消息哪儿去了?丢失了,在RabbitMQ中只有队列有存储能力, 「因为这个时候队列还没有绑定到交换机 所以消息丢失了」 。消费者:
package com.sowhat.mq.ps;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String QUEUE_NAME = "test_queue_fanout_email";
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机转发器
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );
// 保证一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.ps;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String QUEUE_NAME = "test_queue_fanout_sms";
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机转发器
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );
// 保证一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
「同时还可以自己手动的添加一个队列监控到该exchange」
4. routing 路由选择 通配符模式
Exchange(交换机,转发器): 「一方面接受生产者消息,另一方面是向队列推送消息」 。匿名转发用 “” 表示,比如前面到简单队列跟WorkQueue。 fanout :不处理路由键。 「不需要指定routingKey」 ,我们只需要把队列绑定到交换机, 「消息就会被发送到所有到队列中」 。 direct :处理路由键, 「需要指定routingKey」 ,此时生产者发送数据到时候会指定key,任务队列也会指定key,只有key一样消息才会被传送到队列中。如下图
package com.sowhat.mq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// exchange
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String msg = "hello info!";
// 可以指定类型
String routingKey = "info";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("Send : " + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.routing;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.routing;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 绑定种类似 Key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
WebUI:
缺点 :路由key必须要明确,无法实现规则性模糊匹配。
总结
如果需要指定模式一般是在消费者端设置,灵活性调节。
模式生产者Queue生产者exchange生产者routingKey消费者exchange消费者queueroutingKey Simple(简单模式少用)指定不指定不指定不指定指定不指定 WorkQueue(多个消费者少用)指定不指定不指定不指定指定不指定 fanout(publish/subscribe模式)不指定指定不指定指定指定不指定 direct(路由模式)不指定指定指定指定指定消费者routingKey精确指定多个 topic(主题模糊匹配)不指定指定指定指定指定消费者routingKey可以进行模糊匹配