您的位置 首页 java

Java,Netty,Socket客户端与Netty服务器端通信,只传输字节案例

说明:

在Reactor经典模型中,Reactor查询到NIO就绪的事件后,分发到Handler,由Handler完成NIO操作和计算的操作。

ChannelHandler用于处理一个I/O事件或拦截一个I/O操作,并将其转发给所在pipeline中相邻的下一个handler。它的子接口(ChannelInboundHandler、ChannelOutboundHandler)有许多方法需要实现,一般不直接实现接口,而是继承对应的适配器类(例如ChannelInboundHandlerAdapter),只重写需要的方法。

 package io.netty.channel;

// Listing of Netty's inbound handler interface: one callback per inbound
// (channel -> application) event. The var1/var2 parameter names suggest this
// listing is decompiler output; var1 is always the handler's context.
// The per-method notes below paraphrase Netty's public Javadoc.
public interface ChannelInboundHandler extends ChannelHandler {

    // The channel was registered with its event loop.
    void channelRegistered(ChannelHandlerContext var1) throws Exception;
    // The channel was unregistered from its event loop.
    void channelUnregistered(ChannelHandlerContext var1) throws Exception;
    // The channel became active.
    void channelActive(ChannelHandlerContext var1) throws Exception;
    // The channel became inactive and reached the end of its lifetime.
    void channelInactive(ChannelHandlerContext var1) throws Exception;
    // A message (var2) was read from the peer.
    void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
    // The last message of the current read operation has been handed to channelRead.
    void channelReadComplete(ChannelHandlerContext var1) throws Exception;
    // A user-defined event (var2) was triggered.
    void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
    // The writable state of the channel changed.
    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
    // A Throwable (var2) was thrown while processing.
    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

}  
 package io.netty.channel;

import java.net.SocketAddress;

// Listing of Netty's outbound handler interface: one callback per outbound
// (application -> channel) operation. The var1..var4 parameter names suggest
// this listing is decompiler output; var1 is always the handler's context and
// the trailing ChannelPromise, where present, is notified when the operation
// completes. The per-method notes below paraphrase Netty's public Javadoc.
public interface ChannelOutboundHandler extends ChannelHandler {
    // A bind to the local address var2 was requested.
    void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
    // A connect was requested; var2 is the remote address, var3 the local address.
    void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
    // A disconnect was requested.
    void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
    // A close was requested.
    void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
    // A deregistration from the current event loop was requested.
    void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
    // Intercepts a read request made through the context.
    void read(ChannelHandlerContext var1) throws Exception;
    // A write of the message var2 was requested; it is queued until a flush.
    void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
    // A flush of all previously written, still pending messages was requested.
    void flush(ChannelHandlerContext var1) throws Exception;
}  

代码案例(传输字节):

 package com.what21.netty.channel.bytes;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Minimal Netty server for the raw-bytes demo.
 *
 * <p>Listens on port 11112 and installs a single {@link SocketBytesHandler} on
 * every accepted connection. No frame decoder is installed, so the handler
 * sees the bytes exactly as they arrive from the socket.
 */
