Java I/O
在整个的java.io包中提供了5个重要的I/O类和1个接口类。5个类分别是 File 、OutputStream、 InputStream 、 Writer 、Reader ,1个接口是指 Serializable 序列化接口。具体的使用方式可以查看 JDK 的参考文档。
Java NIO 实现
Java NIO 的实现内容主要有如下的三个核心内容
- Selector(选择器)
- Channel(通道)
- Buffer(缓冲区)
Selector 用于监听多个Channel的操作事件,例如连接的打开、数据的连接处理等,所以说一个 线程 可以实现对于多个Channel的管理操作。
传统的I/O是基于数据流的方式进行I/O的读写操作;NIO是基于Channel和Buffer进行的I/O操作,数据从Channel中读取到了Buffer中,或者操作从Buffer数据写入到Channel中。
JavaNIO 与传统I/O操作的区别
- I/O操作是面对流对象的,而NIO则是面向缓冲区的;在面向流的操作中,数据只能在输入流或者输出流中进行连续的读写操作,数据没有缓冲区这个概念,所以字节流无法进行前后移动。在NIO的操作中数据是从Channel中读取到Buffer中,然后再从Buffer中读取的Channel中,既然出现了Channel就可以很容易的实现数据的前后操作。也就出现了NIO中常见的拆包、粘包问题。
- 传统的I/O方式是阻塞的模式,NIO是非阻塞模式,在传统的I/O模式下,当用户调用了read()或者是 write ()进行读写操作的时候,线程一直处于阻塞状态等待数据的写入写出操作。NIO通过Selector机制监听每个Channel的事件的变化,当Channel上有数据发生变化的时候通知对应的线程进行读写操作。对于读请求,在Channel有可用数据的时候,线程现将数据写入到Buffer上,在没有数据的时候,线程执行其他的业务逻辑操作。对于写请求,在一个线程进行写操作的时候数据写入到某个Channel中的时候,只需要Channel上的数据通过异步的方式写入到Buffer就可以了。Buffer上的数据会通过异步的方式写入到目标机器的Channel上,用户线程不需要等待数据完全被写入,就可以执行其他的业务逻辑操作。
非阻塞IO模型中Selector线程工作如下图所示。
Channel
Channel和I/O流Stream类似,只不过Stream是单向的,Channel是双向的,也就是说流式只能是输入流或者输出流,但是Channel即可用来进行读操作,也可以进行写操作。
NIO中Channel主要的实现类有如下几种:
- FileChannel
- DatagramChannel
- socket Channel
- ServerSocketChannel
分别对应的是文件I/O操作、 UDP 、TCP IO 、 Socket Client和Socket Server操作。
Buffer
Buffer实际上通过上面的理解我们可以看出,它是一个容器,其内部通过一个连续的字节数组存储I/O上的数据。在NIO中,Channel在文件、网络上对数据的读取或写入都必须经过Buffer。
如图所示,客户端向着服务端发送数据的时候,先将数据写入到Buffer中,然后将Buffer中的数据写入到服务端对应的Channel中,服务端在接受到数据的时候通过Channel将数据读入到Buffer中,然后从Buffer中读取数据并进行对应的处理。
Java NIO包中Buffer是一个抽象类主要有如下的一些实现类
- ByteBuffer
- IntBuffer
- Char Buffer
- LongBuffer
- DoubleBuffer
- FloatBuffer
- ShortBuffer
Selector
Selector选择器,用来检测在多个Channel上是否有I/O操作事件发生,并且对检测到的I/O事件进行相应的处理。所以Selector通过一个线程就可以实现对于多个Channel的管理。这样就不需要为每个连接都创建线程,避免了线程资源和在多线程上下文切换导致的开销。
Selector只有在Channel有读写事件发生的时候,才能调用I/O函数进行读写操作。极大地减少了系统开销,提高系统的并发量。
服务端示例代码
package nioDemo;
import java.io.IO Exception ;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset. Charset ;
import java.util. iterator ;
import java.util.Random;
import java.util.Set;
/*服务器端,:接收客户端发送过来的数据并显示,
*服务器把上接收到的数据加上" Echo from service:"再发送回去*/public class ServiceSocketChannelDemo {
public static class TCPEchoServer implements Runnable{
/*服务器地址*/ private InetSocketAddress localAddress;
public TCPEchoServer(int port) throws IOException{
this.localAddress = new InetSocketAddress(port);
}
@ Override
public void run(){
Charset utf8 = Charset.forName("UTF-8");
ServerSocketChannel ssc = null;
Selector selector = null;
Random rnd = new Random();
try {
/*创建选择器*/ selector = Selector.open();
/*创建服务器通道*/ ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
/*设置监听服务器的端口,设置最大连接缓冲数为100*/ ssc.bind(localAddress, 100);
/*服务器通道只能对tcp链接事件感兴趣*/ ssc.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e1) {
System.out.println("server start failed");
return;
}
System.out.println("server start with address : " + localAddress);
/*服务器线程被中断后会退出*/ try{
while(!Thread.currentThread().isInterrupted()){
int n = selector.select();
if(n == 0){
continue;
}
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
/*防止下次select方法返回已处理过的通道*/ it.remove();
/*若发现异常,说明客户端连接出现问题,但服务器要保持正常*/ try{
/*ssc通道只能对链接事件感兴趣*/ if(key.isAcceptable()){
/*accept方法会返回一个普通通道,
每个通道在内核中都对应一个socket缓冲区*/ SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
/*向选择器注册这个通道和普通通道感兴趣的事件,同时提供这个新通道相关的缓冲区*/ int interestSet = SelectionKey.OP_READ;
sc.register(selector, interestSet, new Buffers(256, 256));
System.out.println("accept from " + sc.getRemoteAddress());
}
/*(普通)通道感兴趣读事件且有数据可读*/ if(key.isReadable()){
/*通过SelectionKey获取通道对应的缓冲区*/ Buffers buffers = (Buffers)key.attachment();
ByteBuffer readBuffer = buffers.getReadBuffer();
ByteBuffer writeBuffer = buffers.gerWriteBuffer();
/*通过SelectionKey获取对应的通道*/ SocketChannel sc = (SocketChannel) key.channel();
/*从底层socket读缓冲区中读入数据*/ sc.read(readBuffer);
readBuffer.flip();
/*解码显示,客户端发送来的信息*/ CharBuffer cb = utf8. decode (readBuffer);
System.out.println(cb.array());
readBuffer.rewind();
/*准备好向客户端发送的信息*/ /*先写入"echo:",再写入收到的信息*/ writeBuffer.put("echo from service:".getBytes("UTF-8"));
writeBuffer.put(readBuffer);
readBuffer.clear();
/*设置通道写事件*/ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
/*通道感兴趣写事件且底层缓冲区有空闲*/ if(key.isWritable()){
Buffers buffers = (Buffers)key.attachment();
ByteBuffer writeBuffer = buffers.gerWriteBuffer();
writeBuffer.flip();
SocketChannel sc = (SocketChannel) key.channel();
int len = 0;
while(writeBuffer.hasRemaining()){
len = sc.write(writeBuffer);
/*说明底层的socket写缓冲已满*/ if(len == 0){
break;
}
}
writeBuffer.compact();
/*说明数据全部写入到底层的socket写缓冲区*/ if(len != 0){
/*取消通道的写事件*/ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
}
}
}catch(IOException e){
System.out.println("service encounter client error");
/*若客户端连接出现异常,从Seletcor中移除这个key*/ key.cancel();
key.channel().close();
}
}
Thread.sleep(rnd.nextInt(500));
}
}catch(InterruptedException e){
System.out.println("serverThread is interrupted");
} catch (IOException e1) {
System.out.println("serverThread selecotr error");
}finally{
try{
selector.close();
}catch(IOException e){
System.out.println("selector close failed");
}finally{
System.out.println("server close");
}
}
}
}
public static void main(String[] args) throws InterruptedException, IOException{
Thread thread = new Thread(new TCPEchoServer(8080));
thread.start();
Thread.sleep(100000);
/*结束服务器线程*/ thread.interrupt();
}
}
客户端示例代码
package nioDemo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
/*客户端:客户端每隔1~2秒自动向服务器发送数据,接收服务器接收到数据并显示*/public class ClientSocketChannelDemo {
public static class TCPEchoClient implements Runnable{
/*客户端线程名*/ private String name;
private Random rnd = new Random();
/*服务器的ip地址+端口port*/ private InetSocketAddress remoteAddress;
public TCPEchoClient(String name, InetSocketAddress remoteAddress){
this.name = name;
this.remoteAddress = remoteAddress;
}
@Override
public void run(){
/*创建解码器*/ Charset utf8 = Charset.forName("UTF-8");
Selector selector;
try {
/*创建TCP通道*/ SocketChannel sc = SocketChannel.open();
/*设置通道为非阻塞*/ sc.configureBlocking(false);
/*创建选择器*/ selector = Selector.open();
/*注册感兴趣事件*/ int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
/*向选择器注册通道*/ sc.register(selector, interestSet, new Buffers(256, 256));
/*向服务器发起连接,一个通道代表一条tcp链接*/ sc.connect(remoteAddress);
/*等待三次握手完成*/ while(!sc.finishConnect()){
;
}
System.out.println(name + " " + "finished connection");
} catch (IOException e) {
System.out.println("client connect failed");
return;
}
/*与服务器断开或线程被中断则结束线程*/ try{
int i = 1;
while(!Thread.currentThread().isInterrupted()){
/*阻塞等待*/ selector.select();
/*Set中的每个key代表一个通道*/ Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
/*遍历每个已就绪的通道,处理这个通道已就绪的事件*/ while(it.hasNext()){
SelectionKey key = it.next();
/*防止下次select方法返回已处理过的通道*/ it.remove();
/*通过SelectionKey获取对应的通道*/ Buffers buffers = (Buffers)key.attachment();
ByteBuffer readBuffer = buffers.getReadBuffer();
ByteBuffer writeBuffer = buffers.gerWriteBuffer();
/*通过SelectionKey获取通道对应的缓冲区*/ SocketChannel sc = (SocketChannel) key.channel();
/*表示底层socket的读缓冲区有数据可读*/ if(key.isReadable()){
/*从socket的读缓冲区读取到程序定义的缓冲区中*/ sc.read(readBuffer);
readBuffer.flip();
/*字节到utf8解码*/ CharBuffer cb = utf8.decode(readBuffer);
/*显示接收到由服务器发送的信息*/ System.out.println(cb.array());
readBuffer.clear();
}
/*socket的写缓冲区可写*/ if(key.isWritable()){
writeBuffer.put((name + " " + i).getBytes("UTF-8"));
writeBuffer.flip();
/*将程序定义的缓冲区中的内容写入到socket的写缓冲区中*/ sc.write(writeBuffer);
writeBuffer.clear();
i++;
}
}
Thread.sleep(1000 + rnd.nextInt(1000));
}
}catch(InterruptedException e){
System.out.println(name + " is interrupted");
}catch(IOException e){
System.out.println(name + " encounter a connect error");
}finally{
try {
selector.close();
} catch (IOException e1) {
System.out.println(name + " close selector failed");
}finally{
System.out.println(name + " closed");
}
}
}
}
public static void main(String[] args) throws InterruptedException{
InetSocketAddress remoteAddress = new InetSocketAddress("192.168.1.100", 8080);
Thread ta = new Thread(new TCPEchoClient("thread a", remoteAddress));
Thread tb = new Thread(new TCPEchoClient("thread b", remoteAddress));
Thread tc = new Thread(new TCPEchoClient("thread c", remoteAddress));
Thread td = new Thread(new TCPEchoClient("thread d", remoteAddress));
ta.start();
tb.start();
tc.start();
Thread.sleep(5000);
/*结束客户端a*/ ta.interrupt();
/*开始客户端d*/ td.start();
}
}