How to get the response ByteBuf in the same handler when a request is written to the outboundChannel in a Netty proxy server


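I am implementing a Netty proxy server as follows. An HTTP request comes in:

  • if the local cache has the data, write it to the channel and flush
  • if not, fetch the data from the remote server, add it to the cache, and flush

I am having difficulty extracting the ByteBuf from the response in the same handler that writes to the client.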



I can see the FullHttpResponse object in HexDumpProxyBackendHandler#channelRead. But inside that method I have neither a reference to the cache nor the id that I want to add to the cache.


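1) Either get the cache reference and the id into HexDumpProxyBackendHandler, and then it becomes easy. But HexDumpProxyBackendHandler is instantiated in channelActive of HexDumpProxyFrontendHandler, and at that point I have not yet parsed my incoming request.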



public final class HexDumpProxy {

static final int LOCAL_PORT = Integer.parseInt(System.getProperty("localPort", "8082"));
static final String REMOTE_HOST = System.getProperty("remoteHost", "api.icndb.com");
static final int REMOTE_PORT = Integer.parseInt(System.getProperty("remotePort", "80"));
static Map<Long,String> localCache = new HashMap<>();
public static void main(String[] args) throws Exception {
    System.err.println("Proxying *:" + LOCAL_PORT + " to " + REMOTE_HOST + ':' + REMOTE_PORT + " ...");
    localCache.put(123L, "profile1");
    localCache.put(234L, "profile2");
    // Configure the bootstrap.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new HexDumpProxyInitializer(localCache, REMOTE_HOST, REMOTE_PORT))
         .childOption(ChannelOption.AUTO_READ, false)
    } finally {



public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

private final String remoteHost;
private final int remotePort;
private Map<Long, String> cache;

public HexDumpProxyInitializer(Map<Long,String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;

public void initChannel(SocketChannel ch) {
            new LoggingHandler(LogLevel.INFO),
            new HttpServerCodec(),
            new HttpObjectAggregator(8*1024, true),
            new HexDumpProxyFrontendHandler(cache, remoteHost, remotePort));



 public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
private Map<Long, String> cache;

public HexDumpProxyFrontendHandler(Map<Long, String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache = cache;

public void channelActive(ChannelHandlerContext ctx) {
    final Channel inboundChannel = ctx.channel();

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
     .handler((new ChannelInitializer() {
         protected void initChannel(Channel ch) {
             ChannelPipeline var2 = ch.pipeline();
             var2.addLast((new HttpClientCodec()));
             var2.addLast(new HttpObjectAggregator(8192, true));
             var2.addLast(new HexDumpProxyBackendHandler(inboundChannel));
     .option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
            } else {
                // Close the connection if the connection attempt has failed.

public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        System.out.println("msg is instanceof httpRequest");
        HttpRequest req = (HttpRequest)msg;
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
        String userId = queryStringDecoder.parameters().get("id").get(0);
        Long id = Long.valueOf(userId);
        if (cache.containsKey(id)){
            StringBuilder buf = new StringBuilder();
            writeResponse(req, ctx, buf);
    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // was able to flush out data, start to read the next chunk
                } else {

    //get response back from HexDumpProxyBackendHander and write to cache
    //basically I need to do cache.put(id, parse(response));
    //how to get response buf from inboundChannel here is the question I am trying to solve

public void channelInactive(ChannelHandlerContext ctx) {
    if (outboundChannel != null) {


public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

// Closes the specified channel after all queued write requests are flushed.
static void closeOnFlush(Channel ch) {
    if (ch.isActive()) {

//borrowed from HttpSnoopServerHandler.java in snoop example
private boolean writeResponse(HttpRequest request, ChannelHandlerContext ctx, StringBuilder buf) {
    // Decide whether to close the connection or not.
    boolean keepAlive = HttpUtil.isKeepAlive(request);
    // Build the response object.
    FullHttpResponse response = new DefaultFullHttpResponse(
            HTTP_1_1, request.decoderResult().isSuccess()? OK : BAD_REQUEST,
            Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));

    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

    if (keepAlive) {
        // Add 'Content-Length' header only for a keep-alive connection.
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // Add keep alive header as per:
        // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);

    // Encode the cookie.
    String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
    if (cookieString != null) {
        Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
        if (!cookies.isEmpty()) {
            // Reset the cookies if necessary.
            for (io.netty.handler.codec.http.cookie.Cookie cookie: cookies) {
                response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode(cookie));
    } else {
        // Browser sent no cookie.  Add some.
        response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode("key1", "value1"));
        response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));

    // Write the response.

    return keepAlive;



public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {

private final Channel inboundChannel;

public HexDumpProxyBackendHandler(Channel inboundChannel) {
    this.inboundChannel = inboundChannel;

public void channelActive(ChannelHandlerContext ctx) {

public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof FullHttpResponse) {
        System.out.println("this is fullHttpResponse");
    inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
            } else {

public void channelInactive(ChannelHandlerContext ctx) {

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {


P.S.: I took most of the code from the Netty example project at https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/proxy and customized it.


Following Ferrygig's suggestion, I changed HexDumpProxyFrontendHandler#channelRead as follows. I have removed channelActive and implemented the write method.

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {

if (msg instanceof HttpRequest) {
    System.out.println("msg is instanceof httpRequest");
    HttpRequest req = (HttpRequest)msg;
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
    String userId = queryStringDecoder.parameters().get("id").get(0);
    id = Long.valueOf(userId);
    if (cache.containsKey(id)){
        StringBuilder buf = new StringBuilder();
        writeResponse(req, ctx, buf);

    final Channel inboundChannel = ctx.channel();

    //copied from channelActive method

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
            .handler((new ChannelInitializer() {
                protected void initChannel(Channel ch) {
                    ChannelPipeline var2 = ch.pipeline();
                    var2.addLast((new HttpClientCodec()));
                    var2.addLast(new HttpObjectAggregator(8192, true));
                    var2.addLast(new HexDumpProxyBackendHandler(inboundChannel, cache));
            //.option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
            } else {
                // Close the connection if the connection attempt has failed.
if (outboundChannel.isActive()) {
    outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // was able to flush out data, start to read the next chunk
            } else {


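Currently, your topology is 1 inbound connection to 1 outbound connection. This makes the system design slightly easier, because you don't have to worry about synchronizing multiple requests onto the same outbound stream.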

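At the moment your frontend handler extends ChannelInboundHandlerAdapter, which only intercepts "packets" coming into your application. If we make it extend ChannelDuplexHandler https://netty.io/4.0/api/io/netty/channel/ChannelDuplexHandler.html, we can also handle "packets" going out of the application.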

To go down this path, we need to update the HexDumpProxyFrontendHandler class to extend ChannelDuplexHandler https://netty.io/4.0/api/io/netty/channel/ChannelDuplexHandler.html (let's call it CDH from now on).

The next step is to override the write https://netty.io/4.0/api/io/netty/channel/ChannelDuplexHandler.html#write-io.netty.channel.ChannelHandlerContext-java.lang.Object-io.netty.channel.ChannelPromise- method from CDH https://netty.io/4.0/api/io/netty/channel/ChannelDuplexHandler.html, so that we can intercept the moment the backend sends the response back to us.

After creating the write method, we need to update your (non-thread-safe) map by calling its put method.

public class HexDumpProxyFrontendHandler extends ChannelDuplexHandler {
    Long lastId;
    // ...
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            System.out.println("msg is instanceof httpRequest");
            HttpRequest req = (HttpRequest) msg;
            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
            String userId = queryStringDecoder.parameters().get("id").get(0);
            Long id = Long.valueOf(userId);
            lastId = id; // Store ID of last request
            // ...
        }
        // ...
    }
    // ...
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Responses written back by the backend handler pass through here on their way to the client.
        if (msg instanceof FullHttpResponse) {
            System.out.println("this is fullHttpResponse");
            FullHttpResponse full = (FullHttpResponse) msg;
            cache.put(lastId, parse(full)); // TODO: parse(...) is a placeholder; convert the response to a String here
        }
        super.write(ctx, msg, promise);
    }
    // ...
}




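To solve the thread-safety problem with the map, we are going to "upgrade" it to a ConcurrentHashMap https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html. This map has special structures to handle multiple threads requesting and storing data at the same time without a huge performance loss. (If performance is a main concern, you might get higher performance by using a per-thread hash map instead of a global cache, but this means that every resource can be cached up to as many times as there are threads.)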
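As a rough illustration of that upgrade, here is a minimal sketch that uses only the JDK. The class name ProfileCache and its methods are hypothetical and are not part of the question's code or of Netty. It assumes the cache maps a Long id to a String body, as in the question, and shows several threads writing to the same map safely.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the proxy's shared cache (assumption, not from the original code).
final class ProfileCache {
    private final Map<Long, String> entries = new ConcurrentHashMap<>();

    // Returns the cached body, or null when the id has not been cached yet.
    String get(long id) {
        return entries.get(id);
    }

    // Stores the body unless another thread already cached this id.
    // Returns the value that ends up in the cache.
    String putIfAbsent(long id, String body) {
        String previous = entries.putIfAbsent(id, body);
        return previous != null ? previous : body;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ProfileCache cache = new ProfileCache();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Four threads race to cache the same 1000 ids.
        for (int t = 0; t < 4; t++) {
            pool.execute(() -> {
                for (long id = 0; id < 1000; id++) {
                    cache.putIfAbsent(id, "profile" + id);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(cache.size()); // prints 1000
    }
}
```

In the proxy, the same idea would mean creating the shared map as a ConcurrentHashMap in HexDumpProxy instead of a HashMap. The handlers can keep their Map<Long, String> field type.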



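This can be solved by using a map implementation that provides both thread-safe access and so-called eviction rules (rules that decide when entries are removed), or by using a ready-made caching solution such as Guava caches https://github.com/google/guava/wiki/CachesExplained.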
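To make the idea of an eviction rule concrete, here is a small JDK-only sketch. It is not the Guava API mentioned above. Instead it swaps in a LinkedHashMap in access order with removeEldestEntry, wrapped in Collections.synchronizedMap. The class name BoundedCache and the size limit of 2 are assumptions chosen for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (assumption): a size-bounded, least-recently-used cache built from JDK classes.
final class BoundedCache {
    // Builds a synchronized map that evicts the least recently accessed entry
    // as soon as it holds more than maxEntries entries.
    static <K, V> Map<K, V> create(final int maxEntries) {
        LinkedHashMap<K, V> lru = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
        return Collections.synchronizedMap(lru);
    }

    public static void main(String[] args) {
        Map<Long, String> cache = BoundedCache.create(2);
        cache.put(123L, "profile1");
        cache.put(234L, "profile2");
        cache.get(123L);                    // touch 123 so that 234 becomes the eldest entry
        cache.put(345L, "profile3");        // exceeds the limit, so 234 is evicted
        System.out.println(cache.keySet()); // prints [123, 345]
    }
}
```

A single lock around the whole map is simpler but less scalable than a dedicated cache library, so this is mainly useful for seeing how a size-based rule behaves.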

Not handling HTTP pipelining properly (minor-major bug)

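One of the lesser-known features of HTTP is pipelining https://en.wikipedia.org/wiki/HTTP_pipelining, which basically means that the client can send another request to the server without waiting for the response to the previous request. Bugs of this type include servers that swap the contents of two requests, or even mangle them completely.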

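Pipelined requests are rare these days, with HTTP/2 support growing and broken servers known to exist, but they still happen with certain CLI tools that use pipelining.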

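To solve this issue, only read a request after you have sent the previous response. One way to do that is to keep a list of requests; a more advanced approach is a pre-made solution https://github.com/spinscale/netty4-http-pipelining.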
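One possible reading of "keeping a list of requests" is sketched below with JDK classes only. Instead of a single lastId field, the ids of forwarded requests go into a FIFO queue, and each backend response is matched with the oldest pending id. This relies on HTTP/1.1 responses arriving in request order on one connection. The class PendingRequests and its method names are hypothetical, and the methods are synchronized only as a conservative assumption about which threads call them.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical helper (assumption): tracks the ids of requests that still await a response.
final class PendingRequests {
    private final Queue<Long> ids = new ArrayDeque<>();

    // Call when a request has been parsed and forwarded to the backend.
    synchronized void onRequest(long id) {
        ids.add(id);
    }

    // Call when a backend response arrives; returns the id of the oldest pending request.
    synchronized long onResponse() {
        Long id = ids.poll();
        if (id == null) {
            throw new IllegalStateException("response without a pending request");
        }
        return id;
    }

    synchronized int pending() {
        return ids.size();
    }

    public static void main(String[] args) {
        PendingRequests pending = new PendingRequests();
        // Two pipelined requests arrive before any response is written.
        pending.onRequest(123L);
        pending.onRequest(234L);
        System.out.println(pending.onResponse()); // prints 123
        System.out.println(pending.onResponse()); // prints 234
    }
}
```

In the duplex handler, channelRead would call onRequest(id) and write would use onResponse() as the cache key in place of lastId.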


