您的位置 首页 java

Java,事件驱动,Reactor设计模式,NIO的三种实现

反应器设计模式(Reactor Pattern)

反应器设计模式(Reactor pattern)是一种处理一个或多个客户端并发交付服务请求的事件设计模式。

Reactor模式被广泛应用于高性能 IO 的设计中,大多数IO相关组件(如 Netty、Redis)都在使用该设计模式。

当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。

基于NIO的三种Reactor模式

1、Reactor单线程模式

2、Reactor多线程模式

3、主从Reactor多线程模式

代码案例

1、Reactor单线程模式

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded Reactor: one thread owns the selector and performs accept,
 * read and "business logic" (here: printing the received bytes) itself.
 */
public class NioServer {

	/** TCP port the server listens on. */
	private static final int PORT = 9090;

	/** Size in bytes of the scratch buffer allocated for each read event. */
	private static final int BUFFER_SIZE = 1024;

	/**
	 * Opens the listening channel and runs the event loop until an I/O error on
	 * the selector or the listening channel occurs.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		// try-with-resources so the selector and listening socket are released on failure.
		try (Selector selector = Selector.open();
				ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
			// Channels must be non-blocking before they can be registered with a selector.
			serverSocketChannel.configureBlocking(false);
			serverSocketChannel.bind(new InetSocketAddress(PORT));
			// The listening channel is only interested in incoming connections.
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("server is open!");
			while (true) {
				// Blocks until at least one channel is ready (0 means a spurious wake-up).
				if (selector.select() == 0) {
					continue;
				}
				Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
				while (iterator.hasNext()) {
					SelectionKey selectionKey = iterator.next();
					// The selected-key set is never cleared by the selector itself.
					iterator.remove();
					if (!selectionKey.isValid()) {
						continue;
					}
					if (selectionKey.isReadable()) {
						handleRead(selectionKey);
					} else if (selectionKey.isAcceptable()) {
						handleAccept(selector, selectionKey);
					}
				}
			}
		} catch (IOException e) {
			// printf uses %s, not the SLF4J-style {} placeholder, to render the exception.
			System.err.printf("[server]异常出现,信息为%s%n", e);
		}
	}

	/** Accepts one pending connection and registers it for read events. */
	private static void handleAccept(Selector selector, SelectionKey selectionKey) throws IOException {
		ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
		SocketChannel socketChannel = acceptServerSocketChannel.accept();
		if (socketChannel == null) {
			// Non-blocking accept() returns null when no connection is actually pending.
			return;
		}
		socketChannel.configureBlocking(false);
		socketChannel.register(selector, SelectionKey.OP_READ);
		System.out.println("[server]接收到新的链接");
	}

	/** Drains the bytes currently available on the channel and prints them. */
	private static void handleRead(SelectionKey selectionKey) {
		SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
		ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
		try {
			int len;
			while ((len = socketChannel.read(byteBuffer)) > 0) {
				byteBuffer.flip();
				System.out.println(new String(byteBuffer.array(), 0, len, StandardCharsets.UTF_8));
				byteBuffer.clear();
			}
			if (len < 0) {
				// -1 is end-of-stream: without closing, the key stays readable forever
				// and the event loop would spin on it.
				closeQuietly(selectionKey, socketChannel);
			}
		} catch (IOException e) {
			// A failure on one connection (e.g. reset by peer) must not stop the server.
			System.err.printf("[server]异常出现,信息为%s%n", e);
			closeQuietly(selectionKey, socketChannel);
		}
	}

	/** Cancels the key and closes the channel, ignoring secondary close failures. */
	private static void closeQuietly(SelectionKey selectionKey, SocketChannel socketChannel) {
		selectionKey.cancel();
		try {
			socketChannel.close();
		} catch (IOException ignored) {
			// The connection is being discarded anyway; nothing useful to do here.
		}
	}

}

2、Reactor多线程模式

 import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Multi-threaded Reactor: a single selector thread accepts connections and
 * detects readable channels, then hands each read event to the
 * {@code Processor} attached to the key.
 */
public class NioServer {