public class NettyServerDemo {

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting
     *         for the bind or for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(parentGroup, childGroup);
            server.option(ChannelOption.SO_BACKLOG, 128);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new SocketBytesHandler());
                }
            });
            ChannelFuture future = server.bind(11112).sync();
            future.channel().closeFuture().sync();
        } finally {
            // Always stop the event loops; otherwise a failed bind (e.g. port
            // already in use) or a closed channel leaves their threads running
            // and the process never exits.
            childGroup.shutdownGracefully();
            parentGroup.shutdownGracefully();
        }
    }

}
 package com.what21.netty.channel.bytes;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
 * Inbound handler for the raw-bytes demo.
 *
 * <p>Wire format in both directions: one unsigned length byte (0-255) followed
 * by that many payload bytes. Every lifecycle callback prints its name so the
 * order in which Netty invokes them can be observed on the console.
 *
 * <p>NOTE(review): no frame decoder sits in front of this handler, so a
 * message that TCP delivers in several pieces makes {@code readBytes} throw;
 * that failure is routed to {@link #exceptionCaught}.
 */
@Slf4j
public class SocketBytesHandler extends ChannelInboundHandlerAdapter {

    /** Largest payload that fits the one-byte length prefix. */
    private static final int MAX_PAYLOAD_LENGTH = 0xFF;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("1、handlerAdded()");
        super.handlerAdded(ctx);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("2、channelRegistered()");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("3、channelActive()");
        super.channelActive(ctx);
    }

    /**
     * Reads one length-prefixed request, prints it, and answers with a
     * length-prefixed response.
     *
     * @param ctx context used to write the response
     * @param msg the inbound {@link ByteBuf}; it is consumed and released here
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("4、channelRead()");
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println("byteBuf=" + byteBuf);
            // The client sends the length with OutputStream.write(int), i.e. as
            // an unsigned byte; a signed read would turn 128..255 into a
            // negative array size.
            int requestLength = byteBuf.readUnsignedByte();
            byte[] bytes = new byte[requestLength];
            byteBuf.readBytes(bytes);
            System.out.println("接收字符串:" + new String(bytes));
        } finally {
            // The message is not passed further down the pipeline, so this
            // handler owns it and must release it to avoid a buffer leak.
            byteBuf.release();
        }
        // Write the response back to the client.
        String response = "我是服务器端,你要怎么滴?";
        byte[] responseBytes = response.getBytes();
        if (responseBytes.length > MAX_PAYLOAD_LENGTH) {
            // A plain (byte) cast would silently truncate the length prefix.
            throw new IllegalStateException(
                    "response is " + responseBytes.length + " bytes; the length prefix allows at most "
                            + MAX_PAYLOAD_LENGTH);
        }
        // Length prefix and payload go out in a single buffer.
        ByteBuf responseByteBuf = Unpooled.buffer(1 + responseBytes.length);
        responseByteBuf.writeByte(responseBytes.length);
        responseByteBuf.writeBytes(responseBytes);
        ctx.writeAndFlush(responseByteBuf);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("5、channelReadComplete()");
        super.channelReadComplete(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("6、channelInactive()");
        super.channelInactive(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("7、handlerRemoved()");
        super.handlerRemoved(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught()");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("userEventTriggered()");
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelWritabilityChanged()");
        super.channelWritabilityChanged(ctx);
    }

}
 package com.what21.netty.channel.bytes;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

/**
 * Plain {@link Socket} client for the raw-bytes demo.
 *
 * <p>Wire format in both directions: one unsigned length byte (0-255) followed
 * by that many payload bytes.
 */
@Slf4j
public class SocketClientDemo {

    /** Largest payload that fits the one-byte length prefix. */
    private static final int MAX_PAYLOAD_LENGTH = 0xFF;

    /**
     * Sends one greeting to the server and prints the server's reply.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String host = "127.0.0.1";
        // Must be the port the demo server binds (11112).
        int port = 11112;
        // try-with-resources closes the socket (and both streams) on every path.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port));
            socket.setKeepAlive(true);
            OutputStream out = socket.getOutputStream();
            InputStream in = socket.getInputStream();
            // Send the request.
            String msg = "我是客户端!";
            byte[] requestBytes = msg.getBytes();
            if (requestBytes.length > MAX_PAYLOAD_LENGTH) {
                // write(int) keeps only the low 8 bits, which would corrupt the prefix.
                throw new IllegalArgumentException(
                        "request is " + requestBytes.length + " bytes; the length prefix allows at most "
                                + MAX_PAYLOAD_LENGTH);
            }
            out.write(requestBytes.length);
            out.write(requestBytes);
            out.flush();
            // Receive the response.
            int length = in.read();
            if (length < 0) {
                throw new IOException("Server closed the connection before sending a response");
            }
            byte[] bytes = new byte[length];
            readFully(in, bytes);
            System.out.println("响应消息为:" + new String(bytes));
        } catch (IOException e) {
            // Also covers UnknownHostException, which is an IOException.
            e.printStackTrace();
        }
    }

    /**
     * Fills {@code buffer} completely; a single {@code read} may return fewer
     * bytes than requested.
     *
     * @param in stream to read from
     * @param buffer destination, filled from index 0 to its end
     * @throws IOException if the stream ends before the buffer is full
     */
    private static void readFully(InputStream in, byte[] buffer) throws IOException {
        int offset = 0;
        while (offset < buffer.length) {
            int count = in.read(buffer, offset, buffer.length - offset);
            if (count < 0) {
                throw new IOException(
                        "Connection closed after " + offset + " of " + buffer.length + " bytes");
            }
            offset += count;
        }
    }

}

代码案例(编码后传输字节):

 package com.what21.netty.channel.bytes2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

/**
 * Minimal Netty server for the length-field framing demo.
 *
 * <p>Listens on port 11112. Each accepted connection gets a pipeline that
 * strips a 4-byte length header from inbound frames, prepends a 4-byte length
 * header to outbound messages, and hands the payload to
 * {@link SocketByteHandler}.
 */
public class NettyServerDemo {

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting
     *         for the bind or for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(parentGroup, childGroup);
            server.option(ChannelOption.SO_BACKLOG, 128);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Inbound: length field at offset 0, 4 bytes wide, no
                    // adjustment, and the 4 header bytes are stripped.
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                    // Outbound: prepend a 4-byte length header to every message.
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    pipeline.addLast(new SocketByteHandler());
                }
            });
            ChannelFuture future = server.bind(11112).sync();
            future.channel().closeFuture().sync();
        } finally {
            // Always stop the event loops; otherwise a failed bind (e.g. port
            // already in use) or a closed channel leaves their threads running
            // and the process never exits.
            childGroup.shutdownGracefully();
            parentGroup.shutdownGracefully();
        }
    }

}
 package com.what21.netty.channel.bytes2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Inbound handler for the length-field framing demo: prints each received
 * frame as text and answers with a fixed reply.
 */
