如何在请求写入代理 Netty 服务器中的 outboundChannel 时在同一处理程序中获取响应 byteBuf

2023-12-25

我正在实现 netty 代理服务器,如下所示: 一个http请求进来,

  • 如果本地缓存有数据,则写入channel并flush
  • 如果没有,则从远程服务器获取数据,将其添加到缓存并刷新

我在与写入客户端的处理程序相同的处理程序中从响应中提取 byteBuf 时遇到困难。

在下面的示例中,如果您看到channelRead的方法HexDumpProxyFrontendHandler,您将看到我如何从缓存中获取并写入。我在下面的方法中添加了我遇到困难的评论

该代码端到端地工作。所以可以在本地复制和测试。

我可以看到FullHttpResponse对象在HexDumpProxyBackendhandler#channelRead。但在这个方法中,我没有对缓存的引用,也没有想要在缓存中添加的 id。

我认为有两种方法可以解决这个问题,但我不清楚如何做到这一点。

1)要么在HexdumpProxyBackendHandler中获取缓存引用和id,然后就变得容易了。但hexDumpBackendhander被实例化于channelActive of HexDumpFrontendHandler此时我还没有解析我的传入请求

2)获取响应中提取的bytebufHexdumpFrontendHandler#dchannelRead,在这种情况下它只是缓存插入。

HexDumpProxy.java

public final class HexDumpProxy {

static final int LOCAL_PORT = Integer.parseInt(System.getProperty("localPort", "8082"));
static final String REMOTE_HOST = System.getProperty("remoteHost", "api.icndb.com");
static final int REMOTE_PORT = Integer.parseInt(System.getProperty("remotePort", "80"));
static Map<Long,String> localCache = new HashMap<>();
public static void main(String[] args) throws Exception {
    System.err.println("Proxying *:" + LOCAL_PORT + " to " + REMOTE_HOST + ':' + REMOTE_PORT + " ...");
    localCache.put(123L, "profile1");
    localCache.put(234L, "profile2");
    // Configure the bootstrap.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new HexDumpProxyInitializer(localCache, REMOTE_HOST, REMOTE_PORT))
         .childOption(ChannelOption.AUTO_READ, false)
         .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

}

HexDumpProxyInitializer.java

public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

private final String remoteHost;
private final int remotePort;
private Map<Long, String> cache;

public HexDumpProxyInitializer(Map<Long,String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache=cache;
}

@Override
public void initChannel(SocketChannel ch) {
    ch.pipeline().addLast(
            new LoggingHandler(LogLevel.INFO),
            new HttpServerCodec(),
            new HttpObjectAggregator(8*1024, true),
            new HexDumpProxyFrontendHandler(cache, remoteHost, remotePort));
}

}

HexDumpProxyFrontendHandler.java

 public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
private Map<Long, String> cache;

public HexDumpProxyFrontendHandler(Map<Long, String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache = cache;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    final Channel inboundChannel = ctx.channel();

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
     .channel(ctx.channel().getClass())
     .handler((new ChannelInitializer() {
         protected void initChannel(Channel ch) {
             ChannelPipeline var2 = ch.pipeline();
             var2.addLast((new HttpClientCodec()));
             var2.addLast(new HttpObjectAggregator(8192, true));
             var2.addLast(new HexDumpProxyBackendHandler(inboundChannel));
         }
     }))
     .option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
                inboundChannel.read();
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        System.out.println("msg is instanceof httpRequest");
        HttpRequest req = (HttpRequest)msg;
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
        String userId = queryStringDecoder.parameters().get("id").get(0);
        Long id = Long.valueOf(userId);
        if (cache.containsKey(id)){
            StringBuilder buf = new StringBuilder();
            buf.append(cache.get(id));
            writeResponse(req, ctx, buf);
            closeOnFlush(ctx.channel());
            return;
        }
    }
    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // was able to flush out data, start to read the next chunk
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }

    //get response back from HexDumpProxyBackendHander and write to cache
    //basically I need to do cache.put(id, parse(response));
    //how to get response buf from inboundChannel here is the question I am trying to solve
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
    if (outboundChannel != null) {
        closeOnFlush(outboundChannel);
    }

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    closeOnFlush(ctx.channel());
}

/**
 * Closes the specified channel after all queued write requests are flushed.
 */
