说明:
在Reactor经典模型中,Reactor查询到NIO就绪的事件后,分发到Handler,由Handler完成NIO操作和计算的操作。
ChannelHandler,处理一个I/O event或者拦截一个I/O操作,在它的pipeline中将其递交给相邻的下一个handler,该接口有许多的方法需要实现,一般可通过继承ChannelHandlerAdapter来代替。
package io.netty.channel;
public interface ChannelInboundHandler extends ChannelHandler {
// 注册事件
void channelRegistered(ChannelHandlerContext var1) throws Exception;
//
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
//
void channelActive(ChannelHandlerContext var1) throws Exception;
//
void channelInactive(ChannelHandlerContext var1) throws Exception;
//
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
//
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
//
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
//
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
//
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}
package io.netty.channel;
import java.net.SocketAddress;
public interface ChannelOutboundHandler extends ChannelHandler {
//
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
//
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
//
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
//
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
//
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
//
void read(ChannelHandlerContext var1) throws Exception;
//
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
//
void flush(ChannelHandlerContext var1) throws Exception;
}
代码案例(传输字节):
package com.what21.netty.channel.bytes;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServerDemo {
public static void main(String[] args) throws InterruptedException {
ServerBootstrap server = new ServerBootstrap();
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
server.group(parentGroup, childGroup);
server.option(ChannelOption.SO_BACKLOG, 128);
server.channel(NioServerSocketChannel.class);
server.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SocketBytesHandler());
}
});
ChannelFuture future = server.bind(11112).sync();
future.channel().closeFuture().sync();
}
}
package com.what21.netty.channel.bytes;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SocketBytesHandler extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("1、handlerAdded()");
super.handlerAdded(ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("2、channelRegistered()");
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("3、channelActive()");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("4、channelRead()");
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("byteBuf=" + byteBuf);
int requestLength = byteBuf.readByte();
byte[] bytes = new byte[requestLength];
byteBuf.readBytes(bytes);
System.out.println("接收字符串:" + new String(bytes));
// 写数据到客户端
String response = "我是服务器端,你要怎么滴?";
byte[] responseBytes = response.getBytes();
ByteBuf responseByteBuf = Unpooled.copiedBuffer(responseBytes);
byte[] lengthBytes = {(byte) responseBytes.length};
ByteBuf resLengByteBuf = Unpooled.copiedBuffer(lengthBytes);
ctx.write(resLengByteBuf);
ctx.writeAndFlush(responseByteBuf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("5、channelReadComplete()");
super.channelReadComplete(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("6、channelInactive()");
super.channelInactive(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("7、handlerRemoved()");
super.handlerRemoved(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught()");
super.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("userEventTriggered()");
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelWritabilityChanged()");
super.channelWritabilityChanged(ctx);
}
}
package com.what21.netty.channel.bytes;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@Slf4j
public class SocketClientDemo {
public static void main(String[] args) {
String host = "127.0.0.1";
int port = 11111;
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(host, port));
socket.setKeepAlive(true);
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
// 客户端发送消息
String msg = "我是客户端!";
byte[] requestBytes = msg.getBytes();
out.write(requestBytes.length);
out.write(requestBytes);
out.flush();
// 客户端接收消息
int length = in.read();
byte[] bytes = new byte[length];
in.read(bytes);
System.out.println("响应消息为:" + new String(bytes));
out.close();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
代码案例(编码后传输字节):
package com.what21.netty.channel.bytes2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class NettyServerDemo {
public static void main(String[] args) throws InterruptedException {
ServerBootstrap server = new ServerBootstrap();
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
server.group(parentGroup, childGroup);
server.option(ChannelOption.SO_BACKLOG, 128);
server.channel(NioServerSocketChannel.class);
server.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 解码器自定义长度(ChannelInboundHandler)
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
// ChannelOutboundHandler
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast(new SocketByteHandler());
}
});
ChannelFuture future = server.bind(11112).sync();
future.channel().closeFuture().sync();
}
}
package com.what21.netty.channel.bytes2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class SocketByteHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
// msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中
result.readBytes(result1);
String resultStr = new String(result1);
System.out.println("Client said:" + resultStr);
// 释放资源,这行很关键
result.release();
String response = "I am ok!";
// 在当前场景下,发送的数据必须转换成ByteBuf数组
ByteBuf encoded = ctx.alloc().buffer(4 * response.length());
encoded.writeBytes(response.getBytes());
ctx.write(encoded);
ctx.flush();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
ctx.flush();
}
}
package com.what21.netty.channel.bytes2;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@Slf4j
public class SocketClientDemo {
public static void main(String[] args) {
String host = "127.0.0.1";
int port = 11112;
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(host, port));
socket.setKeepAlive(true);
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
// 客户端发送消息
ByteBuffer header = ByteBuffer.allocate(4);
String msg = "我是客户端!";
byte[] msgBytes = msg.getBytes();
header.putInt(msgBytes.length);
out.write(header.array());
out.write(msgBytes);
out.flush();
byte[] responseLengthBytes = new byte[4];
int readI = in.read(responseLengthBytes);
System.out.println("已读长度为:" + readI);
// 客户端接收消息
byte[] responseContentBytes = new byte[1024];
int readed = in.read(responseContentBytes);
if (readed > 0) {
System.out.println("已读长度为:" + readed);
String responseContent = new String(responseContentBytes, 0, readed);
System.out.println("服务器端响应内容:" + responseContent);
}
out.close();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}