您的位置 首页 java

「从入门到放弃-Java」并发编程-NIO-Channel

前言

上篇[【从入门到放弃-Java】并发编程-NIO使用]()简单介绍了nio的基础使用,本篇将深入源码分析nio中channel的实现。

简介

channel即通道,可以用来读、写数据,它是全双工的可以同时用来读写操作。这也是它与stream流的最大区别。

channel需要与buffer配合使用,channel通道的一端是buffer,一端是数据源实体,如文件、 socket 等。在n io 中,通过channel的不同实现来处理 不同实体与数据buffer中的数据传输。

channel接口:

package java.nio.channels;
import java.io.IOException;
import java.io.Closeable;
/**
 * A nexus for I/O operations.
 *
 * <p> A channel represents an open connection to an entity such as a hardware
 * device, a file, a network socket, or a program component that is capable of
 * performing one or more distinct I/O operations, for example reading or
 * writing.
 *
 * <p> A channel is either open or closed. A channel is open upon creation,
 * and once closed it remains closed. Once a channel is closed, any attempt to
 * invoke an I/O operation upon it will cause a {@link ClosedChannelException}
 * to be thrown. Whether or not a channel is open may be tested by invoking
 * its {@link #isOpen isOpen} method.
 *
 * <p> Channels are, in general, intended to be safe for multithreaded access
 * as described in the specifications of the interfaces and classes that extend
 * and implement this interface.
 *
 *
 * @author Mark Reinhold
 * @author JSR-51 Expert Group
 * @since 1.4
 */public interface Channel extends Closeable {
 /**
 * Tells whether or not this channel is open.
 *
 * @return <tt>true</tt> if, and only if, this channel is open
 */ public boolean isOpen();
 /**
 * Closes this channel.
 *
 * <p> After a channel is closed, any further attempt to invoke I/O
 * operations upon it will cause a {@link ClosedChannelException} to be
 * thrown.
 *
 * <p> If this channel is already closed then invoking this method has no
 * effect.
 *
 * <p> This method may be invoked at any time. If some other thread has
 * already invoked it, however, then another invocation will block until
 * the first invocation is complete, after which it will return without
 * effect. </p>
 *
 * @throws IOException If an I/O error occurs
 */ public void close() throws IOException;
}
 

常见的channel实现有:

  • FileChannel:文件读写数据通道
  • SocketChannel:TCP读写网络数据通道
  • ServerSocketChannel:服务端网络数据读写通道,可以监听TCP连接。对每一个新进来的连接都会创建一个SocketChannel。
  • DatagramChannel:UDP读写网络数据通道

FileChannel

FileChannel是一个抽象类,它继承了AbstractInterruptibleChannel类,并实现了 SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel接口。

具体的实现类主要是sun.nio.ch.FileChannelImpl。下面详细分析下FileChannelImpl中每个方法的具体实现。

open

