您的位置 首页 java

Java,SpringBoot,Netty,HttpClient,实现代理,请求转发功能

背景说明

开发环境:idea + SpringBoot2.3.3.RELEASE + netty4.1.60.Final + httpclient4.5

实现目标:

实现代码:

 <!--  netty -all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.60.Final</version>
</dependency>
<!-- protobuf-java -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.15.6</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpmime</artifactId>
    <version>4.5.13</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpasyncclient</artifactId>
    <version>4.1</version>
</dependency>  

application.yml

 gatekeeper:
  server:
    local-addr: 0.0.0.0
    local-port: 80
  proxy:
    remote-addr: www.baidu.com
    remote-port: 80

# 日志输出
logging:
  level:
    root: info
    com:
      gatekeeper: debug
  pattern:
    console: '%clr(%d{yyyy-MM-dd} [%thread] %-5level %logger{50} - %msg%n)'
    file: '%d{yyyy-MM-dd HH:mm:ss.SSS} >>> [%thread] >>> %-5level >>> %logger{50} >>> %msg%n'  

配置:

 package com.gatekeeper.configure;

import com.gatekeeper.common. applicationContext Helper;
import com.gatekeeper.server.GatekeeperHttpConfig;
import org.springframework.context.ApplicationContext;
import org.springframework.context. annotation .Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class GatekeeperConfigure {

    @Bean(name = "gatekeeperHttpConfig")
    public GatekeeperHttpConfig gatekeeperHttpConfig() {
        return new GatekeeperHttpConfig();
    }

    @Bean(name = "applicationContextHelper")
    public ApplicationContextHelper applicationContextHelper(ApplicationContext applicationContext) {
        return new ApplicationContextHelper(applicationContext);
    }

}
  
 package com.gatekeeper.common;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.util. Map ;

public class ApplicationContextHelper implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    public ApplicationContextHelper(ApplicationContext applicationContext) {
        this.setApplicationContext(applicationContext);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextHelper.applicationContext = applicationContext;
    }

    /**
     * 根据Bean的名称获取bean对象
     *
     * @param beanName
     * @param <T>
     * @return
     */
    public static <T> T getBean(String beanName) {
        if (applicationContext.containsBean(beanName)) {
            return (T) applicationContext.getBean(beanName);
        } else {
            return null;
        }
    }

    public static <T> Map<String, T> getBeansOfType(Class<T> baseType) {
        return applicationContext.getBeansOfType(baseType);
    }

}
  
 package com.gatekeeper.server;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;

@Data
public class GatekeeperHttpConfig {

    @Value("${gatekeeper.server.local-addr}")
    private String localAddr;

    @Value("${gatekeeper.server.local-port}")
    private int localPort;

    @Value("${gatekeeper.proxy.remote-addr}")
    private String remoteAddr;

    @Value("${gatekeeper.proxy.remote-port}")
    private int remotePort;

}
  

服务类:

 package com.gatekeeper.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty. channel .ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern. slf4j .Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

@Component
@Slf4j
@Order(value = 1)
public class GatekeeperHttpServer implements ApplicationRunner, DisposableBean {