    /**
     * Opens the listening channel on port 9090 and runs the dispatch loop until
     * an I/O error on the selector or the listening channel occurs.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources so the selector and listening socket are released on failure.
        try (Selector selector = Selector.open();
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(9090));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("[server]开始启动服务器");
            while (true) {
                // Block until something is ready. selectNow() never returns a negative
                // value, so testing "< 0" made the loop spin at full CPU.
                if (selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    // A worker thread may have cancelled the key since it was selected;
                    // querying readiness on a cancelled key throws CancelledKeyException.
                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    if (selectionKey.isReadable()) {
                        Processor processor = (Processor) selectionKey.attachment();
                        processor.process(selectionKey);
                    } else if (selectionKey.isAcceptable()) {
                        ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel socketChannel = acceptServerSocketChannel.accept();
                        if (socketChannel == null) {
                            // Non-blocking accept() returns null when nothing is pending.
                            continue;
                        }
                        socketChannel.configureBlocking(false);
                        // Register and bind the handler in one step so the key is never
                        // observable without its attachment.
                        socketChannel.register(selector, SelectionKey.OP_READ, new Processor());
                        System.out.println("[server]接收到新的链接");
                    }
                }
            }
        } catch (IOException e) {
            // printf uses %s, not the SLF4J-style {} placeholder, to render the exception.
            System.err.printf("[server]异常出现,信息为%s%n", e);
        }

    }
}
 import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Read handler for the multi-threaded Reactor: performs the actual channel
 * read on a shared worker pool instead of on the selector thread.
 */
public class Processor {

	/** Worker pool shared by all processors (fixed at 16 threads, unbounded queue). */
	private static final ExecutorService service = new ThreadPoolExecutor(16, 16, 0L, TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<Runnable>());

	/**
	 * Schedules one read of the key's channel on the worker pool.
	 *
	 * <p>Read interest is switched off before the task is queued and switched
	 * back on when the task finishes. Otherwise the selector keeps reporting the
	 * channel as readable until a worker has consumed the data, and several
	 * workers end up reading the same channel concurrently.
	 *
	 * @param selectionKey key whose channel is a readable {@link SocketChannel}
	 */
	public void process(SelectionKey selectionKey) {
		// Fully-qualified names are used for types outside this file's import list.
		try {
			selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_READ);
		} catch (java.nio.channels.CancelledKeyException e) {
			// The connection was already torn down; nothing left to read.
			return;
		}
		service.execute(() -> readOnce(selectionKey));
	}

	/** Reads whatever is available once, prints it, then re-arms or closes the connection. */
	private static void readOnce(SelectionKey selectionKey) {
		SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		boolean keepOpen = false;
		try {
			int count = socketChannel.read(buffer);
			if (count < 0) {
				// End-of-stream: the peer closed the connection.
				closeQuietly(selectionKey, socketChannel);
				System.out.println("读取结束!");
				return;
			}
			keepOpen = true;
			if (count > 0) {
				// Decode only the bytes actually read, not the whole backing array.
				System.out.println("读取内容:"
						+ new String(buffer.array(), 0, count, java.nio.charset.StandardCharsets.UTF_8));
			}
		} catch (java.io.IOException e) {
			// Previously the failure vanished inside a discarded Future.
			System.err.println("[processor] read failed: " + e);
		} finally {
			if (keepOpen) {
				resumeReads(selectionKey);
			} else {
				closeQuietly(selectionKey, socketChannel);
			}
		}
	}

	/** Re-enables read interest and wakes the selector so the change takes effect promptly. */
	private static void resumeReads(SelectionKey selectionKey) {
		try {
			selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
			selectionKey.selector().wakeup();
		} catch (java.nio.channels.CancelledKeyException ignored) {
			// Key was cancelled concurrently; there is nothing to re-arm.
		}
	}

	/** Cancels the key and closes the channel, ignoring secondary close failures. */
	private static void closeQuietly(SelectionKey selectionKey, SocketChannel socketChannel) {
		selectionKey.cancel();
		try {
			socketChannel.close();
		} catch (java.io.IOException ignored) {
			// The connection is being discarded anyway.
		}
	}

}

3、主从Reactor多线程模式

 import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Main/sub Reactor: the main selector thread only accepts connections and
 * distributes them round-robin over a fixed set of {@code Processor}
 * sub-reactors, each of which runs its own selector for read events.
 */
public class NioServer {

