您的位置 首页 java

不断升级,Java之BIO、NIO、AIO的演变

一、前言

一句话概括BIO NIO AIO:
第一阶段,服务端采用同步阻塞的BIO;
第二阶段,服务端采用同步阻塞的线程池的BIO;
第三阶段,JDK4之后服务端采用同步非阻塞的NIO;
第四阶段,JDK7之后服务端采用异步非阻塞的AIO。

Java BIO 对应 Linux 同步非阻塞IO,
Java NIO 对应 Linux 信号驱动IO,
Java AIO 对应 Linux 异步IO。

二、手写BIO(等待数据阻塞、数据从内核复制到用户空间阻塞)

2.1 曙光:同步与异步,阻塞与非阻塞

同步与异步

同步: 同步就是发起一个调用后,被调用者未处理完请求之前,调用不返回。
异步: 异步就是发起一个调用后,立刻得到被调用者的回应表示已接收到请求,但是被调用者并没有返回结果,此时我们可以处理其他的请求,被调用者通常依靠事件,回调等机制来通知调用者其返回结果。
小结:同步和异步的区别最大在于异步的话调用者不需要等待处理结果,被调用者会通过回调等机制来通知调用者其返回结果。

阻塞和非阻塞

阻塞: 阻塞就是发起一个请求,调用者一直等待请求结果返回,也就是当前线程会被挂起,无法从事其他任务,只有当条件就绪才能继续。
非阻塞: 非阻塞就是发起一个请求,调用者不用一直等着结果返回,可以先去干其他事情。
小结:同步和阻塞对应,异步和非阻塞对应,但是也存在同步非阻塞,间隔时间就调用。

tip1:同步与异步,对于响应方来说的,同步响应还是异步响应;阻塞和非阻塞,对于请求方来说的,阻塞请求方还是不阻塞请求方。

tip2:对于等待数据阶段,Linux五种IO中,第一种同步阻塞,第二种同步非阻塞,第三种基于消息通信的异步非阻塞;对于等待数据阶段+复制数据阶段,Linux五种IO中,第一种到第四种都是同步阻塞,因为复制数据的时间都是同步阻塞,只有第五种才是真正意义上的异步非阻塞。

2.2 概要:BIO

同步阻塞I/O模式,一个请求一个应答,数据的读取写入必须阻塞在一个线程内等待其完成。

采用 BIO 通信模型 的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接。我们一般通过在 while(true) 循环中服务端会调用 accept() 方法等待接收客户端的连接的方式监听请求,请求一旦接收到一个连接请求,就可以建立通信套接字在这个通信套接字上进行读写操作,此时不能再接收其他客户端连接请求,只能等待同当前连接的客户端的操作执行完成, 不过可以通过多线程来支持多个客户端的连接,如上图所示。

如果要让 BIO 通信模型 能够同时处理多个客户端请求,就必须使用多线程(主要原因是 socket.accept()、 socket.read()、 socket.write() 涉及的三个主要函数都是同步阻塞的),也就是说它在接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的 一请求一应答通信模型 。我们可以设想一下如果这个连接不做任何事情的话就会造成不必要的线程开销,不过可以通过 线程池机制 改善,线程池还可以让线程的创建和回收成本相对较低。使用FixedThreadPool 可以有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了N(客户端请求数量):M(处理客户端请求的线程数量)的伪异步I/O模型(N 可以远远大于 M),下面一节”伪异步 BIO”中会详细介绍到。

我们再设想一下当客户端并发访问量增加后这种模型会出现什么问题?

在 Java 虚拟机中,线程是宝贵的资源,线程的创建和销毁成本很高,除此之外,线程的切换成本也是很高的。尤其在 Linux 这样的操作系统中,线程本质上就是一个进程,创建和销毁线程都是重量级的系统函数。如果并发访问量增加会导致线程数急剧膨胀可能会导致线程堆栈溢出、创建新线程失败等问题,最终导致进程宕机或者僵死,不能对外提供服务。