    @Autowired
    private GatekeeperHttpConfig gatekeeperHttpConfig;
    private Thread serverThread;
    private EventLoopGroup bossEventGroup;
    private NioEventLoopGroup workerEventGroup;


    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.debug("run", args);
        serverThread = new Thread(() -> {
            try {
                // 初始化==>用于Acceptor的主"线程池"
                this.bossEventGroup = new NioEventLoopGroup();
                // 初始化==>用于I/O工作的从"线程池"
                this.workerEventGroup = new NioEventLoopGroup();
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                // group方法设置主从线程池
                serverBootstrap.group(bossEventGroup, workerEventGroup);
                // 指定通道channel类型,服务端为:NioServerSocketChannel
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.childHandler(new GatekeeperHttpServerInitializer());
                SocketAddress serverAddr = new InetSocketAddress(gatekeeperHttpConfig.getLocalAddr(), gatekeeperHttpConfig.getLocalPort());
                // 绑定并侦听端口
                ChannelFuture channelFuture = serverBootstrap.bind(serverAddr).sync();
                log.debug("server:" + serverAddr + " start");
                // 等待服务监听端口关闭
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 优雅退出,释放"线程池"
                bossEventGroup.shutdownGracefully();
                workerEventGroup.shutdownGracefully();
            }
        });
        serverThread.setName("gatekeeper-server-thread");
        serverThread.setDaemon(true);
        serverThread.start();
    }

    @Override
    public void destroy() throws Exception {
        if (bossEventGroup != null) {
            bossEventGroup.shutdownGracefully();
        }
        if (workerEventGroup != null) {
            workerEventGroup.shutdownGracefully();
        }
    }

}  
 package com.gatekeeper.server;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.util.Map;

/**
 * netty 实现简单的 http 协议:配置 解码器、handler
 */
public class GatekeeperHttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // http编解码
        pipeline.addLast("http-decoder", new HttpRequestDecoder());
        // http消息聚合器
        pipeline.addLast("http-aggregator", new HttpObjectAggregator(65535));

        pipeline.addLast("http-encoder", new HttpResponseEncoder());
        pipeline.addLast("http-chunked", new ChunkedWriteHandler());
        // 请求处理器
        pipeline.addLast("MyNettyHttpServerHandler", new GatekeeperHttpRequestHandler());
        // 打印
        Map<String, ChannelHandler> handlerMap = pipeline.toMap();
        for (String key : handlerMap.keySet()) {
            // System.out.println(key + "=" + handlerMap.get(key));
        }
    }

}  
 package com.gatekeeper.server;

import com.gatekeeper.common.ApplicationContextHelper;
import com.gatekeeper. exchange .GatekeeperExchanger;
import com.gatekeeper.exchange.HttpExchange;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;

import java.net.SocketAddress;

@Slf4j
public class GatekeeperHttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private GatekeeperHttpConfig catekeeperHttpConfig;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
        log.debug("channelRead0() fullHttpRequest-->" + fullHttpRequest);
        SocketAddress clientSocketAddress = channelHandlerContext.channel().remoteAddress();
        HttpResponseStatus responseStatus = HttpResponseStatus.OK;
        FullHttpResponse fullHttpResponse = null;
        if (fullHttpRequest.method() == HttpMethod.GET) {
            HttpExchange httpExchange = new HttpExchange(getCatekeeperHttpConfig(), channelHandlerContext, fullHttpRequest);
            httpExchange.setClientSocketAddress(clientSocketAddress);
            fullHttpResponse = GatekeeperExchanger.exchange(httpExchange);
        } else if (fullHttpRequest.method() == HttpMethod.POST) {
            HttpExchange httpExchange = new HttpExchange(getCatekeeperHttpConfig(), channelHandlerContext, fullHttpRequest);
            httpExchange.setClientSocketAddress(clientSocketAddress);
            fullHttpResponse = GatekeeperExchanger.exchange(httpExchange);
        } else {
            responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR;
            fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, responseStatus);
        }
        if (fullHttpResponse != null) {
            log.debug("channelRead0() fullHttpResponse-->" + fullHttpResponse);
            channelHandlerContext.writeAndFlush(fullHttpResponse).addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * @return
     */
    public GatekeeperHttpConfig getCatekeeperHttpConfig() {
        if (catekeeperHttpConfig == null) {
            synchronized (this) {
                catekeeperHttpConfig = ApplicationContextHelper.getBean("gatekeeperHttpConfig");
            }
        }
        return catekeeperHttpConfig;
    }

}  

信息交换:

 package com.gatekeeper.exchange;

import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import org.springframework.util.StringUtils;

public final class GatekeeperExchanger {

