反应器设计模式(Reactor Pattern)
反应器设计模式(Reactor pattern)是一种处理一个或多个客户端并发交付服务请求的事件设计模式。
Reactor模式被广泛应用在设计高性能 io 方面上,大多数IO相关组件如 Netty、Redis就都在使用的该设计模式。
当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。
基于NIO的三种Reactor模式
1、Reactor单线程模式
2、Reactor 多线程 模式
3、主从Reactor多线程模式
代码案例
1、Reactor单线程模式
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio. channel s.SocketChannel;
import java.util. iterator ;
import java.util.Set;
/**
* 单线程的Reactor模式
*/
public class NioServer {
public static void main(String[] args) {
try {
Selector selector = Selector. open ();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 非阻塞
server SocketChannel.configureBlocking( false );
serverSocketChannel.bind(new InetSocketAddress(9090));
// 服务器端注册: accept 事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server is open!");
while (true) {
if (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer. allocate (1024);
int len = 0;
// 当管道的数据都读取完毕了
while ((len = (socketChannel.read(byteBuffer))) > 0) {
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, len));
byteBuffer.clear();
}
} else if (selectionKey.isAcceptable()) {
// 第一次链接到server,需要构建一个通道
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey
.channel();
// 开通通道
SocketChannel socketChannel = acceptServerSocketChannel.accept();
// 设置为非堵塞
socketChannel.configureBlocking(false);
// 注册可读的监听事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("[server]接收到新的链接");
}
iterator.remove();
}
}
}
} catch (IOException e) {
System.err.printf("[server]异常出现,信息为{}", e);
}
}
}
2、Reactor多线程模式
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* 多线程的Reactor模式
*/
public class NioServer {
/**
* @param args
*/
public static void main(String[] args) {
try {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9090));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("[server]开始启动服务器");
while (true) {
if (selector.selectNow() < 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
Processor processor = (Processor) selectionKey.attachment();
processor.process(selectionKey);
} else if (selectionKey.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
//绑定处理器线程
key.attach(new Processor());
System.out.println("[server]接收到新的链接");
}
iterator.remove();
}
}
} catch (IOException e) {
System.err.printf("[server]异常出现,信息为{}", e);
}
}
}
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 处理器
*/
public class Processor {
private static final ExecutorService service = new ThreadPoolExecutor(16, 16, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
public void process(SelectionKey selectionKey) {
service.submit(() -> {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
selectionKey.cancel();
System.out.println("读取结束!");
return null;
} else if (count == 0) {
return null;
}
System.out.println("读取内容:" + new String(buffer.array()));
return null;
});
}
}
3、主从Reactor多线程模式
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* 主从Reactor多线程模式
*/
public class NioServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
// 初始化通道,标志为accept类型
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int coreNum = Runtime.getRuntime().availableProcessors();
Processor[] processors = new Processor[coreNum];
for (int i = 0; i < processors.length; i++) {
processors[i] = new Processor();
}
int index = 0;
// 一直处于堵塞的状态
while (selector.select() > 0) {
// 获取到selectionkey的集合
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("Accept request from {}" + socketChannel.getRemoteAddress());
Processor processor = processors[(int) ((index++) / coreNum)];
processor.addChannel(socketChannel);
}
iterator.remove();
}
}
}
}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Processor {
private static final ExecutorService service = Executors
.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
private Selector selector;
public Processor() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
start();
}
public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
public void start() {
service.submit(() -> {
while (true) {
if (selector.selectNow() <= 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) key.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
key.cancel();
System.out.println("读取结束" + socketChannel);
continue;
} else if (count == 0) {
System.out.println("客户端信息大小:" + socketChannel);
continue;
} else {
System.out.println("客户端信息:" + new String(buffer.array()));
}
}
}
}
});
}
}