背景说明
开发环境:idea + SpringBoot2.3.3.RELEASE + netty4.1.60.Final + httpclient4.5
实现目标:

实现代码:
<!-- netty -all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.60.Final</version>
</dependency>
<!-- protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.15.6</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1</version>
</dependency>
application.yml
gatekeeper:
server:
local-addr: 0.0.0.0
local-port: 80
proxy:
remote-addr: www.baidu.com
remote-port: 80
# 日志输出
logging:
level:
root: info
com:
gatekeeper: debug
pattern:
console: '%clr(%d{yyyy-MM-dd} [%thread] %-5level %logger{50} - %msg%n)'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} >>> [%thread] >>> %-5level >>> %logger{50} >>> %msg%n'
配置:
package com.gatekeeper.configure;
import com.gatekeeper.common. applicationContext Helper;
import com.gatekeeper.server.GatekeeperHttpConfig;
import org.springframework.context.ApplicationContext;
import org.springframework.context. annotation .Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class GatekeeperConfigure {
@Bean(name = "gatekeeperHttpConfig")
public GatekeeperHttpConfig gatekeeperHttpConfig() {
return new GatekeeperHttpConfig();
}
@Bean(name = "applicationContextHelper")
public ApplicationContextHelper applicationContextHelper(ApplicationContext applicationContext) {
return new ApplicationContextHelper(applicationContext);
}
}
package com.gatekeeper.common;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.util. Map ;
public class ApplicationContextHelper implements ApplicationContextAware {
private static ApplicationContext applicationContext;
public ApplicationContextHelper(ApplicationContext applicationContext) {
this.setApplicationContext(applicationContext);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
ApplicationContextHelper.applicationContext = applicationContext;
}
/**
* 根据Bean的名称获取bean对象
*
* @param beanName
* @param <T>
* @return
*/
public static <T> T getBean(String beanName) {
if (applicationContext.containsBean(beanName)) {
return (T) applicationContext.getBean(beanName);
} else {
return null;
}
}
public static <T> Map<String, T> getBeansOfType(Class<T> baseType) {
return applicationContext.getBeansOfType(baseType);
}
}
package com.gatekeeper.server;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
@Data
public class GatekeeperHttpConfig {
@Value("${gatekeeper.server.local-addr}")
private String localAddr;
@Value("${gatekeeper.server.local-port}")
private int localPort;
@Value("${gatekeeper.proxy.remote-addr}")
private String remoteAddr;
@Value("${gatekeeper.proxy.remote-port}")
private int remotePort;
}
服务类:
package com.gatekeeper.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty. channel .ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern. slf4j .Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@Component
@Slf4j
@Order(value = 1)
public class GatekeeperHttpServer implements ApplicationRunner, DisposableBean {
@Autowired
private GatekeeperHttpConfig gatekeeperHttpConfig;
private Thread serverThread;
private EventLoopGroup bossEventGroup;
private NioEventLoopGroup workerEventGroup;
@Override
public void run(ApplicationArguments args) throws Exception {
log.debug("run", args);
serverThread = new Thread(() -> {
try {
// 初始化==>用于Acceptor的主"线程池"
this.bossEventGroup = new NioEventLoopGroup();
// 初始化==>用于I/O工作的从"线程池"
this.workerEventGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
// group方法设置主从线程池
serverBootstrap.group(bossEventGroup, workerEventGroup);
// 指定通道channel类型,服务端为:NioServerSocketChannel
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new GatekeeperHttpServerInitializer());
SocketAddress serverAddr = new InetSocketAddress(gatekeeperHttpConfig.getLocalAddr(), gatekeeperHttpConfig.getLocalPort());
// 绑定并侦听端口
ChannelFuture channelFuture = serverBootstrap.bind(serverAddr).sync();
log.debug("server:" + serverAddr + " start");
// 等待服务监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅退出,释放"线程池"
bossEventGroup.shutdownGracefully();
workerEventGroup.shutdownGracefully();
}
});
serverThread.setName("gatekeeper-server-thread");
serverThread.setDaemon(true);
serverThread.start();
}
@Override
public void destroy() throws Exception {
if (bossEventGroup != null) {
bossEventGroup.shutdownGracefully();
}
if (workerEventGroup != null) {
workerEventGroup.shutdownGracefully();
}
}
}
package com.gatekeeper.server;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.util.Map;
/**
* netty 实现简单的 http 协议:配置 解码器、handler
*/
public class GatekeeperHttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// http编解码
pipeline.addLast("http-decoder", new HttpRequestDecoder());
// http消息聚合器
pipeline.addLast("http-aggregator", new HttpObjectAggregator(65535));
pipeline.addLast("http-encoder", new HttpResponseEncoder());
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
// 请求处理器
pipeline.addLast("MyNettyHttpServerHandler", new GatekeeperHttpRequestHandler());
// 打印
Map<String, ChannelHandler> handlerMap = pipeline.toMap();
for (String key : handlerMap.keySet()) {
// System.out.println(key + "=" + handlerMap.get(key));
}
}
}
package com.gatekeeper.server;
import com.gatekeeper.common.ApplicationContextHelper;
import com.gatekeeper. exchange .GatekeeperExchanger;
import com.gatekeeper.exchange.HttpExchange;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;
import java.net.SocketAddress;
@Slf4j
public class GatekeeperHttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private GatekeeperHttpConfig catekeeperHttpConfig;
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
log.debug("channelRead0() fullHttpRequest-->" + fullHttpRequest);
SocketAddress clientSocketAddress = channelHandlerContext.channel().remoteAddress();
HttpResponseStatus responseStatus = HttpResponseStatus.OK;
FullHttpResponse fullHttpResponse = null;
if (fullHttpRequest.method() == HttpMethod.GET) {
HttpExchange httpExchange = new HttpExchange(getCatekeeperHttpConfig(), channelHandlerContext, fullHttpRequest);
httpExchange.setClientSocketAddress(clientSocketAddress);
fullHttpResponse = GatekeeperExchanger.exchange(httpExchange);
} else if (fullHttpRequest.method() == HttpMethod.POST) {
HttpExchange httpExchange = new HttpExchange(getCatekeeperHttpConfig(), channelHandlerContext, fullHttpRequest);
httpExchange.setClientSocketAddress(clientSocketAddress);
fullHttpResponse = GatekeeperExchanger.exchange(httpExchange);
} else {
responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR;
fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, responseStatus);
}
if (fullHttpResponse != null) {
log.debug("channelRead0() fullHttpResponse-->" + fullHttpResponse);
channelHandlerContext.writeAndFlush(fullHttpResponse).addListener(ChannelFutureListener.CLOSE);
}
}
/**
* @return
*/
public GatekeeperHttpConfig getCatekeeperHttpConfig() {
if (catekeeperHttpConfig == null) {
synchronized (this) {
catekeeperHttpConfig = ApplicationContextHelper.getBean("gatekeeperHttpConfig");
}
}
return catekeeperHttpConfig;
}
}
信息交换:
package com.gatekeeper.exchange;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import org.springframework.util.StringUtils;
public final class GatekeeperExchanger {
/**
* @param httpExchange
* @return
*/
public static FullHttpResponse exchange(HttpExchange httpExchange) {
HttpMethod httpMethod = httpExchange.getFullHttpRequest().method();
String strContentType = getRequestContentType(httpExchange);
if (HttpMethod.GET == httpMethod) {
GetRemoteExecutor.INSTANCE.execute(httpExchange);
} else if (HttpMethod.POST == httpMethod) {
if (isRequestBodyStringStream(strContentType, httpExchange)) {
PostStringStreamExecutor.INSTANCE.execute(httpExchange);
} else if (isRequestBodyFormData(strContentType, httpExchange)) {
Post byte StreamExecutor.INSTANCE.execute(httpExchange);
} else {
PostStringStreamExecutor.INSTANCE.execute(httpExchange);
}
}
return httpExchange.getFullHttpResponse();
}
/**
* @param strContentType
* @param httpExchange
* @return
*/
private static boolean isRequestBodyFormData(String strContentType, HttpExchange httpExchange) {
if ("multipart/form-data".equalsIgnoreCase(strContentType)) {
// 上传
return true;
}
return false;
}
/**
* @param strContentType
* @param httpExchange
* @return
*/
private static boolean isRequestBodyStringStream(String strContentType, HttpExchange httpExchange) {
if ("application/x-www-form-urlencoded".equalsIgnoreCase(strContentType)) {
// name1=value1&name2=value2
return true;
} else if ("application/json".equalsIgnoreCase(strContentType)) {
// {name1:value1,name2:value2}
return true;
} else if ("text/xml".equalsIgnoreCase(strContentType)) {
// <xml><xml>
return true;
} else if ("application/xml".equalsIgnoreCase(strContentType)) {
// <xml><xml>
return true;
}
return false;
}
/**
* @param httpExchange
* @return
*/
private static String getRequestContentType(HttpExchange httpExchange) {
String strContentType = httpExchange.getHeaderValueFromRequest("Content-type");
if (!StringUtils.isEmpty(strContentType)) {
strContentType = strContentType.trim();
}
return strContentType;
}
}
package com.gatekeeper.exchange;
import com.gatekeeper.server.GatekeeperHttpConfig;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@Data
@Slf4j
public final class HttpExchange {
private GatekeeperHttpConfig catekeeperHttpConfig;
private ChannelHandlerContext channelHandlerContext;
private FullHttpRequest fullHttpRequest;
private FullHttpResponse fullHttpResponse;
private SocketAddress clientSocketAddress;
public HttpExchange(GatekeeperHttpConfig catekeeperHttpConfig, ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
this.catekeeperHttpConfig = catekeeperHttpConfig;
this.channelHandlerContext = channelHandlerContext;
this.fullHttpRequest = fullHttpRequest;
}
/**
* @param clientSocketAddress
*/
public void setClientSocketAddress(SocketAddress clientSocketAddress) {
this.clientSocketAddress = clientSocketAddress;
}
/**
* @return
*/
public SocketAddress getClientSocketAddress() {
return this.clientSocketAddress;
}
/**
* @return
*/
public String getClientAddr() {
return this.clientSocketAddress.toString();
}
/**
* 远程执行地址
*
* @return
*/
public String getRemoteUrl() {
String url = "#34; + catekeeperHttpConfig.getRemoteAddr() + ":" + catekeeperHttpConfig.getRemotePort();
url = url + fullHttpRequest.uri();
log.debug("remote url " + url);
return url;
}
/**
* 从URI中获取请求参数
*
* @return
*/
public Map<String, List<String>> getUriParamMapFromRequest() {
Map<String, List<String>> params = new HashMap<>();
QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
Map<String, List<String>> paramList = decoder.parameters();
for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
params.put(entry.getKey(), entry.getValue());
}
return params;
}
/**
* 从请求中获取头信息
*
* @param name
* @return
*/
public String getHeaderValueFromRequest(String name) {
HttpHeaders httpHeaders = fullHttpRequest.headers();
if (!httpHeaders.isEmpty()) {
return httpHeaders.get(name);
}
return null;
}
/**
* 从请求中获取头信息
*
* @return
*/
public Map<String, String> getHeaderMapFromRequest() {
Map<String, String> headerMap = new HashMap<>();
HttpHeaders httpHeaders = fullHttpRequest.headers();
if (!httpHeaders.isEmpty()) {
Iterator<Map.Entry<String, String>> iterator = httpHeaders.iteratorAsString();
while (iterator.hasNext()) {
Map.Entry<String, String> entry = iterator.next();
headerMap.put(entry.getKey(), entry.getValue());
}
}
return headerMap;
}
/**
* @return
*/
public byte[] getBodyValueFromRequest() {
ByteBuf content = fullHttpRequest.content();
byte[] bodyBytes = new byte[content.readableBytes()];
content.readBytes(bodyBytes, 0, content.capacity());
return bodyBytes;
}
/**
* @return
*/
private Map<String, String> getFormDataFromRequest() {
Map<String, String> paramsMap = new HashMap<String, String>();
HttpDataFactory httpDataFactory = new DefaultHttpDataFactory(false);
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(httpDataFactory, fullHttpRequest);
List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
for (InterfaceHttpData data : postData) {
if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
MemoryAttribute attribute = (MemoryAttribute) data;
paramsMap.put(attribute.getName(), attribute.getValue());
}
}
return paramsMap;
}
}
package com.gatekeeper.exchange;
public interface IRemoteExecutor {
public void execute(HttpExchange httpExchange);
}
package com.gatekeeper.exchange;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.*;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
public abstract class AbstractRemoteExecutor implements IRemoteExecutor {
public static final String CLIENT_REAL_IP = "client-real-ip";
/**
* @param url
* @param headerMap
* @return
*/
protected FullHttpResponse executeByGet(String url, Map<String, String> headerMap) {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
CloseableHttpResponse response = null;
try {
HttpGet httpGet = new HttpGet(url);
if (headerMap != null) {
Set<String> keySet = headerMap.keySet();
for (String key : keySet) {
httpGet.addHeader(key, headerMap.get(key));
}
}
response = httpClient.execute(httpGet);
return buildFullHttpResponse(response);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* @param url
* @param headerMap
* @param bodyBytes
* @return
*/
protected FullHttpResponse executeByPost(String url, Map<String, String> headerMap, byte[] bodyBytes) {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
CloseableHttpResponse response = null;
try {
HttpPost httpPost = new HttpPost(url);
if (headerMap != null) {
Set<String> keySet = headerMap.keySet();
for (String key : keySet) {
if (key.equalsIgnoreCase("Content-Length")) {
continue;
}
httpPost.addHeader(key, headerMap.get(key));
}
}
httpPost.setEntity(new ByteArrayEntity(bodyBytes));
response = httpClient.execute(httpPost);
return buildFullHttpResponse(response);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* @param httpResponse
* @return
* @throws IOException
*/
private FullHttpResponse buildFullHttpResponse(CloseableHttpResponse httpResponse) throws IOException {
HttpEntity responseEntity = httpResponse.getEntity();
// 响应状态
StatusLine statusLine = httpResponse.getStatusLine();
int statusCode = statusLine.getStatusCode();
// 响应类型及长度
Header contentTypeHeader = responseEntity.getContentType();
//String contentTypeName = contentTypeHeader.getName();
//String contentTypeValue = contentTypeHeader.getValue();
//long contentLength = responseEntity.getContentLength();
// 响应头
Header[] responseHeader = httpResponse.getAllHeaders();
HttpHeaders httpHeaders = new DefaultHttpHeaders();
if (responseHeader != null) {
for (int i = 0; i < responseHeader.length; i++) {
Header header = responseHeader[i];
httpHeaders.add(header.getName(), header.getValue());
}
}
// 响应内容
byte[] responseBytes = EntityUtils.toByteArray(responseEntity);
// =====================================================================//
// ===== 构造返回FullHttpResponse
// =====================================================================//
HttpVersion version = HttpVersion.HTTP_1_1;
HttpResponseStatus responseStatus = HttpResponseStatus.valueOf(statusCode);
ByteBuf content = Unpooled.copiedBuffer(responseBytes);
HttpHeaders headers = httpHeaders;
HttpHeaders trailingHeaders = new DefaultHttpHeaders();
return new DefaultFullHttpResponse(version, responseStatus, content, headers, trailingHeaders);
}
}
package com.gatekeeper.exchange;
import io.netty.handler.codec.http.FullHttpResponse;
import java.util.Map;
public class GetRemoteExecutor extends AbstractRemoteExecutor {
public static final IRemoteExecutor INSTANCE = new GetRemoteExecutor();
@Override
public void execute(HttpExchange httpExchange) {
String url = httpExchange.getRemoteUrl();
Map<String, String> headerMap = httpExchange.getHeaderMapFromRequest();
headerMap.put(CLIENT_REAL_IP,httpExchange.getClientAddr());
FullHttpResponse fullHttpResponse = super.executeByGet(url, headerMap);
httpExchange.setFullHttpResponse(fullHttpResponse);
}
}
package com.gatekeeper.exchange;
import io.netty.handler.codec.http.FullHttpResponse;
import java.util.Map;
public class PostByteStreamExecutor extends AbstractRemoteExecutor {
public static final IRemoteExecutor INSTANCE = new PostByteStreamExecutor();
@Override
public void execute(HttpExchange httpExchange) {
String url = httpExchange.getRemoteUrl();
Map<String, String> headerMap = httpExchange.getHeaderMapFromRequest();
headerMap.put(CLIENT_REAL_IP, httpExchange.getClientAddr());
byte[] bodyBytes = httpExchange.getBodyValueFromRequest();
FullHttpResponse fullHttpResponse = super.executeByPost(url, headerMap, bodyBytes);
httpExchange.setFullHttpResponse(fullHttpResponse);
}
}
package com.gatekeeper.exchange;
import io.netty.handler.codec.http.FullHttpResponse;
import java.util.Map;
public class PostStringStreamExecutor extends AbstractRemoteExecutor {
public static final IRemoteExecutor INSTANCE = new PostStringStreamExecutor();
@Override
public void execute(HttpExchange httpExchange) {
String url = httpExchange.getRemoteUrl();
Map<String, String> headerMap = httpExchange.getHeaderMapFromRequest();
headerMap.put(CLIENT_REAL_IP, httpExchange.getClientAddr());
byte[] bodyBytes = httpExchange.getBodyValueFromRequest();
FullHttpResponse fullHttpResponse = super.executeByPost(url, headerMap, bodyBytes);
httpExchange.setFullHttpResponse(fullHttpResponse);
}
}