    /**
     * @param httpExchange
     * @return
     */
    public static FullHttpResponse exchange(HttpExchange httpExchange) {
        HttpMethod httpMethod = httpExchange.getFullHttpRequest().method();
        String strContentType = getRequestContentType(httpExchange);
        if (HttpMethod.GET == httpMethod) {
            GetRemoteExecutor.INSTANCE.execute(httpExchange);
        } else if (HttpMethod.POST == httpMethod) {
            if (isRequestBodyStringStream(strContentType, httpExchange)) {
                PostStringStreamExecutor.INSTANCE.execute(httpExchange);
            } else if (isRequestBodyFormData(strContentType, httpExchange)) {
                Post byte StreamExecutor.INSTANCE.execute(httpExchange);
            } else {
                PostStringStreamExecutor.INSTANCE.execute(httpExchange);
            }
        }
        return httpExchange.getFullHttpResponse();
    }

    /**
     * @param strContentType
     * @param httpExchange
     * @return
     */
    private static boolean isRequestBodyFormData(String strContentType, HttpExchange httpExchange) {
        if ("multipart/form-data".equalsIgnoreCase(strContentType)) {
            // 上传
            return true;
        }
        return false;
    }

    /**
     * @param strContentType
     * @param httpExchange
     * @return
     */
    private static boolean isRequestBodyStringStream(String strContentType, HttpExchange httpExchange) {
        if ("application/x-www-form-urlencoded".equalsIgnoreCase(strContentType)) {
            // name1=value1&name2=value2
            return true;
        } else if ("application/json".equalsIgnoreCase(strContentType)) {
            // {name1:value1,name2:value2}
            return true;
        } else if ("text/xml".equalsIgnoreCase(strContentType)) {
            // <xml><xml>
            return true;
        } else if ("application/xml".equalsIgnoreCase(strContentType)) {
            // <xml><xml>
            return true;
        }
        return false;
    }

    /**
     * @param httpExchange
     * @return
     */
    private static String getRequestContentType(HttpExchange httpExchange) {
        String strContentType = httpExchange.getHeaderValueFromRequest("Content-type");
        if (!StringUtils.isEmpty(strContentType)) {
            strContentType = strContentType.trim();
        }
        return strContentType;
    }

}
  
 package com.gatekeeper.exchange;

import com.gatekeeper.server.GatekeeperHttpConfig;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@Data
@Slf4j
public final class HttpExchange {

    private GatekeeperHttpConfig catekeeperHttpConfig;

    private ChannelHandlerContext channelHandlerContext;

    private FullHttpRequest fullHttpRequest;

    private FullHttpResponse fullHttpResponse;

    private SocketAddress clientSocketAddress;

    public HttpExchange(GatekeeperHttpConfig catekeeperHttpConfig, ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
        this.catekeeperHttpConfig = catekeeperHttpConfig;
        this.channelHandlerContext = channelHandlerContext;
        this.fullHttpRequest = fullHttpRequest;
    }

    /**
     * @param clientSocketAddress
     */
    public void setClientSocketAddress(SocketAddress clientSocketAddress) {
        this.clientSocketAddress = clientSocketAddress;
    }

    /**
     * @return
     */
    public SocketAddress getClientSocketAddress() {
        return this.clientSocketAddress;
    }

    /**
     * @return
     */
    public String getClientAddr() {
        return this.clientSocketAddress.toString();
    }

    /**
     * 远程执行地址
     *
     * @return
     */
    public String getRemoteUrl() {
        String url = "#34; + catekeeperHttpConfig.getRemoteAddr() + ":" + catekeeperHttpConfig.getRemotePort();
        url = url + fullHttpRequest.uri();
        log.debug("remote url " + url);
        return url;
    }