2.3 手写BIO:服务端使用同步阻塞单线程的BIO,客户端使用命令行连接

 public class BIOServer {
    public static void main(String[] args) {
        try (ServerSocket serverSocket = new ServerSocket(8081)) {
            System.out.println("服务端端口号:" + serverSocket.getLocalSocketAddress());
            while (true) {
                Socket clientSocket = serverSocket.accept();
                System.out.println("连接来自:" + clientSocket.getRemoteSocketAddress());
                try (Scanner input = new Scanner(clientSocket.getInputStream())) {
                    while (true) {
                        String request = input.nextLine();
                        if ("quit".equals(request))
                            break;
                        System.out.println("客户端端口:" + clientSocket.getLocalSocketAddress());
                        System.out.println("客户端输入:" + request);
                        String response = "服务端响应" + request;
                        clientSocket.getOutputStream().write(response.getBytes());
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}  
不断升级,Java之BIO、NIO、AIO的演变


运行成功

不断升级,Java之BIO、NIO、AIO的演变


2.4 概要:伪异步IO

为了解决同步阻塞I/O面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化一一一后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N的比例关系,其中M可以远远大于N.通过线程池可以灵活地调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。

采用线程池和任务队列可以实现一种叫做伪异步的 I/O 通信框架,它的模型图如上图所示。当有新的客户端接入时,将客户端的 Socket 封装成一个Task(该任务实现java.lang.Runnable接口,变成一个可以多线程的类)投递到后端的线程池中进行处理,JDK 的线程池维护一个消息队列和 N 个活跃线程,对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

伪异步I/O通信框架采用了线程池实现,因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。不过因为它的底层任然是同步阻塞的BIO模型,因此无法从根本上解决问题。

2.5 手写伪异步IO:服务端使用同步阻塞多线程的BIO,客户端使用命令行连接

 public class ClientHandler implements Runnable {   //这就是Task
    private final Socket clientSocket;   // Task类中注入一个Socket  是客户端socket

    public ClientHandler(Socket clientSocket) {
        this.clientSocket = clientSocket;
    }


    @Override
    public void run() {   //Task类中国的run()方法
        try (Scanner input = new Scanner(clientSocket.getInputStream())) {   //读取输入流
            while (true) {
                String request = input.nextLine();  // 读取输入的数据
                if ("quit".equals(request)) break;   //如果输入的是quit,表示客户端程序结束了,这边结束服务端程序
                System.out.println("客户端端口:" + clientSocket.getLocalSocketAddress()+ "     客户端输入:" + request);    // 打印收到的请求报文
                String response = "server response: " + request;   //组装响应体
                clientSocket.getOutputStream().write(response.getBytes());  // 发送响应体
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}  
 public class ServerThreadPool {
    public static void main(String[] args){
        ExecutorService executor= Executors.newFixedThreadPool(3);    // 拥有三个线程的executor,在哪里使用
        try(ServerSocket serverSocket=new ServerSocket(8082)){   // 新建服务端socket
            System.out.println("服务端端口号:"+serverSocket.getLocalSocketAddress());
            while(true){
                Socket clientSocket=serverSocket.accept();   //  服务端socket接收请求 
                System.out.println("连接来自:"+clientSocket.getRemoteSocketAddress());  // 通过accept阻塞后,打印收到的请求
                executor.execute(new ClientHandler(clientSocket));   // 这里使用executor,当还有空闲线程的时候,使用空闲线程执行run方法来执行read write,没有空闲线程,这里阻塞
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}  

其实,Client不是一定要用命令行,Client代码也可以自己写出来。在客户端创建多个线程依次连接服务端并向其发送”当前时间+:hello world”,服务端会为每个客户端线程创建一个线程来处理,如下:

客户端代码代替命令行

 public class IOClient {
    public static void main(String[] args) {
        // TODO 创建多个线程,模拟多个客户端连接服务端
        new Thread(() -> {
            try {
                Socket socket = new Socket("127.0.0.1", 3333);
                while (true) {
                    try {   // 没休眠两秒钟,不断write到服务端
                        socket.getOutputStream().write((new Date() + ": hello world").getBytes());
                        Thread.sleep(2000);
                    } catch (Exception e) {
                    }
                }
            } catch (IOException e) {
            }
        }).start();
    }
}  

服务端

 public class IOServer {
    public static void main(String[] args) throws IOException {
        // TODO 服务端处理客户端连接请求
        ServerSocket serverSocket = new ServerSocket(3333);
        // 接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理
        new Thread(() -> {
            while (true) {
                try {
                    // 阻塞方法获取新的连接
                    Socket socket = serverSocket.accept();   // 子线程接收请求,不会阻塞main线程
                    // 每一个新的连接都创建一个线程,负责读取数据,接受请求后,新建一个子线程来处理read write
                    new Thread(() -> {
                        try {
                            int len;
                            byte[] data = new byte[1024];
                            InputStream inputStream = socket.getInputStream();
                            // 按字节流方式读取数据
                            while ((len = inputStream.read(data)) != -1) {   // 读入数据
                                System.out.println(new String(data, 0, len));  // 打印
                            }
                        } catch (IOException e) {
                        }
                    }).start();
                } catch (IOException e) {
                }
            }
        }).start();
    }
}  

小结

在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的 I/O 并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量。

三、手写NIO(等待数据不阻塞、数据从内核复制到用户空间阻塞)

3.1 NIO:JDK4之后服务端采用同步非阻塞NIO(客户端不变)

NIO是一种同步非阻塞的I/O模型,在Java 1.4 中引入了NIO框架,对应 java.nio 包,提供了 Channel , Selector,Buffer等抽象。

NIO中的N可以理解为Non-blocking,不单纯是New。它支持面向缓冲的,基于通道的I/O操作方法。 NIO提供了与传统BIO模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

tip1:服务端channel和客户端channel:服务端只有启动的时候有一个channel,但是,服务端每收到一个客户端连接,就有一个客户端channel,n个客户端连接就有n个客户端channel,一个客户端断开连接会销毁这个客户端channel,同时取消在select中的selectionKey。
tip2:服务端channel和客户端channel设置为非阻塞的时机:服务端channel在启动五步中就设置为非阻塞,accept连接之后得到的客户端channel设置为非阻塞;

3.2 服务端使用同步非阻塞单线程的NIO(Reactor单线程模式,一个线程接收请求,一个线程执行操作),客户端使用命令行连接

 public class NioServer {  //三要素  Channel Selector Buffer
    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  // channel.open()
        serverSocketChannel.configureBlocking(false);  // 设置为阻塞为false
        serverSocketChannel.bind(new InetSocketAddress(8083)); // 设置服务端端口
        System.out.println("服务端端口:" + serverSocketChannel.getLocalAddress());   // 打印服务端ip:port
        Selector selector = Selector.open();   // selector.open()得到一个selector对象

        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// channel注册到selector中
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//新建1024 Bytebuffer 缓冲区
        while (true) {  // 服务端永不停止
            int select = selector.select();  // selector选择 这里阻塞,要有客户端连接accept,或者客户端输入才能通过
            if (0 == select) continue;   //
            Set<SelectionKey> selectionKeys = selector.selectedKeys();  // 得到一个SelectKey集合
            Iterator<SelectionKey> iterator = selectionKeys.iterator();  //得到对应的迭代器,用来遍历
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();  // 遍历每一个key
                if (key.isAcceptable()) {   // key是可接受的那就接受 key.isAcceptable()  和  channel.accept()
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();     // 服务端的channel
                    SocketChannel clientChannel = channel.accept();  // 客户端的SocketChannel,类似io中的Socket

                    System.out.println("客户端接口:" + clientChannel.getRemoteAddress());   // 从客户端的SocketChannel打印客户端ip:port
                    clientChannel.configureBlocking(false);  // 客户端channel设置阻塞为false
                    clientChannel.register(selector, SelectionKey.OP_READ);  // 客户端Socketchannel注册到selector
                }
                if (key.isReadable()) {  // 是可读的就读  key.isReadable() 到 channel.read(byteBuffer)
                    SocketChannel channel = (SocketChannel) key.channel();
                    channel.read(byteBuffer);  // 读取buffer里面的内容,从客户端读入,这个key已经建立连接,所以使用channel读入
                    String request = new String(byteBuffer.array()).trim();
                    byteBuffer.clear();   // 每次循环之前或之后要清空buffer
                    System.out.println("客户端端口:" + channel.getRemoteAddress() + "    客户端输入:" + request);

                    String response = "server request" + request;
                    channel.write(ByteBuffer.wrap(response.getBytes()));  // 写出到客户端

                }
                iterator.remove();   //一定要使用迭代器删除
            }
        }
    }
}  

运行结果:

实际上,accept和读写操作可以交给子线程来完成,不一定要在main线程上完成,如下:

 public class NIOServer {
    public static void main(String[] args) throws IOException {
        // 1. serverSelector负责轮询是否有新的连接,服务端监测到新的连接之后,不再创建一个新的线程,
        // 而是直接将新连接绑定到clientSelector上,这样就不用 IO 模型中 1w 个 while 循环在死等
        Selector serverSelector = Selector.open();
        // 2. clientSelector负责轮询连接是否有数据可读
        Selector clientSelector = Selector.open();
        new Thread(() -> {
            try {
                // 对应IO编程中服务端启动
                ServerSocketChannel listenerChannel = ServerSocketChannel.open();  // 打开
                listenerChannel.socket().bind(new InetSocketAddress(3333));
                listenerChannel.configureBlocking(false);
                listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);   // 服务端channel注册到服务端selector上面去
                while (true) {
                    // 监测是否有新的连接,这里的1指的是阻塞的时间为 1ms
                    if (serverSelector.select(1) > 0) {  // 服务端select
                        Set<SelectionKey> set = serverSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();
                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();
                            if (key.isAcceptable()) {
                                try {
                                    // (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
                                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                                    clientChannel.configureBlocking(false);
                                    clientChannel.register(clientSelector, SelectionKey.OP_READ);   // 客户端selector注册到客户端selector上面去
                                } finally {
                                    keyIterator.remove();
                                }
                            }
                        }
                    }
                }
            } catch (IOException ignored) {
            }
        }).start();
        new Thread(() -> {   // 这个线程负责客户端读写
            try {
                while (true) {
                    // (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为 1ms
                    if (clientSelector.select(1) > 0) {
                        Set<SelectionKey> set = clientSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();
                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();
                            if (key.isReadable()) {
                                try {
                                    SocketChannel clientChannel = (SocketChannel) key.channel();
                                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                    // (3) 面向 Buffer
                                    clientChannel.read(byteBuffer);
                                    byteBuffer.flip();
                                    System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());
                                } finally {
                                    keyIterator.remove();
                                    key.interestOps(SelectionKey.OP_READ);
                                }
                            }
                        }
                    }
                }
            } catch (IOException ignored) {
            }
        }).start();
    }
}  

3.3 合理封装:Reactor单线程模型的NIO,一个线程接收请求accept,一个线程执行IO操作read-write

 import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {

    private static final Charset charset = Charset.forName("utf-8");

    private static Selector selector ;

    static {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try {
        // channel起手四句:ServerSocketChannel.open();  channel设置阻塞为false
        // channel.bind 端口号   channel.register(selector)
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(8080));
            serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
            new Reactor().start();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private static class Reactor extends Thread{

        @Override
        public void run() {
            try {
                select();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void select() throws IOException{
            while (selector.select()>0){  //如果没有准备好的通道,这里会阻塞住,减少CPU消耗,有准备好的通过,马上使用
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    if (key.isAcceptable()){
                        accept(key);
                    }else if (key.isReadable()){
                        read(key);
                    }
                    iterator.remove();  //必须要从集合中移除!否则下次事件发生时不会被感知到
                }
            }
        }

        private void read(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(100);
            int num = channel.read(byteBuffer);
            if (num > 0) {
                byteBuffer.flip();
                String data = charset.decode(byteBuffer).toString();   // 解析出来
                System.out.println(data);   //打印出来
            }else if (num == -1){
                // num=-1代表连接已关闭
                channel.close();
            }
        }

        private void accept(SelectionKey key) throws IOException {
            ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
            SocketChannel channel = serverChannel.accept();
            if (channel != null) {
                InetSocketAddress localAddress = (InetSocketAddress) channel.getLocalAddress();
                String hostName = localAddress.getHostName();
                System.out.println("接收到来自" + hostName + "的请求");

                channel.configureBlocking(false);
                //监听这个接收到的socket
                channel.register(selector, SelectionKey.OP_READ);  // 注册到客户端channel上面去
            }
        }
    }
}  

注意1:这里将所有发生的事件都交给单个线程去处理,如果性能不够,可以开个线程池去处理事件,
比如,Reactor三种模式:单线程、线程池、主从线程池。

注意2:这里的select模型和redis单线程处理多个连接请求有相似之处;

注意3:解释一下 while (selector.select()>0)
对于while (selector.select()>0)这句,表示如果没有准备好的通道,这里会阻塞住,减少CPU消耗,有准备好的通过,马上使用。

问题:为什么大家都不愿意用 JDK 原生 NIO 进行开发呢?
回答:
1、原生NIO开发代码比较复杂,需要熟练Channel Buffer Selector的使用,增加程序员的负担;
2、JDK 的 NIO 底层由 epoll 实现,该实现饱受诟病的空轮询 bug 会导致 cpu 飙升 100%;
3、已封装的框架做好了,Netty 的出现很大程度上改善了 JDK 原生 NIO 所存在的一些让人难以忍受的问题,完成封装好了NIO的操作,提供了Reactor三种模式的实现。

四、手写AIO(等待数据不阻塞、数据从内核复制到用户空间不阻塞)

4.1 概要:AIO

AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改进版 NIO 2,它是异步非阻塞的IO模型。异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。

AIO 是异步IO的缩写,虽然 NIO 在网络操作中,提供了非阻塞的方法,但是 NIO 的 IO 行为还是同步的。对于 NIO 来说,我们的业务线程是在 IO 操作准备好时,得到通知,接着就由这个线程自行进行 IO 操作,IO操作本身是同步的。

4.2 手写AIO

 import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AIOServer {

    public void startListen(int port) throws InterruptedException {
        try {
            AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(port));
            serverSocketChannel.accept(null,new CompletionHandler<AsynchronousSocketChannel,Void>() {
                @Override
                public void completed(AsynchronousSocketChannel socketChannel, Void attachment) {
                    serverSocketChannel.accept(null,this); //收到连接后,应该调用accept方法等待新的连接进来
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    socketChannel.read(byteBuffer,byteBuffer, new CompletionHandler<Integer,ByteBuffer>() {
                        @Override
                        public void completed(Integer num, ByteBuffer attachment) {
                            if (num > 0){
                                attachment.flip();
                                System.out.println(new String(attachment.array()).trim());
                            }else {
                                try {
                                    socketChannel.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            System.out.println("read error");
                            exc.printStackTrace();
                        }
                    });
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    System.out.println("accept error");
                    exc.printStackTrace();
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }

		//模拟去做其他事情
        while (true){
            Thread.sleep(1000);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AIOServer aioServer = new AIOServer();
        aioServer.startListen(8080);
    }
}  

五、面试金手指

5.1 BIO

BIO中是accept read write都是同步阻塞操作,这是无法改变,所有要改善bio,必须使用线程池,但是线程的创建和销毁的代价比较大,特别是对于linux这种,一个线程就是一个进程的,但是没办法,只有这样一种方式。

BIO中是accept read write都是同步阻塞操作 :同步阻塞I/O模式,一个请求一个应答,数据的读取写入必须阻塞在一个线程内等待其完成。采用 BIO 通信模型 的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接。我们一般通过在 while(true) 循环中服务端会调用 accept() 方法等待接收客户端的连接的方式监听请求,请求一旦接收到一个连接请求,就可以建立通信套接字在这个通信套接字上进行读写操作,此时不能再接收其他客户端连接请求,只能等待同当前连接的客户端的操作执行完成, 不过可以通过多线程来支持多个客户端的连接。

多线程,就是伪异步IO :如果要让 BIO 通信模型 能够同时处理多个客户端请求,就必须使用多线程(主要原因是 socket.accept()、 socket.read()、 socket.write() 涉及的三个主要函数都是同步阻塞的),也就是说它在接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的 一请求一应答通信模型 。我们可以设想一下如果这个连接不做任何事情的话就会造成不必要的线程开销,不过可以通过 线程池机制 改善,线程池还可以让线程的创建和回收成本相对较低。使用FixedThreadPool 可以有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了N(客户端请求数量):M(处理客户端请求的线程数量)的伪异步I/O模型(N 可以远远大于 M),下面一节”伪异步 BIO”中会详细介绍到。

多线程,伪异步IO的代价大,但是没办法 :在 Java 虚拟机中,线程是宝贵的资源,线程的创建和销毁成本很高,除此之外,线程的切换成本也是很高的。尤其在 Linux 这样的操作系统中,线程本质上就是一个进程,创建和销毁线程都是重量级的系统函数。如果并发访问量增加会导致线程数急剧膨胀可能会导致线程堆栈溢出、创建新线程失败等问题,最终导致进程宕机或者僵死,不能对外提供服务。

5.2 伪异步IO

线程池 :为了解决同步阻塞I/O面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化一一一后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N的比例关系,其中M可以远远大于N.通过线程池可以灵活地调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。

伪异步IO :采用线程池和任务队列可以实现一种叫做伪异步的 I/O 通信框架,它的模型图如上图所示。当有新的客户端接入时,将客户端的 Socket 封装成一个Task(该任务实现java.lang.Runnable接口,变成一个可以多线程的类)投递到后端的线程池中进行处理,JDK 的线程池维护一个消息队列和 N 个活跃线程,对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

伪异步IO还是BIO :伪异步I/O通信框架采用了线程池实现,因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。不过因为它的底层任然是同步阻塞的BIO模型,因此无法从根本上解决问题。

5.3 NIO

NIO中的N可以理解为Non-blocking,不单纯是New。它支持面向缓冲的,基于通道的I/O操作方法。NIO提供了与传统BIO模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和ServerSocketChannel

两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO 的非阻塞模式来开发。

5.4 NIO与IO的区别

问题:NIO与IO区别?

回答 :首先,从 NIO 流是同步非阻塞,而 IO 流是同步阻塞 IO 说起;然后,从 NIO 的3个核心组件/特性为 NIO带来的一些改进来分析。

第一,IO流是阻塞的,NIO流是不阻塞的

(1)Java IO的各种流是阻塞的。这意味着,当一个线程调用 accept() read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
(2)Java NIO使我们可以进行非阻塞IO操作。
非阻塞读read:单线程中从通道读取数据到buffer,同时可以继续做别的事情,当数据读取到buffer中后,线程再继续处理数据。
非阻塞写write:一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。

第二,NIO三要素Buffer(缓冲区) IO 面向流(Stream oriented),而 NIO 面向缓冲区(Buffer oriented)

Buffer是一个对象,它包含一些要写入或者要读出的数据。在NIO类库中加入Buffer对象,体现了新库与原I/O的一个重要区别。在面向流的I/O中,可以将数据直接写入或者将数据直接读到Stream**对象中。虽然 Stream 中也有 Buffer 开头的扩展类,但只是流的包装类,还是从流读到缓冲区,而 NIO 却是直接读到 Buffer 中进行操作。
在NIO厍中,所有数据都是用缓冲区buffer处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。最常用的缓冲区是 ByteBuffer,一个 ByteBuffer 提供了一组功能用于操作 byte 数组。除了ByteBuffer,还有其他的一些缓冲区,事实上,每一种Java基本类型(除了Boolean类型)都对应有一种缓冲区。
值得注意的是,Buffer只对执行IO操作的线程有用(read/write),对执行连接的线程是没有用的(accept)。

第三,NIO三要素Channel (通道) NIO 通过Channel(通道) 进行读写

通道是双向的,可读也可写,而流的读写是单向的。无论读写,通道channel 只能和Buffer交互。因为 Buffer,通道可以异步地读写。

在这里,需要知道的是,NIO中的所有IO都是从 Channel(通道) 开始的。
从通道进行数据读取 :创建一个缓冲区,然后请求通道读取数据。
从通道进行数据写入 :创建一个缓冲区,填充数据,并要求通道写入数据。
如下图所示:

不断升级,Java之BIO、NIO、AIO的演变

第四,NIO三要素Selectors(选择器) NIO有选择器,而IO没有

选择器用于使用单个线程处理多个通道,因此,它需要较少的线程来处理这些通道。另一方面,线程之间的切换对于操作系统来说是昂贵的,因此,为了提高系统效率选择器是有用的。操作过程中,Channel注册到选择器中,如下图所示:

不断升级,Java之BIO、NIO、AIO的演变

小结:从BIO到NIO,分下NIO三要素:Buffer Channel Selector
Buffer:表示满了才读,这就是linux五种IO中的同步非阻塞IO,即recvfrom轮询;
Channel:ServerSocket 变为 ServerSocketChannel,Socket变为SocketChannel;
Selector:用来选择Channel,所有的Channel都注册在Selector上面。

5.5 AIO

异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。
AIO 是异步IO的缩写,虽然 NIO 在网络操作中,提供了非阻塞的方法,但是 NIO 的 IO 行为还是同步的。对于 NIO 来说,我们的业务线程是在 IO 操作准备好时,得到通知,接着就由这个线程自行进行 IO 操作,IO操作本身是同步的。

小结:三种IO要完成的功能是一样的 :BIO也是accept read write三个操作,NIO也是accept read write三个操作,AIO也是accept read write三个操作,任何io都是accept read write三个操作。

六、尾声

不断升级,Java之BIO、NIO、AIO的演变,完成了。

天天打码,天天进步!!!

如果觉得本文有用,可以 关注+转发 ,您的鼓励就是我创作的最大动力。

文章来源:智云一二三科技

文章标题:不断升级,Java之BIO、NIO、AIO的演变

文章地址:https://www.zhihuclub.com/176759.shtml

关于作者: 智云科技

热门文章

网站地图