static void closeOnFlush(Channel ch) {
    if (ch.isActive()) {
        ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
}

//borrowed from HttpSnoopServerHandler.java in snoop example
private boolean writeResponse(HttpRequest request, ChannelHandlerContext ctx, StringBuilder buf) {
    // Decide whether to close the connection or not.
    boolean keepAlive = HttpUtil.isKeepAlive(request);
    // Build the response object.
    FullHttpResponse response = new DefaultFullHttpResponse(
            HTTP_1_1, request.decoderResult().isSuccess()? OK : BAD_REQUEST,
            Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));

    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

    if (keepAlive) {
        // Add 'Content-Length' header only for a keep-alive connection.
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // Add keep alive header as per:
        // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    // Encode the cookie.
    String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
    if (cookieString != null) {
        Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
        if (!cookies.isEmpty()) {
            // Reset the cookies if necessary.
            for (io.netty.handler.codec.http.cookie.Cookie cookie: cookies) {
                response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode(cookie));
            }
        }
    } else {
        // Browser sent no cookie.  Add some.
        response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode("key1", "value1"));
        response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
    }

    // Write the response.
    ctx.write(response);

    return keepAlive;
}

}

HexDumpProxyBackendHandler.java

public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {

private final Channel inboundChannel;

public HexDumpProxyBackendHandler(Channel inboundChannel) {
    this.inboundChannel = inboundChannel;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.read();
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof FullHttpResponse) {
        System.out.println("this is fullHttpResponse");
    }
    inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
    HexDumpProxyFrontendHandler.closeOnFlush(inboundChannel);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    HexDumpProxyFrontendHandler.closeOnFlush(ctx.channel());
}

}

P.S:我从以下位置获取了大部分代码Netty 示例 https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/proxy项目并定制

EDIT

根据 Ferrygig 的建议,我将 FrontEndChannelHander#channelRead 更改如下。我已经删除了channelActive并实现了写入方法

@覆盖 公共无效channelRead(最终ChannelHandlerContext ctx,对象消息){

if (msg instanceof HttpRequest) {
    System.out.println("msg is instanceof httpRequest");
    HttpRequest req = (HttpRequest)msg;
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
    String userId = queryStringDecoder.parameters().get("id").get(0);
    id = Long.valueOf(userId);
    if (cache.containsKey(id)){
        StringBuilder buf = new StringBuilder();
        buf.append(cache.get(id));
        writeResponse(req, ctx, buf);
        closeOnFlush(ctx.channel());
        return;
    }

    final Channel inboundChannel = ctx.channel();

    //copied from channelActive method

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
            .channel(ctx.channel().getClass())
            .handler((new ChannelInitializer() {
                protected void initChannel(Channel ch) {
                    ChannelPipeline var2 = ch.pipeline();
                    var2.addLast((new HttpClientCodec()));
                    var2.addLast(new HttpObjectAggregator(8192, true));
                    var2.addLast(new HexDumpProxyBackendHandler(inboundChannel, cache));
                }
            }));
            //.option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
                inboundChannel.read();
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}
if (outboundChannel.isActive()) {
    outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // was able to flush out data, start to read the next chunk
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

解决这个问题的方法有多种,并且根据您的最终目标,采取的方法也有所不同。

目前,您使用的拓扑为 1 个入站连接和 1 个出站连接,这使得系统设计稍微容易一些,因为您不必担心将多个请求同步到同一出站流。

目前,您的前端处理程序扩展了ChannelInboundHandlerAdapter,如果我们让它扩展,这只会拦截进入您的应用程序的“数据包”ChannelDuplexHandler https://netty.io/4.0/api/io/netty/channel/ChannelDuplexHandler.html,我们还可以处理从应用程序发出的“数据包”。

为了接近这条路径,我们需要更新HexDumpProxyFrontendHandler要扩展的类ChannelDuplexHandler https://netty.io/4.0/api/io/netty/channel/ChannelDuplexHandler.html(现在我们称之为CDH)。

该过程的下一步是覆盖write https://netty.io/4.0/api/io/netty/channel/ChannelDuplexHandler.html#write-io.netty.channel.ChannelHandlerContext-java.lang.Object-io.netty.channel.ChannelPromise-方法来自于CDH https://netty.io/4.0/api/io/netty/channel/ChannelDuplexHandler.html,这样我们就可以在后端向我们发送回响应时进行拦截。

创建 write 方法后,我们需要通过调用来更新您的(非线程安全)映射put method.

public class HexDumpProxyFrontendHandler extends ChannelDuplexHandler {
    Long lastId;
    // ...
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            System.out.println("msg is instanceof httpRequest");
            HttpRequest req = (HttpRequest)msg;
            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
            String userId = queryStringDecoder.parameters().get("id").get(0);
            Long id = Long.valueOf(userId);
            lastId = id; // Store ID of last request
            // ...
        }
        // ...
    }
    // ...
    public void write(
        ChannelHandlerContext ctx,
        java.lang.Object msg,
        ChannelPromise promise
    ) throws java.lang.Exception {

        if (msg instanceof FullHttpResponse) {
            System.out.println("this is fullHttpResponse");
            FullHttpResponse full = (FullHttpResponse)msg;
            cache.put(lastId, parse(full)); // TODO: Include a system here to convert the request to a string
        }
        super.write(ctx, msg, promise);
    }
    // ...
}