    /**
     * 从URI中获取请求参数
     *
     * @return
     */
    public Map<String, List<String>> getUriParamMapFromRequest() {
        Map<String, List<String>> params = new HashMap<>();
        QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
        Map<String, List<String>> paramList = decoder.parameters();
        for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
            params.put(entry.getKey(), entry.getValue());
        }
        return params;
    }

    /**
     * 从请求中获取头信息
     *
     * @param name
     * @return
     */
    public String getHeaderValueFromRequest(String name) {
        HttpHeaders httpHeaders = fullHttpRequest.headers();
        if (!httpHeaders.isEmpty()) {
            return httpHeaders.get(name);
        }
        return null;
    }

    /**
     * 从请求中获取头信息
     *
     * @return
     */
    public Map<String, String> getHeaderMapFromRequest() {
        Map<String, String> headerMap = new HashMap<>();
        HttpHeaders httpHeaders = fullHttpRequest.headers();
        if (!httpHeaders.isEmpty()) {
            Iterator<Map.Entry<String, String>> iterator = httpHeaders.iteratorAsString();
            while (iterator.hasNext()) {
                Map.Entry<String, String> entry = iterator.next();
                headerMap.put(entry.getKey(), entry.getValue());
            }
        }
        return headerMap;
    }

    /**
     * @return
     */
    public byte[] getBodyValueFromRequest() {
        ByteBuf content = fullHttpRequest.content();
        byte[] bodyBytes = new byte[content.readableBytes()];
        content.readBytes(bodyBytes, 0, content.capacity());
        return bodyBytes;
    }

    /**
     * @return
     */
    private Map<String, String> getFormDataFromRequest() {
        Map<String, String> paramsMap = new HashMap<String, String>();
        HttpDataFactory httpDataFactory = new DefaultHttpDataFactory(false);
        HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(httpDataFactory, fullHttpRequest);
        List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
        for (InterfaceHttpData data : postData) {
            if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
                MemoryAttribute attribute = (MemoryAttribute) data;
                paramsMap.put(attribute.getName(), attribute.getValue());
            }
        }
        return paramsMap;
    }

}  
 package com.gatekeeper.exchange;

public interface IRemoteExecutor {

    public void execute(HttpExchange httpExchange);

}
  
 package com.gatekeeper.exchange;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.*;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

public abstract class AbstractRemoteExecutor implements IRemoteExecutor {

    public static final String CLIENT_REAL_IP = "client-real-ip";