public class SocketByteHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the payload of one inbound frame, releases the frame, and sends
     * the reply.
     *
     * @param ctx context used to allocate and write the reply
     * @param msg the inbound frame, cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf frame = (ByteBuf) msg;
        System.out.println("Client said:" + drainToString(frame));
        // The frame is consumed here and not forwarded, so release it.
        frame.release();
        reply(ctx, "I am ok!");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
        ctx.flush();
    }

    /** Copies every readable byte of {@code frame} into a new String. */
    private static String drainToString(ByteBuf frame) {
        byte[] payload = new byte[frame.readableBytes()];
        frame.readBytes(payload);
        return new String(payload);
    }

    /** Writes {@code text} to the channel as a ByteBuf and flushes it. */
    private static void reply(ChannelHandlerContext ctx, String text) {
        // Outbound data has to be handed over as a ByteBuf.
        ByteBuf encoded = ctx.alloc().buffer(4 * text.length());
        encoded.writeBytes(text.getBytes());
        ctx.write(encoded);
        ctx.flush();
    }

}
 package com.what21.netty.channel.bytes2;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

/**
 * Plain {@link Socket} client for the length-field framing demo.
 *
 * <p>Wire format in both directions: a 4-byte big-endian length header
 * followed by that many payload bytes.
 */
@Slf4j
public class SocketClientDemo {

    /** Width of the length header in bytes. */
    private static final int HEADER_LENGTH = 4;

    /** Sanity limit so a corrupt header cannot trigger a huge allocation. */
    private static final int MAX_RESPONSE_LENGTH = 1 << 20;

    /**
     * Sends one framed greeting to the server and prints the framed reply.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String host = "127.0.0.1";
        int port = 11112;
        // try-with-resources closes the socket (and both streams) on every path.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port));
            socket.setKeepAlive(true);
            OutputStream out = socket.getOutputStream();
            InputStream in = socket.getInputStream();
            // Send the request: 4-byte length header, then the payload.
            ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);
            String msg = "我是客户端!";
            byte[] msgBytes = msg.getBytes();
            header.putInt(msgBytes.length);
            out.write(header.array());
            out.write(msgBytes);
            out.flush();
            // Read the whole response header; one read() may deliver less.
            byte[] responseLengthBytes = new byte[HEADER_LENGTH];
            readFully(in, responseLengthBytes);
            System.out.println("已读长度为:" + responseLengthBytes.length);
            int responseLength = ByteBuffer.wrap(responseLengthBytes).getInt();
            if (responseLength < 0 || responseLength > MAX_RESPONSE_LENGTH) {
                throw new IOException("Invalid response length in header: " + responseLength);
            }
            // Read exactly as many payload bytes as the header announces.
            byte[] responseContentBytes = new byte[responseLength];
            readFully(in, responseContentBytes);
            if (responseLength > 0) {
                System.out.println("已读长度为:" + responseLength);
                String responseContent = new String(responseContentBytes, 0, responseLength);
                System.out.println("服务器端响应内容:" + responseContent);
            }
        } catch (IOException e) {
            // Also covers UnknownHostException, which is an IOException.
            e.printStackTrace();
        }
    }

    /**
     * Fills {@code buffer} completely; a single {@code read} may return fewer
     * bytes than requested.
     *
     * @param in stream to read from
     * @param buffer destination, filled from index 0 to its end
     * @throws IOException if the stream ends before the buffer is full
     */
    private static void readFully(InputStream in, byte[] buffer) throws IOException {
        int offset = 0;
        while (offset < buffer.length) {
            int count = in.read(buffer, offset, buffer.length - offset);
            if (count < 0) {
                throw new IOException(
                        "Connection closed after " + offset + " of " + buffer.length + " bytes");
            }
            offset += count;
        }
    }

}

文章来源:智云一二三科技

文章标题:Java,Netty,Socket客户端与Netty服务器端通信,只传输字节案例

文章地址:https://www.zhihuclub.com/178805.shtml

关于作者: 智云科技

热门文章

网站地图