您的位置 首页 java

Java面试篇基础部分-Java 实现的I/O方式

Java I/O

在整个的java.io包中提供了5个重要的I/O类和1个接口类。5个类分别是 File 、OutputStream、 InputStream Writer 、Reader ,1个接口是指 Serializable 序列化接口。具体的使用方式可以查看 JDK 的参考文档。

Java面试篇基础部分-Java 实现的I/O方式

Java NIO 实现

Java NIO 的实现内容主要有如下的三个核心内容

  • Selector(选择器)
  • Channel(通道)
  • Buffer(缓冲区)

Selector 用于监听多个Channel的操作事件,例如连接的打开、数据的连接处理等,所以说一个 线程 可以实现对于多个Channel的管理操作。

传统的I/O是基于数据流的方式进行I/O的读写操作;NIO是基于Channel和Buffer进行的I/O操作,数据从Channel中读取到了Buffer中,或者操作从Buffer数据写入到Channel中。

JavaNIO 与传统I/O操作的区别

  • I/O操作是面对流对象的,而NIO则是面向缓冲区的;在面向流的操作中,数据只能在输入流或者输出流中进行连续的读写操作,数据没有缓冲区这个概念,所以字节流无法进行前后移动。在NIO的操作中数据是从Channel中读取到Buffer中,然后再从Buffer中读取的Channel中,既然出现了Channel就可以很容易的实现数据的前后操作。也就出现了NIO中常见的拆包、粘包问题。
  • 传统的I/O方式是阻塞的模式,NIO是非阻塞模式,在传统的I/O模式下,当用户调用了read()或者是 write ()进行读写操作的时候,线程一直处于阻塞状态等待数据的写入写出操作。NIO通过Selector机制监听每个Channel的事件的变化,当Channel上有数据发生变化的时候通知对应的线程进行读写操作。对于读请求,在Channel有可用数据的时候,线程现将数据写入到Buffer上,在没有数据的时候,线程执行其他的业务逻辑操作。对于写请求,在一个线程进行写操作的时候数据写入到某个Channel中的时候,只需要Channel上的数据通过异步的方式写入到Buffer就可以了。Buffer上的数据会通过异步的方式写入到目标机器的Channel上,用户线程不需要等待数据完全被写入,就可以执行其他的业务逻辑操作。

非阻塞IO模型中Selector线程工作如下图所示。

Java面试篇基础部分-Java 实现的I/O方式

Channel

Channel和I/O流Stream类似,只不过Stream是单向的,Channel是双向的,也就是说流式只能是输入流或者输出流,但是Channel即可用来进行读操作,也可以进行写操作。

NIO中Channel主要的实现类有如下几种:

  • FileChannel
  • DatagramChannel
  • socket Channel
  • ServerSocketChannel

分别对应的是文件I/O操作、 UDP 、TCP IO 、 Socket Client和Socket Server操作。

Buffer

Buffer实际上通过上面的理解我们可以看出,它是一个容器,其内部通过一个连续的字节数组存储I/O上的数据。在NIO中,Channel在文件、网络上对数据的读取或写入都必须经过Buffer。

Java面试篇基础部分-Java 实现的I/O方式

如图所示,客户端向着服务端发送数据的时候,先将数据写入到Buffer中,然后将Buffer中的数据写入到服务端对应的Channel中,服务端在接受到数据的时候通过Channel将数据读入到Buffer中,然后从Buffer中读取数据并进行对应的处理。

Java NIO包中Buffer是一个抽象类主要有如下的一些实现类

  • ByteBuffer
  • IntBuffer
  • Char Buffer
  • LongBuffer
  • DoubleBuffer
  • FloatBuffer
  • ShortBuffer

Selector

Selector选择器,用来检测在多个Channel上是否有I/O操作事件发生,并且对检测到的I/O事件进行相应的处理。所以Selector通过一个线程就可以实现对于多个Channel的管理。这样就不需要为每个连接都创建线程,避免了线程资源和在多线程上下文切换导致的开销。

Selector只有在Channel有读写事件发生的时候,才能调用I/O函数进行读写操作。极大地减少了系统开销,提高系统的并发量。

服务端示例代码

 package nioDemo;
 