	/**
	 * Opens the listening channel on port 1234, creates one processor per
	 * available CPU and runs the accept loop.
	 *
	 * @param args unused
	 * @throws IOException if the selector or the listening channel fails
	 */
	public static void main(String[] args) throws IOException {
		Selector selector = Selector.open();
		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
		serverSocketChannel.configureBlocking(false);
		serverSocketChannel.bind(new InetSocketAddress(1234));
		// The listening channel is only interested in incoming connections.
		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

		int coreNum = Runtime.getRuntime().availableProcessors();
		Processor[] processors = new Processor[coreNum];
		for (int i = 0; i < processors.length; i++) {
			processors[i] = new Processor();
		}

		// Index of the processor that receives the next accepted connection.
		int index = 0;
		// Loop forever: select() may legitimately return 0 (e.g. after wakeup()),
		// which must not shut the server down.
		while (true) {
			if (selector.select() == 0) {
				continue;
			}
			Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
			while (iterator.hasNext()) {
				SelectionKey key = iterator.next();
				iterator.remove();
				if (!key.isValid() || !key.isAcceptable()) {
					continue;
				}
				ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
				SocketChannel socketChannel = acceptServerSocketChannel.accept();
				if (socketChannel == null) {
					// Non-blocking accept() returns null when nothing is pending.
					continue;
				}
				socketChannel.configureBlocking(false);
				System.out.println("Accept request from " + socketChannel.getRemoteAddress());
				// Round-robin with wrap-around. The former "index / coreNum" sent the first
				// coreNum connections to processor 0 and eventually ran off the array.
				Processor processor = processors[index];
				index = (index + 1) % processors.length;
				processor.addChannel(socketChannel);
			}
		}
	}
}
 import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Sub-reactor: owns one selector and one long-running task on the shared pool
 * that reads from every channel handed to it via {@link #addChannel}.
 */
public class Processor {

	private static final ExecutorService service = Executors
			.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());

	private Selector selector;

	/**
	 * Channels handed over by the acceptor thread that still have to be
	 * registered. Registration is done by the selector thread itself so it never
	 * competes with a select() in progress. (Fully-qualified names are used for
	 * types outside this file's import list.)
	 */
	private final java.util.Queue<SocketChannel> pendingChannels =
			new java.util.concurrent.ConcurrentLinkedQueue<>();

	/**
	 * Opens the selector and starts the event loop.
	 *
	 * @throws IOException if the selector cannot be opened
	 */
	public Processor() throws IOException {
		this.selector = SelectorProvider.provider().openSelector();
		start();
	}

	/**
	 * Hands a non-blocking channel to this processor for read handling.
	 *
	 * @param socketChannel accepted channel, already configured non-blocking
	 * @throws ClosedChannelException if the channel is already closed
	 */
	public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
		if (!socketChannel.isOpen()) {
			throw new ClosedChannelException();
		}
		if (socketChannel.isBlocking()) {
			// Same failure register() itself would raise for a blocking channel.
			throw new java.nio.channels.IllegalBlockingModeException();
		}
		pendingChannels.add(socketChannel);
		// Interrupt a blocking select() so the new channel is registered promptly.
		selector.wakeup();
	}

	/** Submits the event loop to the shared pool; it occupies one pool thread permanently. */
	public void start() {
		service.execute(this::runLoop);
	}

	/** Event loop: register newly added channels, wait for readiness, read. */
	private void runLoop() {
		try {
			while (true) {
				registerPendingChannels();
				// Blocking select instead of a selectNow() spin that burned a full core.
				if (selector.select() == 0) {
					continue;
				}
				Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
				while (iterator.hasNext()) {
					SelectionKey key = iterator.next();
					iterator.remove();
					if (key.isValid() && key.isReadable()) {
						readFrom(key);
					}
				}
			}
		} catch (IOException e) {
			// Only a failure of the selector itself ends the loop; report it
			// instead of losing it in a discarded Future.
			System.err.println("[processor] selector loop terminated: " + e);
		}
	}

	/** Registers every queued channel for read events on this processor's selector. */
	private void registerPendingChannels() {
		SocketChannel channel;
		while ((channel = pendingChannels.poll()) != null) {
			try {
				channel.register(selector, SelectionKey.OP_READ);
			} catch (ClosedChannelException e) {
				// The peer went away between accept and registration.
				System.err.println("[processor] channel closed before registration: " + channel);
			}
		}
	}

	/** Performs one read on the key's channel and prints the outcome. */
	private static void readFrom(SelectionKey key) {
		SocketChannel socketChannel = (SocketChannel) key.channel();
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		try {
			int count = socketChannel.read(buffer);
			if (count < 0) {
				// End-of-stream: the peer closed the connection.
				socketChannel.close();
				key.cancel();
				System.out.println("读取结束" + socketChannel);
			} else if (count == 0) {
				System.out.println("客户端信息大小:" + socketChannel);
			} else {
				// Decode only the bytes actually read, not the whole backing array.
				System.out.println("客户端信息:"
						+ new String(buffer.array(), 0, count, java.nio.charset.StandardCharsets.UTF_8));
			}
		} catch (IOException e) {
			// One broken connection must not take down every channel on this selector.
			System.err.println("[processor] read failed, closing " + socketChannel + ": " + e);
			key.cancel();
			try {
				socketChannel.close();
			} catch (IOException ignored) {
				// The connection is being discarded anyway.
			}
		}
	}
}

文章来源:智云一二三科技

文章标题:Java,事件驱动,Reactor设计模式,NIO的三种实现

文章地址:https://www.zhihuclub.com/176766.shtml

关于作者: 智云科技

热门文章

网站地图