Netty异步写入响应和大小未知的大数据

2024-01-06

我开发了一个netty http服务器,但是当我在方法ChannelInboundHandlerAdapter.channelRead0中写入响应时,我的响应结果来自另一台服务器,并且结果的大小未知,因此它的http响应标头可能具有内容长度或分块。所以我使用一个缓冲区,如果它足够(读取完整数据),无论内容长度或分块,我使用内容长度,否则我使用分块。

  1. 我如何保存第一个连接的写入通道,然后将其传递给第二个处理程序以写入响应。 (我只是直接传递ctx来写入但没有任何返回)

  2. 我如何有条件地决定将分块数据写入通道或具有内容长度的普通数据(如果在channelRead0.1时需要分块,则添加ChunkWriteHandler似乎不起作用。

以一个简单的代码为例:

```java

EventLoopGroup bossGroup = new NioEventLoopGroup();
    final EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<Channel>(){

                @Override
                protected void initChannel(Channel ch) throws Exception
                {
                    System.out.println("Start, I accept client");
                    ChannelPipeline pipeline = ch.pipeline();

                    // Uncomment the following line if you want HTTPS
                    // SSLEngine engine =
                    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
                    // engine.setUseClientMode(false);
                    // pipeline.addLast("ssl", new SslHandler(engine));

                    pipeline.addLast("decoder", new HttpRequestDecoder());
                    // Uncomment the following line if you don't want to handle HttpChunks.
                    // pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
                    pipeline.addLast("encoder", new HttpResponseEncoder());
                    // Remove the following line if you don't want automatic content
                    // compression.
                    //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
                    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
                    pipeline.addLast("deflater", new HttpContentCompressor());
                    pipeline.addLast("handler", new SimpleChannelInboundHandler<HttpObject>(){

                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
                            {
                                System.out.println("msg=" + msg);

                                final ChannelHandlerContext ctxClient2Me = ctx;

                                // TODO: Implement this method
                                Bootstrap bs = new Bootstrap();
                                try{
                                //bs.resolver(new DnsAddressResolverGroup(NioDatagramChannel.class,  DefaultDnsServerAddressStreamProvider.INSTANCE));
                                //.option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
                                bs.resolver(DefaultAddressResolverGroup.INSTANCE);
                                }catch(Exception e){
                                    e.printStackTrace();
                                }

                                bs.channel(NioSocketChannel.class);
                                EventLoopGroup cg = workerGroup;//new NioEventLoopGroup();
                                bs.group(cg).handler(new ChannelInitializer<Channel>(){

                                        @Override
                                        protected void initChannel(Channel ch) throws Exception
                                        {
                                            System.out.println("start, server accept me");
                                            // TODO: Implement this method
                                            ch.pipeline().addLast("http-request-encode", new HttpRequestEncoder());
                                            ch.pipeline().addLast(new HttpResponseDecoder());
                                            ch.pipeline().addLast("http-res", new SimpleChannelInboundHandler<HttpObject>(){

                                                    @Override
                                                    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
                                                    {
                                                        // TODO: Implement this method
                                                        System.out.println("target = " + msg);
                                                        //
                                                        if(msg instanceof HttpResponse){
                                                            HttpResponse res = (HttpResponse) msg;
                                                            HttpUtil.isTransferEncodingChunked(res);
                                                            DefaultHttpResponse resClient2Me = new DefaultHttpResponse(HttpVersion.HTTP_1_1, res.getStatus());

                                                            //resClient2Me.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
                                                            //resClient2Me.headers().set(HttpHeaderNames.CONTENT_LENGTH, "");

                                                            ctxClient2Me.write(resClient2Me);
                                                        }
                                                        if(msg instanceof LastHttpContent){
                                                            // now response the request of the client, it wastes x seconds from receiving request to response
                                                            ctxClient2Me.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
                                                            ctx.close();
                                                        }else if( msg instanceof HttpContent){
                                                            //ctxClient2Me.write(new DefaultHttpContent(msg)); write chunk by chunk ?
                                                        }
                                                    }


                                                });

                                            System.out.println("end, server accept me");

                                        }

                                });

                                final URI uri = new URI("http://example.com/");
                                String host = uri.getHost();
                                ChannelFuture connectFuture= bs.connect(host, 80);

                                System.out.println("to connect me to server");

                                connectFuture.addListener(new ChannelFutureListener(){

                                        @Override
                                        public void operationComplete(ChannelFuture cf) throws Exception
                                        {
                                        }

                                });


                                ChannelFuture connetedFuture = connectFuture.sync(); // TODO optimize, wait io 
                                System.out.println("connected me to server");

                                DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
                                //req.headers().set(HttpHeaderNames.HOST, "");
                                connetedFuture.channel().writeAndFlush(req);

                                System.out.println("end of Client2Me channelRead0");
                                System.out.println("For the seponse of Me2Server, see SimpleChannelInboundHandler.channelRead0");
                            }

                    });
                    System.out.println("end, I accept client");
                }

            });

            System.out.println("========");

        ChannelFuture channelFuture = serverBootstrap.bind(2080).sync();
        channelFuture.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

```


经过一番尝试从非 Netty 事件循环线程发送响应后,我终于找到了问题所在。如果您的客户端使用关闭输出流

socketChannel.shutdownOutput()

那么你需要设置ALLOW_HALF_CLOSURENetty 中的属性为 true,因此它不会关闭通道。 这是一个示例服务器。客户留给读者作为练习:-)

    final ServerBootstrap b = new ServerBootstrap();

    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.ALLOW_HALF_CLOSURE, true)         // This option doesn't work
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {
                @Override
                protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                            ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true);       // This is important
                        }

                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuffer byteBuffer = ((ByteBuf) msg).nioBuffer();
                            String id = ctx.channel().id().asLongText();

                            // When Done reading all the bytes, send response 1 second later
                            timer.schedule(new TimerTask() {
                                @Override
                                public void run() {
                                    ctx.write(Unpooled.copiedBuffer(CONTENT.asReadOnlyBuffer()));
                                    ctx.flush();
                                    ctx.close();

                                    log.info("[{}] Server time to first response byte: {}", id, System.currentTimeMillis() - startTimes.get(id));
                                    startTimes.remove(id);
                                }
                            }, 1000);
                        }
                    }
                }
            });
    Channel ch = b.bind("localhost", PORT).sync().channel();
    ch.closeFuture().sync();

当然,正如线程中其他人提到的,您不能发送字符串,您需要使用发送 ByteBufUnpooled.copiedBuffer

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty异步写入响应和大小未知的大数据 的相关文章

随机推荐