import java.io.IO Exception ;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset. Charset ;
import java.util. iterator ;
import java.util.Random;
import java.util.Set;
 
 
/*服务器端,:接收客户端发送过来的数据并显示,
 *服务器把上接收到的数据加上" Echo  from service:"再发送回去*/public class ServiceSocketChannelDemo {
     
    public  static  class TCPEchoServer implements Runnable{
         
        /*服务器地址*/         private  InetSocketAddress localAddress;
         
        public TCPEchoServer(int port) throws IOException{
            this.localAddress = new InetSocketAddress(port);
        }
         
         
        @ Override 
        public  void  run(){
             
            Charset utf8 = Charset.forName("UTF-8");
             
            ServerSocketChannel ssc = null;
            Selector selector = null;
             
            Random rnd = new Random();
             
            try {
                /*创建选择器*/                selector = Selector.open();
                 
                /*创建服务器通道*/                ssc = ServerSocketChannel.open();
                ssc.configureBlocking(false);
                 
                /*设置监听服务器的端口,设置最大连接缓冲数为100*/                ssc.bind(localAddress, 100);
                 
                /*服务器通道只能对tcp链接事件感兴趣*/                ssc.register(selector, SelectionKey.OP_ACCEPT);
                 
            } catch (IOException e1) {
                System.out.println("server start failed");
                return;
            } 
             
            System.out.println("server start with address : " + localAddress);
             
            /*服务器线程被中断后会退出*/            try{
                 
                while(!Thread.currentThread().isInterrupted()){
                     
                    int n = selector.select();
                    if(n == 0){
                        continue;
                    }
 
                    Set<SelectionKey> keySet = selector.selectedKeys();
                    Iterator<SelectionKey> it = keySet.iterator();
                    SelectionKey key = null;
                     
                    while(it.hasNext()){
                             
                        key = it.next();
                        /*防止下次select方法返回已处理过的通道*/                        it.remove();
                         
                        /*若发现异常,说明客户端连接出现问题,但服务器要保持正常*/                        try{
                            /*ssc通道只能对链接事件感兴趣*/                            if(key.isAcceptable()){
                                 
                                /*accept方法会返回一个普通通道,
                                     每个通道在内核中都对应一个socket缓冲区*/                                SocketChannel sc = ssc.accept();
                                sc.configureBlocking(false);
                                 
                                /*向选择器注册这个通道和普通通道感兴趣的事件,同时提供这个新通道相关的缓冲区*/                                int interestSet = SelectionKey.OP_READ;                             
                                sc.register(selector, interestSet, new Buffers(256, 256));
                                 
                                System.out.println("accept from " + sc.getRemoteAddress());
                            }
                             
                             
                            /*(普通)通道感兴趣读事件且有数据可读*/                            if(key.isReadable()){
                                 
                                /*通过SelectionKey获取通道对应的缓冲区*/                                Buffers  buffers = (Buffers)key.attachment();
                                ByteBuffer readBuffer = buffers.getReadBuffer();
                                ByteBuffer writeBuffer = buffers.gerWriteBuffer();
                                 
                                /*通过SelectionKey获取对应的通道*/                                SocketChannel sc = (SocketChannel) key.channel();
                                 
                                /*从底层socket读缓冲区中读入数据*/                                sc.read(readBuffer);
                                readBuffer.flip();
                                 
                                /*解码显示,客户端发送来的信息*/                                CharBuffer cb = utf8. decode (readBuffer);
                                System.out.println(cb.array());
                     
                                readBuffer.rewind();
 
                                 
                                /*准备好向客户端发送的信息*/                                /*先写入"echo:",再写入收到的信息*/                                writeBuffer.put("echo from service:".getBytes("UTF-8"));
                                writeBuffer.put(readBuffer);
                                 
                                readBuffer.clear();
                                 
                                /*设置通道写事件*/                                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                                                                 
                            }
                             
                            /*通道感兴趣写事件且底层缓冲区有空闲*/                            if(key.isWritable()){
                                 
                                Buffers  buffers = (Buffers)key.attachment();
                                 
                                ByteBuffer writeBuffer = buffers.gerWriteBuffer();
                                writeBuffer.flip();
                                 
                                SocketChannel sc = (SocketChannel) key.channel();
                                 
                                int len = 0;
                                while(writeBuffer.hasRemaining()){
                                    len = sc.write(writeBuffer);
                                    /*说明底层的socket写缓冲已满*/                                    if(len == 0){
                                        break;
                                    }
                                }
                                 
                                writeBuffer.compact();
                                 
                                /*说明数据全部写入到底层的socket写缓冲区*/                                if(len != 0){
                                    /*取消通道的写事件*/                                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                                }
                                 
                            }
                        }catch(IOException e){
                            System.out.println("service encounter client error");
                            /*若客户端连接出现异常,从Seletcor中移除这个key*/                            key.cancel();
                            key.channel().close();
                        }
 
                    }
                         
                    Thread.sleep(rnd.nextInt(500));
                }
                 
            }catch(InterruptedException e){
                System.out.println("serverThread is interrupted");
            } catch (IOException e1) {
                System.out.println("serverThread selecotr error");
            }finally{
                try{
                    selector.close();
                }catch(IOException e){
                    System.out.println("selector close failed");
                }finally{
                    System.out.println("server close");
                }
            }
 
        }
    }
     
    public static void main(String[] args) throws InterruptedException, IOException{
        Thread thread = new Thread(new TCPEchoServer(8080));
        thread.start();
        Thread.sleep(100000);
        /*结束服务器线程*/        thread.interrupt();
    }
     
}  