我们还没有完成,虽然我们已经有了代码,但我们仍然需要修复代码中其他地方的一些错误。

非线程安全映射(严重错误)

这些错误之一是您使用普通的哈希映射来处理缓存。这样做的问题是,这不是线程安全的,如果多个人同时连接到您的应用程序,可能会发生奇怪的事情,包括随着地图的内部结构更新而完全地图损坏。

为了解决这个问题,我们将把地图“升级”为ConcurrentHashMap https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html,该映射具有特殊的结构来处理同时请求和存储数据的多个线程,而不会造成巨大的性能损失。 (如果性能是主要关注点,那么您might通过使用每线程哈希映射而不是全局缓存可以获得更高的性能,但这意味着每个资源都可以缓存到线程数量。

没有缓存删除规则(主要错误)

目前,没有适当的代码来删除过时的资源,这意味着缓存将被填满,直到程序没有剩余内存,然后它将崩溃。

这可以通过使用提供线程安全访问和所谓的删除规则的映射实现来解决,或者使用已经预先制作的缓存解决方案,例如Gnuava 缓存 https://github.com/google/guava/wiki/CachesExplained.

无法正确处理 HTTP 管道(小-大错误)

HTTP 鲜为人知的功能之一是流水线 https://en.wikipedia.org/wiki/HTTP_pipelining,这基本上意味着客户端可以向服务器发送另一个请求,without等待对前一个请求的响应。这种类型的错误包括服务器交换两个请求的内容,甚至完全破坏它们。

虽然如今随着 HTTP2 支持越来越多,并且知道存在损坏的服务器,管道请求很少见,但某些使用它的 CLI 工具仍然会发生这种情况。

要解决此问题,请仅在发送上一个响应后读取请求,方法之一是保留请求列表,或者采用更高级的方法预制解决方案 https://github.com/spinscale/netty4-http-pipelining

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在请求写入代理 Netty 服务器中的 outboundChannel 时在同一处理程序中获取响应 byteBuf 的相关文章

随机推荐

  • 片段测试错误:android.view.InflateException:二进制 XML 文件行 #16:二进制 XML 文件行 #16:错误膨胀类 <未知>

    我正在尝试按照以下说明测试片段 https developer android com training basics fragments testing https developer android com training basic
  • 仅将混合模式应用于投影

    可以混合吗only元素的投影与它重叠的元素的颜色 例如 我有一个元素与另一个元素重叠 顶部的元素有一个浅灰色的阴影 下面的元素是黑色的 我不希望对任何一个元素本身应用任何混合 但希望重叠元素 的投影与下面元素的颜色混合 在阴影落在重叠元素上
  • mysql 使用子查询更新查询

    谁能看出下面的查询有什么问题吗 当我运行它时 我得到 1064 你的 SQL 语法有错误 检查与您的 MySQL 服务器版本相对应的手册 了解要使用的正确语法 第 8 行的 a where a CompetitionID Competiti
  • 通用电子邮件验证器

    我想创建一个表单 用户将在其中输入他的电子邮件 我想验证客户端的电子邮件格式 Angular 2 中有通用的电子邮件验证器吗 注意 类似于AngularJS 验证器 https docs angularjs org api ng input
  • marklogic mlcp 自定义转换拆分聚合文档为多个文件

    我有一个 JSON 聚合 文件 我想使用 mlcp 将其拆分并作为多个文档摄取到 MarkLogic 中 我想要使用 javascript 在摄取过程中转换内容 http docs marklogic com guide mlcp impo
  • 同时声明多个变量的更优雅的方式

    要 同时 声明多个变量 我会这样做 a b True False 但如果我必须声明更多的变量 它就会变得越来越不优雅 a b c d e f g h i j True True True True True False True True
  • 如何以编程方式更改 Window 注册表中的值?

    我需要以编程方式将 HKEY CURRENT USER Software Intuit QBWebConnector 中找到的 Level 字符串更改为 Verbose 做这个的最好方式是什么 C bat 文件 我以前从来没有修改过注册表
  • GCM 的注册 ID 重复

    我们有一个使用 GCM 的应用程序 当用户首次打开应用程序时 应用程序会检查共享首选项以查看用户之前是否注册过 如果没有 则从 GCM 获取注册 ID 并将其存储到共享首选项中 还有一个存储用户 ID 和注册 ID 的第 3 方服务器 我阅
  • Javascript URL 深度(级别)

    是否可以使用 Javascript 获取 url 深度 级别 如果我有这个网址 www website com site product category item gt 深度 4 www website com site product
  • Spring Security 中的 beans.NotReadablePropertyException

    我对 Spring Security 非常陌生 我捡起来了this https rads stackoverflow com amzn click com 1847199747书并尝试执行代码 当我这样做时 我得到了 org springf
  • 如何读取 Objective-C 堆栈跟踪

    我有以下堆栈跟踪 0 MyApp 0x000833a3 TFCrashHandler backtrace 26 1 MyApp 0x000836bd TFSignalHandler 28 2 libsystem c dylib 0x33ea
  • 删除重复行并更新引用

    如何删除一个表中的重复行并将另一表中的引用更新为剩余行 重复仅出现在名称中 Id 列是标识列 Example 假设我们有两张表Doubles and Data Doubles table Id int Name varchar 50 Dat
  • 如何从嵌入式 HTML 与 Swift 进行通信以更改 bool

    您好 我想在执行 html 中的 onReady 块后更改绑定变量 click 的值 我可以使用评估java脚本从swift到html进行通信 但是我如何从 html 中的 onReady 与 swift 进行通信以更改 bools val
  • Chrome 扩展中的跨源 XMLHttpRequest

    根据 chrome 扩展 API 如果设置了权限 则应允许使用 XMLHttpRequest 对象进行跨源调用 只要扩展程序首先请求跨源权限 就可以与其源之外的远程服务器进行通信 我正在密切关注谷歌教程 http code google c
  • 如何 dplyr 按列索引重命名列?

    以下代码重命名数据集中的第一列 require dplyr mtcars gt setNames c RenamedColumn names 2 length names 期望的结果 RenamedColumn cyl disp hp dr
  • jsPlumb:如何选择特定连接器

    我似乎不知道如何选择特定的 jsPlumb 连接器 我知道我可以选择与源或目标相关的所有连接器 但通常我会在同一源和目标之间有多个连接器 因此在这种情况下我看不到能够选择特定连接器的方法 我的具体用例如下 如果用户单击连接器 则会出现一个对
  • 无法重新启动 Spring 批处理作业

    我有一个 Spring Batch 作业 用于读取 转换和写入 Oracle 数据库 我通过 CommandLineJobRunner 实用程序运行该作业 使用 fat jar 使用 Maven Shade 插件生成的依赖项 该作业随后由于
  • 有没有办法在 JavaScript 中清除对象?

    有没有办法清除 Javascript 中的对象 具体来说 如果一个对象有多个成员变量 是否有一种简单的方法来重置每个值 function exampleObject this valueA A this valueB B this myAr
  • SQL——在 select 语句中分配位变量

    例如 declare bitHaveRows bit select bitHaveRows count from table where predicate 我可以在这一行调用任何函数吗 select bitHaveRows count 如
  • 如何在请求写入代理 Netty 服务器中的 outboundChannel 时在同一处理程序中获取响应 byteBuf

    我正在实现 netty 代理服务器 如下所示 一个http请求进来 如果本地缓存有数据 则写入channel并flush 如果没有 则从远程服务器获取数据 将其添加到缓存并刷新 我在与写入客户端的处理程序相同的处理程序中从响应中提取 byt