    /**
     * @param url
     * @param headerMap
     * @return
     */
    protected FullHttpResponse executeByGet(String url, Map<String, String> headerMap) {
        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
        CloseableHttpResponse response = null;
        try {
            HttpGet httpGet = new HttpGet(url);
            if (headerMap != null) {
                Set<String> keySet = headerMap.keySet();
                for (String key : keySet) {
                    httpGet.addHeader(key, headerMap.get(key));
                }
            }
            response = httpClient.execute(httpGet);
            return buildFullHttpResponse(response);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * @param url
     * @param headerMap
     * @param bodyBytes
     * @return
     */
    protected FullHttpResponse executeByPost(String url, Map<String, String> headerMap, byte[] bodyBytes) {
        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
        CloseableHttpResponse response = null;
        try {
            HttpPost httpPost = new HttpPost(url);
            if (headerMap != null) {
                Set<String> keySet = headerMap.keySet();
                for (String key : keySet) {
                    if (key.equalsIgnoreCase("Content-Length")) {
                        continue;
                    }
                    httpPost.addHeader(key, headerMap.get(key));
                }
            }
            httpPost.setEntity(new ByteArrayEntity(bodyBytes));
            response = httpClient.execute(httpPost);
            return buildFullHttpResponse(response);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * @param httpResponse
     * @return
     * @throws IOException
     */
    private FullHttpResponse buildFullHttpResponse(CloseableHttpResponse httpResponse) throws IOException {
        HttpEntity responseEntity = httpResponse.getEntity();
        // 响应状态
        StatusLine statusLine = httpResponse.getStatusLine();
        int statusCode = statusLine.getStatusCode();
        // 响应类型及长度
        Header contentTypeHeader = responseEntity.getContentType();
        //String contentTypeName = contentTypeHeader.getName();
        //String contentTypeValue = contentTypeHeader.getValue();
        //long contentLength = responseEntity.getContentLength();
        // 响应头
        Header[] responseHeader = httpResponse.getAllHeaders();
        HttpHeaders httpHeaders = new DefaultHttpHeaders();
        if (responseHeader != null) {
            for (int i = 0; i < responseHeader.length; i++) {
                Header header = responseHeader[i];
                httpHeaders.add(header.getName(), header.getValue());
            }
        }
        // 响应内容
        byte[] responseBytes = EntityUtils.toByteArray(responseEntity);
        // =====================================================================//
        // ===== 构造返回FullHttpResponse
        // =====================================================================//
        HttpVersion version = HttpVersion.HTTP_1_1;
        HttpResponseStatus responseStatus = HttpResponseStatus.valueOf(statusCode);
        ByteBuf content = Unpooled.copiedBuffer(responseBytes);
        HttpHeaders headers = httpHeaders;
        HttpHeaders trailingHeaders = new DefaultHttpHeaders();
        return new DefaultFullHttpResponse(version, responseStatus, content, headers, trailingHeaders);
    }


}  
 package com.gatekeeper.exchange;

import io.netty.handler.codec.http.FullHttpResponse;

import java.util.Map;

public class GetRemoteExecutor extends AbstractRemoteExecutor {

    public static final IRemoteExecutor INSTANCE = new GetRemoteExecutor();

    @Override
    public void execute(HttpExchange httpExchange) {
        String url = httpExchange.getRemoteUrl();
        Map<String, String> headerMap = httpExchange.getHeaderMapFromRequest();
        headerMap.put(CLIENT_REAL_IP,httpExchange.getClientAddr());
        FullHttpResponse fullHttpResponse = super.executeByGet(url, headerMap);
        httpExchange.setFullHttpResponse(fullHttpResponse);
    }

}
  
 package com.gatekeeper.exchange;

import io.netty.handler.codec.http.FullHttpResponse;

import java.util.Map;

public class PostByteStreamExecutor extends AbstractRemoteExecutor {

    public static final IRemoteExecutor INSTANCE = new PostByteStreamExecutor();

    @Override
    public void execute(HttpExchange httpExchange) {
        String url = httpExchange.getRemoteUrl();
        Map<String, String> headerMap = httpExchange.getHeaderMapFromRequest();
        headerMap.put(CLIENT_REAL_IP, httpExchange.getClientAddr());
        byte[] bodyBytes = httpExchange.getBodyValueFromRequest();
        FullHttpResponse fullHttpResponse = super.executeByPost(url, headerMap, bodyBytes);
        httpExchange.setFullHttpResponse(fullHttpResponse);
    }

}  
 package com.gatekeeper.exchange;

import io.netty.handler.codec.http.FullHttpResponse;

import java.util.Map;

public class PostStringStreamExecutor extends AbstractRemoteExecutor {

    public static final IRemoteExecutor INSTANCE = new PostStringStreamExecutor();

    @Override
    public void execute(HttpExchange httpExchange) {
        String url = httpExchange.getRemoteUrl();
        Map<String, String> headerMap = httpExchange.getHeaderMapFromRequest();
        headerMap.put(CLIENT_REAL_IP, httpExchange.getClientAddr());
        byte[] bodyBytes = httpExchange.getBodyValueFromRequest();
        FullHttpResponse fullHttpResponse = super.executeByPost(url, headerMap, bodyBytes);
        httpExchange.setFullHttpResponse(fullHttpResponse);
    }

}  

文章来源:智云一二三科技

文章标题:Java,SpringBoot,Netty,HttpClient,实现代理,请求转发功能

文章地址:https://www.zhihuclub.com/187671.shtml

关于作者: 智云科技

热门文章

网站地图