private FileChannelImpl(FileDescriptor var1, String var2, boolean var3, boolean var4, boolean var5, Object var6) {
 //主要记载操作系统维护的文件描述符
 this.fd = var1;
 
 //是否可读
 this.readable = var3;
 
 //是否可写
 this.writable = var4;
 
 //是否以追加的方式打开
 this.append = var5;
 
 this.parent = var6;
 this.path = var2;
 
 //底层使用native的read和write来处理文件的
 this.nd = new FileDispatcherImpl(var5);
}
//FileInputStream::getChannel 调用 FileChannelImpl.open(fd, path, true, false, this) 获取只读channel
public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, Object var4) {
 return new FileChannelImpl(var0, var1, var2, var3, false, var4);
}
//FileOutputStream::getChannel 调用 FileChannelImpl.open(fd, path, false, true, append, this) 获取只写channel
public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, boolean var4, Object var5) {
 return new FileChannelImpl(var0, var1, var2, var3, var4, var5);
}
private FileChannelImpl(FileDescriptor fd, String path, boolean readable,
 boolean writable, boolean direct, Object parent)
{
 this.fd = fd;
 
 //是否可读
 this.readable = readable;
 
 //是否可写
 this.writable = writable;
 
 //对于从流创建的channel,在结束时要做不同的清理动作,(openJDK中才有,sun的jdk中没有)
 this.parent = parent;
 
 //源文件的path
 this.path = path;
 
 //是否使用DirectIO
 this.direct = direct;
 
 this.nd = new FileDispatcherImpl();
 if (direct) {
 assert path != null;
 this.alignment = nd.setDirectIO(fd, path);
 } else {
 this.alignment = -1;
 }
 //当parent不存在时,则注册一个cleaner,否则交由parent做清理动作。
 // Register a cleaning action if and only if there is no parent
 // as the parent will take care of closing the file descriptor.
 // FileChannel is used by the LambdaMetaFactory so a lambda cannot
 // be used here hence we use a nested class instead.
 this.closer = parent != null ? null :
 CleanerFactory.cleaner().register(this, new Closer(fd));
}
// Used by FileInputStream.getChannel(), FileOutputStream.getChannel
// and RandomAccessFile.getChannel()
public static FileChannel open(FileDescriptor fd, String path,
 boolean readable, boolean writable,
 boolean direct, Object parent)
{
 return new FileChannelImpl(fd, path, readable, writable, direct, parent);
}
 
  • open方法主要是返回一个新new的FileChannelImpl对象,初始化时设置fileDescriptor、readable、writable、append、parent、path等属性,看变量名很容易理解,在此不赘述变量含义。

read