客户端示例代码

 package nioDemo;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
 
/*客户端:客户端每隔1~2秒自动向服务器发送数据,接收服务器接收到数据并显示*/public class ClientSocketChannelDemo {
     
    public static class TCPEchoClient implements Runnable{
         
        /*客户端线程名*/        private String name;
        private Random rnd = new Random();
         
        /*服务器的ip地址+端口port*/        private InetSocketAddress remoteAddress;
         
        public TCPEchoClient(String name, InetSocketAddress remoteAddress){
            this.name = name;
            this.remoteAddress = remoteAddress;
        }
         
        @Override
        public void run(){
             
            /*创建解码器*/            Charset utf8 = Charset.forName("UTF-8");
             
            Selector selector;
             
            try {
                 
                /*创建TCP通道*/                SocketChannel sc = SocketChannel.open();
                 
                /*设置通道为非阻塞*/                sc.configureBlocking(false);
                 
                /*创建选择器*/                selector = Selector.open();
                 
                /*注册感兴趣事件*/                int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
                 
                /*向选择器注册通道*/                sc.register(selector, interestSet, new Buffers(256, 256));
                 
                /*向服务器发起连接,一个通道代表一条tcp链接*/                sc.connect(remoteAddress);
                 
                /*等待三次握手完成*/                while(!sc.finishConnect()){
                    ;
                }
 
                System.out.println(name + " " + "finished connection");
                 
            } catch (IOException e) {
                System.out.println("client connect failed");
                return;
            }
             
            /*与服务器断开或线程被中断则结束线程*/            try{
 
                int i = 1;
                while(!Thread.currentThread().isInterrupted()){
                     
                    /*阻塞等待*/                    selector.select();
                     
                    /*Set中的每个key代表一个通道*/                    Set<SelectionKey> keySet = selector.selectedKeys();
                    Iterator<SelectionKey> it = keySet.iterator();
                     
                    /*遍历每个已就绪的通道,处理这个通道已就绪的事件*/                    while(it.hasNext()){
                         
                        SelectionKey key = it.next();
                        /*防止下次select方法返回已处理过的通道*/                        it.remove();
                         
                        /*通过SelectionKey获取对应的通道*/                        Buffers  buffers = (Buffers)key.attachment();
                        ByteBuffer readBuffer = buffers.getReadBuffer();
                        ByteBuffer writeBuffer = buffers.gerWriteBuffer();
                         
                        /*通过SelectionKey获取通道对应的缓冲区*/                        SocketChannel sc = (SocketChannel) key.channel();
                         
                        /*表示底层socket的读缓冲区有数据可读*/                        if(key.isReadable()){
                            /*从socket的读缓冲区读取到程序定义的缓冲区中*/                            sc.read(readBuffer);
                            readBuffer.flip();
                            /*字节到utf8解码*/                            CharBuffer cb = utf8.decode(readBuffer);
                            /*显示接收到由服务器发送的信息*/                            System.out.println(cb.array());
                            readBuffer.clear();
                        }
                         
                        /*socket的写缓冲区可写*/                        if(key.isWritable()){
                            writeBuffer.put((name + "  " + i).getBytes("UTF-8"));
                            writeBuffer.flip();
                            /*将程序定义的缓冲区中的内容写入到socket的写缓冲区中*/                            sc.write(writeBuffer);
                            writeBuffer.clear();
                            i++;
                        }
                    }
                     
                    Thread.sleep(1000 + rnd.nextInt(1000));
                }
             
            }catch(InterruptedException e){
                System.out.println(name + " is interrupted");
            }catch(IOException e){
                System.out.println(name + " encounter a connect error");
            }finally{
                try {
                    selector.close();
                } catch (IOException e1) {
                    System.out.println(name + " close selector failed");
                }finally{
                    System.out.println(name + "  closed");
                }
            }
        }
         
    }
     
    public static void main(String[] args) throws InterruptedException{
         
        InetSocketAddress remoteAddress = new InetSocketAddress("192.168.1.100", 8080);
         
        Thread ta = new Thread(new TCPEchoClient("thread a", remoteAddress));
        Thread tb = new Thread(new TCPEchoClient("thread b", remoteAddress));
        Thread tc = new Thread(new TCPEchoClient("thread c", remoteAddress));
        Thread td = new Thread(new TCPEchoClient("thread d", remoteAddress));
         
        ta.start();
        tb.start();
        tc.start();
         
        Thread.sleep(5000);
 
        /*结束客户端a*/        ta.interrupt();
         
        /*开始客户端d*/        td.start();
    }
}  

文章来源:智云一二三科技

文章标题:Java面试篇基础部分-Java 实现的I/O方式

文章地址:https://www.zhihuclub.com/177685.shtml

关于作者: 智云科技

热门文章

网站地图