//实现自SeekableByteChannel接口的方法,将文件中的内容读取到给定的byteBuffer中
public int read(ByteBuffer dst) throws IOException {
 //保证读写时,channel处于开启状态
 ensureOpen();
 
 //判断是否可读
 if (!readable)
 throw new NonReadableChannelException();
  synchronized  (positionLock) {
 if (direct)
 Util. check ChannelPositionAligned(position(), alignment);
 int n = 0;
 int ti = -1;
 try {
 
 //开始阻塞,并注册为Interruptible,可以被中断
 beginBlocking();
 
 //将当前 线程 添加到NativeThreadSet中,并返回索引,方便后续操作。
 //NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号
 ti = threads.add();
 if (!isOpen())
 return 0;
 do {
 //当未被系统中断(即读取完毕)或channel未被关闭,则一直读,将内容写入到byteBuffer(dst)中
 n = IOUtil.read(fd, dst, -1, direct, alignment, nd);
 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 return IOStatus.normalize(n);
 } finally {
 //把当前线程从set中移出
 threads.remove(ti);
 
 //结束,释放锁
 endBlocking(n > 0);
 assert IOStatus.check(n);
 }
 }
}
//实现自ScatteringByteChannel接口的方法,将文件中的内容依次读取到给定的byteBuffer数组中。
public long read(ByteBuffer[] dsts, int offset, int length)
 throws IOException
{
 if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
 throw new IndexOutOfBoundsException();
 //保证读写时,channel处于开启状态
 ensureOpen();
 
 //判断是否可读
 if (!readable)
 throw new NonReadableChannelException();
 synchronized (positionLock) {
 if (direct)
 Util.checkChannelPositionAligned(position(), alignment);
 long n = 0;
 int ti = -1;
 try {
 //开始阻塞,并注册为Interruptible,可以被中断
 beginBlocking();
 
 //将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。
 //NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号
 ti = threads.add();
 if (!isOpen())
 return 0;
 do {
 //当未被系统中断(即读取完毕)或channel未被关闭,则一直读,将内容写入到byteBuffer(dst)中
 n = IOUtil.read(fd, dsts, offset, length,
 direct, alignment, nd);
 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 return IOStatus.normalize(n);
 } finally {
 //把当前线程从set中移出
 threads.remove(ti);
 
 //结束,释放锁
 endBlocking(n > 0);
 assert IOStatus.check(n);
 }
 }
}
 

write

//实现自SeekableByteChannel接口的方法,将byteBuffer中的内容写入到文件中
public int write(ByteBuffer src) throws IOException {
 //保证写时,channel处于开启状态
 ensureOpen();
 
 //判断是否可写
 if (!writable)
 throw new NonWritableChannelException();
 synchronized (positionLock) {
 if (direct)
 Util.checkChannelPositionAligned(position(), alignment);
 int n = 0;
 int ti = -1;
 try {
 //开始阻塞,并注册为Interruptible,可以被中断
 beginBlocking();
 
 //将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。
 //NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号
 ti = threads.add();
 if (!isOpen())
 return 0;
 do {
 //当未被系统中断(即写入完毕)或channel未被关闭,则一直写,将内容写入到文件中
 n = IOUtil.write(fd, src, -1, direct, alignment, nd);
 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 return IOStatus.normalize(n);
 } finally {
 //把当前线程从set中移出
 threads.remove(ti);
 
 //结束,释放锁
 assert IOStatus.check(n);
 }
 }
}
//实现自GatheringByteChannel接口的方法,将byteBuffer数组中的内容依次写入到文件中
public long write(ByteBuffer[] srcs, int offset, int length)
 throws IOException
{
 if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
 throw new IndexOutOfBoundsException();
 //保证写时,channel处于开启状态
 ensureOpen();
 
 //判断是否可写
 if (!writable)
 throw new NonWritableChannelException();
 synchronized (positionLock) {
 if (direct)
 Util.checkChannelPositionAligned(position(), alignment);
 long n = 0;
 int ti = -1;
 try {
 //开始阻塞,并注册为Interruptible,可以被中断
 beginBlocking();
 
 //将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。
 //NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号
 ti = threads.add();
 if (!isOpen())
 return 0;
 do {
 //当未被系统中断(即写入完毕)或channel未被关闭,则一直写,将内容写入到文件中
 n = IOUtil.write(fd, srcs, offset, length,
 direct, alignment, nd);
 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 return IOStatus.normalize(n);
 } finally {
 //把当前线程从set中移出
 threads.remove(ti);
 
 //结束,释放锁
 assert IOStatus.check(n);
 }
 }
}
 

position

//实现自SeekableByteChannel接口的方法,获取当前channel的position
public long position() throws IOException {
 ensureOpen();
 synchronized (positionLock) {
 long p = -1;
 int ti = -1;
 try {
 beginBlocking();
 ti = threads.add();
 if (!isOpen())
 return 0;
 boolean append = fdAccess.getAppend(fd);
 do {
 //append模式下,position在channel的末尾
 // in append-mode then position is advanced to end before writing
 p = (append) ? nd.size(fd) : nd.seek(fd, -1);
 } while ((p == IOStatus.INTERRUPTED) && isOpen());
 return IOStatus.normalize(p);
 } finally {
 threads.remove(ti);
 endBlocking(p > -1);
 assert IOStatus.check(p);
 }
 }
}
//实现自SeekableByteChannel接口的方法,设置当前channel的position为newPosition
public FileChannel position(long newPosition) throws IOException {
 ensureOpen();
 if (newPosition < 0)
 throw new IllegalArgumentException();
 synchronized (positionLock) {
 long p = -1;
 int ti = -1;
 try {
 beginBlocking();
 ti = threads.add();
 if (!isOpen())
 return null;
 do {
 //设置当前position为newPosition
 p = nd.seek(fd, newPosition);
 } while ((p == IOStatus.INTERRUPTED) && isOpen());
 return this;
 } finally {
 threads.remove(ti);
 endBlocking(p > -1);
 assert IOStatus.check(p);
 }
 }
}
 

size

实现自SeekableByteChannel接口的方法,返回当前实体(文件)的大小

truncate

实现自SeekableByteChannel接口的方法,用来截取文件至newSize大小

force

实现自SeekableByteChannel接口的方法,用来将channel中尚未写入磁盘的数据强制落盘

transferTo

将fileChannel中的数据传递至另一个channel

transferFrom

从其它channel读取数据至fileChannel

SocketChannel

open

/**
 * Opens a socket channel.
 *
 * <p> The new channel is created by invoking the {@link
 * java.nio.channels.spi.SelectorProvider#openSocketChannel
 * openSocketChannel} method of the system-wide default {@link
 * java.nio.channels.spi.SelectorProvider} object. </p>
 *
 * @return A new socket channel
 *
 * @throws IOException
 * If an I/O error occurs
 */public static SocketChannel open() throws IOException {
 return SelectorProvider.provider().openSocketChannel();
}
 

open方法是调用SelectorProvider中实现了java.nio.channels.spi.SelectorProvider#openSocketChannel的方法,底层实际是new SocketChannelImpl,调用native方法创建socket

connect

public boolean connect(SocketAddress sa) throws IOException {
 //校验Address是否合法
 InetSocketAddress  isa  = Net.checkAddress(sa);
 
 //获取系统安全管理器
 SecurityManager sm = System.getSecurityManager();
 if (sm != null)
 
 //校验IP和端口是否被允许连接
 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
 InetAddress ia = isa.getAddress();
 
 //如果是本机地址,则获取本机的host
 if (ia.isAnyLocalAddress())
 ia = InetAddress.getLocalHost();
 try {
 //加读锁
 readLock.lock();
 try {
 //加写锁
 writeLock.lock();
 try {
 int n = 0;
 
 //是否阻塞
 boolean blocking = isBlocking();
 try {
 //开启connect前的校验并设置为ST_CONNECTIONPENDING,如果blocking是true 即阻塞模式,则记录当前线程的ID,以便接收信号处理。
 beginConnect(blocking, isa);
 do {
 //调用native connect方法
 n = Net.connect(fd, ia, isa.getPort());
 } while (n == IOStatus.INTERRUPTED && isOpen());
 } finally {
 //结束连接
 endConnect(blocking, (n > 0));
 }
 assert IOStatus.check(n);
 return n > 0;
 } finally {
 //释放写锁
 writeLock. unlock ();
 }
 } finally {
 //释放读锁
 readLock.unlock();
 }
 } catch (IOException ioe) {
 // connect failed, close the channel
 close();
 throw SocketExceptions.of(ioe, isa);
 }
}
 

configureBlocking

实现自SelectableChannel的接口方法,调用native方法设置socket的阻塞状态

register

在AbstractSelectableChannel中定义,注册要监听的事件。

public final SelectionKey register(Selector sel, int ops, Object att)
 throws ClosedChannelException
{
 if ((ops & ~validOps()) != 0)
 throw new IllegalArgumentException();
 if (!isOpen())
 throw new ClosedChannelException();
 synchronized (regLock) {
 if (isBlocking())
 throw new IllegalBlockingModeException();
 synchronized (keyLock) {
 // re-check if channel has been closed
 if (!isOpen())
 throw new ClosedChannelException();
 SelectionKey k = findKey(sel);
 if (k != null) {
 k.attach(att);
 k.interestOps(ops);
 } else {
 // 向Selector中注册事件
 // New registration
 k = ((AbstractSelector)sel).register(this, ops, att);
 addKey(k);
 }
 return k;
 }
 }
}
 

read

//实现自ReadableByteChannel接口的方法,从socket中读取数据至ByteBuffer
@Override
public int read(ByteBuffer buf) throws IOException {
 Objects.requireNonNull(buf);
 readLock.lock();
 try {
 boolean blocking = isBlocking();
 int n = 0;
 try {
 //检查channel是否开启并已经是connected的状态。如果blocking是true 即阻塞模式,则记录当前线程的ID,以便接收信号处理。
 beginRead(blocking);
 // check if input is shutdown
 if (isInputClosed)
 return IOStatus.EOF;
 //如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用native方法不需要等待。
 if (blocking) {
 do {
 n = IOUtil.read(fd, buf, -1, nd);
 } while (n == IOStatus.INTERRUPTED && isOpen());
 } else {
 n = IOUtil.read(fd, buf, -1, nd);
 }
 } finally {
 endRead(blocking, n > 0);
 if (n <= 0 && isInputClosed)
 return IOStatus.EOF;
 }
 return IOStatus.normalize(n);
 } finally {
 readLock.unlock();
 }
}
//实现自ScatteringByteChannel接口的方法,从socket中依次读取数据至ByteBuffer数组
@Override
public long read(ByteBuffer[] dsts, int offset, int length)
 throws IOException
{
 Objects.checkFromIndexSize(offset, length, dsts.length);
 readLock.lock();
 try {
 boolean blocking = isBlocking();
 long n = 0;
 try {
 beginRead(blocking);
 // check if input is shutdown
 if (isInputClosed)
 return IOStatus.EOF;
 //如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用native方法不需要等待。
 if (blocking) {
 do {
 n = IOUtil.read(fd, dsts, offset, length, nd);
 } while (n == IOStatus.INTERRUPTED && isOpen());
 } else {
 n = IOUtil.read(fd, dsts, offset, length, nd);
 }
 } finally {
 endRead(blocking, n > 0);
 if (n <= 0 && isInputClosed)
 return IOStatus.EOF;
 }
 return IOStatus.normalize(n);
 } finally {
 readLock.unlock();
 }
}
 

write

//实现自ReadableByteChannel接口的方法,将ByteBuffer中的数据写入socket
@Override
public int write(ByteBuffer buf) throws IOException {
 Objects.requireNonNull(buf);
 writeLock.lock();
 try {
 boolean blocking = isBlocking();
 int n = 0;
 try {
 beginWrite(blocking);
 //如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用native方法不需要等待。
 if (blocking) {
 do {
 n = IOUtil.write(fd, buf, -1, nd);
 } while (n == IOStatus.INTERRUPTED && isOpen());
 } else {
 n = IOUtil.write(fd, buf, -1, nd);
 }
 } finally {
 endWrite(blocking, n > 0);
 if (n <= 0 && isOutputClosed)
 throw new AsynchronousCloseException();
 }
 return IOStatus.normalize(n);
 } finally {
 writeLock.unlock();
 }
}
@Override
public long write(ByteBuffer[] srcs, int offset, int length)
 throws IOException
{
 Objects.checkFromIndexSize(offset, length, srcs.length);
 writeLock.lock();
 try {
 boolean blocking = isBlocking();
 long n = 0;
 try {
 beginWrite(blocking);
 //如果是阻塞模式,则一直等待直到数据写入完毕;非阻塞模式则直接调用native方法不需要等待。
 if (blocking) {
 do {
 n = IOUtil.write(fd, srcs, offset, length, nd);
 } while (n == IOStatus.INTERRUPTED && isOpen());
 } else {
 n = IOUtil.write(fd, srcs, offset, length, nd);
 }
 } finally {
 endWrite(blocking, n > 0);
 if (n <= 0 && isOutputClosed)
 throw new AsynchronousCloseException();
 }
 return IOStatus.normalize(n);
 } finally {
 writeLock.unlock();
 }
}
//实现自ReadableByteChannel接口的方法,将ByteBuffer数组中的数据依次写入socket
/**
 * Writes a byte of out of band data.
 */int sendOutOfBandData(byte b) throws IOException {
 writeLock.lock();
 try {
 boolean blocking = isBlocking();
 int n = 0;
 try {
 beginWrite(blocking);
 //如果是阻塞模式,则一直等待直到数据写入完毕;非阻塞模式则直接调用native方法不需要等待。
 if (blocking) {
 do {
 n = sendOutOfBandData(fd, b);
 } while (n == IOStatus.INTERRUPTED && isOpen());
 } else {
 n = sendOutOfBandData(fd, b);
 }
 } finally {
 endWrite(blocking, n > 0);
 if (n <= 0 && isOutputClosed)
 throw new AsynchronousCloseException();
 }
 return IOStatus.normalize(n);
 } finally {
 writeLock.unlock();
 }
}
 

ServerSocketChannel

socket

@Override
public ServerSocket socket() {
 synchronized (stateLock) {
 if (socket == null)
 socket = ServerSocketAdaptor.create(this);
 return socket;
 }
}
 

bind

@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
 synchronized (stateLock) {
 ensureOpen();
 if (localAddress != null)
 throw new AlreadyBoundException();
 InetSocketAddress isa = (local == null)
 ? new InetSocketAddress(0)
 : Net.checkAddress(local);
 SecurityManager sm = System.getSecurityManager();
 if (sm != null)
 sm.checkListen(isa.getPort());
 
 //绑定前做一些前置处理,如将tcp socket文件描述符转换成SDP
 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
 
 //绑定IP和地址
 Net.bind(fd, isa.getAddress(), isa.getPort());
 
 //开始监听,设置socket上最多可以挂起backlog个连接,若backlog小于1 则默认设置50个
 Net.listen(fd, backlog < 1 ? 50 : backlog);
 
 localAddress = Net.localAddress(fd);
 }
 return this;
}
 

accept

@Override
public SocketChannel accept() throws IOException {
 acceptLock.lock();
 try {
 int n = 0;
 FileDescriptor newfd = new FileDescriptor();
 InetSocketAddress[] isaa = new InetSocketAddress[1];
 boolean blocking = isBlocking();
 try {
 begin(blocking);
 do {
 //阻塞等待接收客户端链接
 n = accept(this.fd, newfd, isaa);
 } while (n == IOStatus.INTERRUPTED && isOpen());
 } finally {
 end(blocking, n > 0);
 assert IOStatus.check(n);
 }
 if (n < 1)
 return null;
 //新接收的socket初始设置为阻塞模式(因此非阻塞模式的每次需要显示设置)
 // newly accepted socket is initially in blocking mode
 IOUtil.configureBlocking(newfd, true);
 InetSocketAddress isa = isaa[0];
 
 //用新接收的socket创建SocketChannel
 SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);
 // check permitted to accept connections from the remote address
 SecurityManager sm = System.getSecurityManager();
 if (sm != null) {
 try {
 sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
 } catch (SecurityException x) {
 sc.close();
 throw x;
 }
 }
 return sc;
 } finally {
 acceptLock.unlock();
 }
}
 

ServerSocketChannel并没有read和write方法,只是继承了AbstractSelectableChannel,以便在selector中使用

DatagramChannel

open

public DatagramChannelImpl(SelectorProvider sp)
 throws IOException
{
 super(sp);
 ResourceManager.beforeUdpCreate();
 try {
 //如果不支持IPv6则使用IPv4
 this.family = Net.isIPv6Available()
 ? StandardProtocolFamily.INET6
 : StandardProtocolFamily.INET;
 
 //设置非流式的socket(tcp是流模式协议,udp是数据报模式协议)
 this.fd = Net.socket(family, false);
 this.fdVal = IOUtil.fdVal(fd);
 } catch (IOException ioe) {
 ResourceManager.afterUdpClose();
 throw ioe;
 }
}
 

receive

public SocketAddress receive(ByteBuffer dst) throws IOException {
 if (dst.isReadOnly())
 throw new IllegalArgumentException("Read-only buffer");
 readLock.lock();
 try {
 boolean blocking = isBlocking();
 int n = 0;
 ByteBuffer bb = null;
 try {
 SocketAddress remote = beginRead(blocking, false);
 boolean connected = (remote != null);
 SecurityManager sm = System.getSecurityManager();
 if (connected || (sm == null)) {
 // connected or no security manager
 do {
 n = receive(fd, dst, connected);
 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 if (n == IOStatus.UNAVAILABLE)
 return null;
 } else {
 // Cannot receive into user's buffer when running with a
 // security manager and not connected
 bb = Util.getTemporaryDirectBuffer(dst.remaining());
 for (;;) {
 do {
 n = receive(fd, bb, connected);
 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 if (n == IOStatus.UNAVAILABLE)
 return null;
 InetSocketAddress isa = (InetSocketAddress)sender;
 try {
 sm.checkAccept(isa.getAddress().getHostAddress(),
 isa.getPort());
 } catch (SecurityException se) {
 // Ignore packet
 bb.clear();
 n = 0;
 continue;
 }
 bb.flip();
 dst.put(bb);
 break;
 }
 }
 
 //sender:发送方地址, Set by receive0 (## ugh)
 assert sender != null;
 return sender;
 } finally {
 if (bb != null)
 Util.releaseTemporaryDirectBuffer(bb);
 endRead(blocking, n > 0);
 assert IOStatus.check(n);
 }
 } finally {
 readLock.unlock();
 }
}
 

send

public int send(ByteBuffer src, SocketAddress target)
 throws IOException
{
 Objects.requireNonNull(src);
 InetSocketAddress isa = Net.checkAddress(target, family);
 writeLock.lock();
 try {
 boolean blocking = isBlocking();
 int n = 0;
 try {
 //当connect后,remote会设置为连接的地址
 SocketAddress remote = beginWrite(blocking, false);
 if (remote != null) {
 // connected
 if (!target.equals(remote)) {
 throw new AlreadyConnectedException();
 }
 do {
 n = IOUtil.write(fd, src, -1, nd);
 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 } else {
 // not connected
 SecurityManager sm = System.getSecurityManager();
 if (sm != null) {
 InetAddress ia = isa.getAddress();
 if (ia.isMulticastAddress()) {
 sm.checkMulticast(ia);
 } else {
 sm.checkConnect(ia.getHostAddress(), isa.getPort());
 }
 }
 do {
 n = send(fd, src, isa);
 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 }
 } finally {
 endWrite(blocking, n > 0);
 assert IOStatus.check(n);
 }
 return IOStatus.normalize(n);
 } finally {
 writeLock.unlock();
 }
}
 

connect

@Override
public DatagramChannel connect(SocketAddress sa) throws IOException {
 InetSocketAddress isa = Net.checkAddress(sa, family);
 SecurityManager sm = System.getSecurityManager();
 if (sm != null) {
 InetAddress ia = isa.getAddress();
 if (ia.isMulticastAddress()) {
 sm.checkMulticast(ia);
 } else {
 sm.checkConnect(ia.getHostAddress(), isa.getPort());
 sm.checkAccept(ia.getHostAddress(), isa.getPort());
 }
 }
 readLock.lock();
 try {
 writeLock.lock();
 try {
 synchronized (stateLock) {
 ensureOpen();
 if (state == ST_CONNECTED)
 throw new AlreadyConnectedException();
 int n = Net.connect(family,
 fd,
 isa.getAddress(),
 isa.getPort());
 if (n <= 0)
 throw new Error(); // Can't happen
 // connected
 remoteAddress = isa;
 state = ST_CONNECTED;
 // refresh local address
 localAddress = Net.localAddress(fd);
 // flush any packets already received.
 boolean blocking = isBlocking();
 if (blocking) {
 IOUtil.configureBlocking(fd, false);
 }
 try {
 ByteBuffer buf = ByteBuffer.allocate(100);
 while (receive(buf) != null) {
 buf.clear();
 }
 } finally {
 if (blocking) {
 IOUtil.configureBlocking(fd, true);
 }
 }
 }
 } finally {
 writeLock.unlock();
 }
 } finally {
 readLock.unlock();
 }
 return this;
}
 

udp是数据报模式的协议,是没有connect的。这里的connect实际上是在底层忽略了与其他地址的数据传输。

在connect后,就可以像socketChannel似得使用read和write了

总结

本文学习了各种channel的实现,主要是对底层native方法的一些封装,针对不同属性的实体(文件、socket),使用对应的channel与byteBuffer传输数据。再通过byteBuffer与byte数据进行转换。

channel的实现中,封装了大量的native方法,重要的底层实现全在native中,后续可以深入学习下。

本文中出现的byteBuffer和selector将在接下来的文章中,单独分析。

作者:aloof_

文章来源:智云一二三科技

文章标题:「从入门到放弃-Java」并发编程-NIO-Channel

文章地址:https://www.zhihuclub.com/183506.shtml

关于作者: 智云科技

热门文章

网站地图