Netty源码分析 (八)----- write过程 源码分析

2023-05-16

上一篇文章主要讲了netty的read过程,本文主要分析一下write和writeAndFlush。

主要内容

本文分以下几个部分阐述一个java对象最后是如何转变成字节流,写到socket缓冲区中去的

  1. pipeline中的标准链表结构
  2. java对象编码过程
  3. write:写队列
  4. flush:刷新写队列
  5. writeAndFlush: 写队列并刷新

pipeline中的标准链表结构

一个标准的pipeline链式结构如下

数据从head节点流入,先拆包,然后解码成业务对象,最后经过业务Handler处理,调用write,将结果对象写出去。而写的过程先通过tail节点,然后通过encoder节点将对象编码成ByteBuf,最后将该ByteBuf对象传递到head节点,调用底层的Unsafe写到jdk底层管道

java对象编码过程

为什么我们在pipeline中添加了encoder节点,java对象就转换成netty可以处理的ByteBuf,写到管道里?

我们先看下调用write的code

BusinessHandler


protected void channelRead0(ChannelHandlerContext ctx, Request request) throws Exception {
    Response response = doBusiness(request);

    if (response != null) {
        ctx.channel().write(response);
    }
}  

业务处理器接受到请求之后,做一些业务处理,返回一个Response,然后,response在pipeline中传递,落到 Encoder节点,我们来跟踪一下 ctx.channel().write(response);


public ChannelFuture write(Object msg) {
    return this.pipeline.write(msg);
}  

调用了Channel中的pipeline中的write方法,我们接着看


public final ChannelFuture write(Object msg) {
    return this.tail.write(msg);
}  

pipeline中有属性tail,调用tail中的write,由此我们知道write消息的时候,从tail开始,接着往下看


private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = this.findContextOutbound();
    Object m = this.pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        Object task;
        if (flush) {
            task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);
        }

        safeExecute(executor, (Runnable)task, promise, m);
    }

}  

中间我省略了几个重载的方法,我们来看看第一行代码,next = this.findContextOutbound();


private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;

    do {
        ctx = ctx.prev;
    } while(!ctx.outbound);

    return ctx;
}  

通过 ctx = ctx.prev; 我们知道从tail开始找到pipeline中的第一个outbound的handler,然后调用 invokeWrite(m, promise),此时找到的第一个outbound的handler就是我们自定义的编码器Encoder

我们接着看 next.invokeWrite(m, promise);


private void invokeWrite(Object msg, ChannelPromise promise) {
    if (this.invokeHandler()) {
        this.invokeWrite0(msg, promise);
    } else {
        this.write(msg, promise);
    }

}
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler)this.handler()).write(this, msg, promise);
    } catch (Throwable var4) {
        notifyOutboundHandlerException(var4, promise);
    }

}  

一路代码跟下来,我们可以知道是调用了第一个outBound类型的handler中的write方法,也就是第一个调用的是我们自定义编码器Encoder的write方法

我们来看看自定义Encoder


public class Encoder extends MessageToByteEncoder<Response> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {
        out.writeByte(response.getVersion());
        out.writeInt(4 + response.getData().length);
        out.writeBytes(response.getData());
    }
}  

自定义Encoder继承 MessageToByteEncoder ,并且重写了 encode方法,这就是编码器的核心,我们先来看 MessageToByteEncoder


public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {  

我们看到 MessageToByteEncoder 继承了 ChannelOutboundHandlerAdapter,说明了 Encoder 是一个 Outbound的handler

我们来看看 Encoder 的父类 MessageToByteEncoder中的write方法

MessageToByteEncoder


@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // 判断当前Handelr是否能处理写入的消息
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            // 强制换换
            I cast = (I) msg;
            // 分配一段ButeBuf
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
            // 调用encode,这里就调回到  `Encoder` 这个Handelr中    
                encode(ctx, cast, buf);
            } finally {
                // 既然自定义java对象转换成ByteBuf了,那么这个对象就已经无用了,释放掉
                // (当传入的msg类型是ByteBuf的时候,就不需要自己手动释放了)
                ReferenceCountUtil.release(cast);
            }
            // 如果buf中写入了数据,就把buf传到下一个节点
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
            // 否则,释放buf,将空数据传到下一个节点    
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            // 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        // 当buf在pipeline中处理完之后,释放
        if (buf != null) {
            buf.release();
        }
    }
}  

这里,我们详细阐述一下Encoder是如何处理传入的java对象的

1.判断当前Handler是否能处理写入的消息,如果能处理,进入下面的流程,否则,直接扔给下一个节点处理
2.将对象强制转换成Encoder可以处理的 Response对象
3.分配一个ByteBuf
4.调用encoder,即进入到 Encoderencode方法,该方法是用户代码,用户将数据写入ByteBuf
5.既然自定义java对象转换成ByteBuf了,那么这个对象就已经无用了,释放掉,(当传入的msg类型是ByteBuf的时候,就不需要自己手动释放了)
6.如果buf中写入了数据,就把buf传到下一个节点,否则,释放buf,将空数据传到下一个节点
7.最后,当buf在pipeline中处理完之后,释放节点

总结一点就是,Encoder节点分配一个ByteBuf,调用encode方法,将java对象根据自定义协议写入到ByteBuf,然后再把ByteBuf传入到下一个节点,在我们的例子中,最终会传入到head节点,因为head节点是一个OutBount类型的handler

HeadContext


public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}  

这里的msg就是前面在Encoder节点中,载有java对象数据的自定义ByteBuf对象,进入下一节

write:写队列

我们来看看channel中unsafe的write方法,先来看看其中的一个属性

AbstractUnsafe


protected abstract class AbstractUnsafe implements Unsafe {
    private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);  

我们来看看 ChannelOutboundBuffer 这个类


public final class ChannelOutboundBuffer {
    private final Channel channel;
    private ChannelOutboundBuffer.Entry flushedEntry;
    private ChannelOutboundBuffer.Entry unflushedEntry;
    private ChannelOutboundBuffer.Entry tailEntry;  

ChannelOutboundBuffer内部维护了一个Entry链表,并使用Entry封装msg。其中的属性我们下面会详细讲

我们回到正题,接着看 unsafe.write(msg, promise);

AbstractUnsafe


@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}  

1.调用 filterOutboundMessage() 方法,将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer


@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }

        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        return msg;
    }

    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}  

2.接下来,估算出需要写入的ByteBuf的size
3.最后,调用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下来,我们需要重点看一下这个方法干了什么事情

ChannelOutboundBuffer


public void addMessage(Object msg, int size, ChannelPromise promise) {
    // 创建一个待写出的消息节点
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    incrementPendingOutboundBytes(size, false);
}  

想要理解上面这段代码,必须得掌握写缓存中的几个消息指针,如下图

ChannelOutboundBuffer 里面的数据结构是一个单链表结构,每个节点是一个 EntryEntry 里面包含了待写出ByteBuf 以及消息回调 promise,下面分别是三个指针的作用

1.flushedEntry 指针表示第一个被写到操作系统Socket缓冲区中的节点
2.unFlushedEntry 指针表示第一个未被写入到操作系统Socket缓冲区中的节点
3.tailEntry指针表示ChannelOutboundBuffer缓冲区的最后一个节点

初次调用 addMessage 之后,各个指针的情况为

fushedEntry指向空,unFushedEntry和 tailEntry 都指向新加入的节点

第二次调用 addMessage之后,各个指针的情况为

第n次调用 addMessage之后,各个指针的情况为

可以看到,调用n次addMessage,flushedEntry指针一直指向NULL,表示现在还未有节点需要写出到Socket缓冲区,而unFushedEntry之后有n个节点,表示当前还有n个节点尚未写出到Socket缓冲区中去

flush:刷新写队列

不管调用channel.flush(),还是ctx.flush(),最终都会落地到pipeline中的head节点

HeadContext


@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}  

之后进入到AbstractUnsafe

AbstractUnsafe


public final void flush() {
   assertEventLoop();

   ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
   if (outboundBuffer == null) {
       return;
   }

   outboundBuffer.addFlush();
   flush0();
}  

flush方法中,先调用 outboundBuffer.addFlush();

ChannelOutboundBuffer


public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}  

可以结合前面的图来看,首先拿到 unflushedEntry 指针,然后将 flushedEntry 指向unflushedEntry所指向的节点,调用完毕之后,三个指针的情况如下所示

 

相当于所有的节点都即将开始推送出去

接下来,调用 flush0();

AbstractUnsafe


protected void flush0() {
    doWrite(outboundBuffer);
}  

发现这里的核心代码就一个 doWrite,继续跟

AbstractNioByteChannel


protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    boolean setOpWrite = false;
    for (;;) {
        // 拿到第一个需要flush的节点的数据
        Object msg = in.current();

        if (msg instanceof ByteBuf) {
            // 强转为ByteBuf,若发现没有数据可读,直接删除该节点
            ByteBuf buf = (ByteBuf) msg;

            boolean done = false;
            long flushedAmount = 0;
            // 拿到自旋锁迭代次数
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            // 自旋,将当前节点写出
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            // 写完之后,将当前节点删除
            if (done) {
                in.remove();
            } else {
                break;
            }
        } 
    }
}  

这里略微有点复杂,我们分析一下

1.第一步,调用current()先拿到第一个需要flush的节点的数据

 ChannelOutBoundBuffer


public Object current() {
    Entry entry = flushedEntry;
    if (entry == null) {
        return null;
    }

    return entry.msg;
}  

2.第二步,拿到自旋锁的迭代次数


if (writeSpinCount == -1) {
    writeSpinCount = config().getWriteSpinCount();
}  

3.自旋的方式将ByteBuf写出到jdk nio的Channel


for (int i = writeSpinCount - 1; i >= 0; i --) {
    int localFlushedAmount = doWriteBytes(buf);
    if (localFlushedAmount == 0) {
        setOpWrite = true;
        break;
    }

    flushedAmount += localFlushedAmount;
    if (!buf.isReadable()) {
        done = true;
        break;
    }
}  

doWriteBytes 方法跟进去


protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}  

我们发现,出现了 javaChannel(),表明已经进入到了jdk nio Channel的领域,我们来看看 buf.readBytes(javaChannel(), expectedWrittenBytes);


public int readBytes(GatheringByteChannel out, int length) throws IOException {
    this.checkReadableBytes(length);
    int readBytes = this.getBytes(this.readerIndex, out, length);
    this.readerIndex += readBytes;
    return readBytes;
}  

我们来看关键代码 this.getBytes(this.readerIndex, out, length)


private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
    this.checkIndex(index, length);
    if (length == 0) {
        return 0;
    } else {
        ByteBuffer tmpBuf;
        if (internal) {
            tmpBuf = this.internalNioBuffer();
        } else {
            tmpBuf = ((ByteBuffer)this.memory).duplicate();
        }

        index = this.idx(index);
        tmpBuf.clear().position(index).limit(index + length);
        //将tmpBuf中的数据写到out中
        return out.write(tmpBuf);
    }
}  

我们来看看out.write(tmpBuf)


public int write(ByteBuffer src) throws IOException {
    ensureOpen();
    if (!writable)
        throw new NonWritableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.write(fd, src, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}  

和read实现一样,SocketChannelImpl的write方法通过IOUtil的write实现:关键代码 n = IOUtil.write(fd, src, -1, nd);


static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    //如果是DirectBuffer,直接写,将堆外缓存中的数据拷贝到内核缓存中进行发送
    if (var1 instanceof DirectBuffer) {
        return writeFromNativeBuffer(var0, var1, var2, var4);
    } else {
        //非DirectBuffer
        //获取已经读取到的位置
        int var5 = var1.position();
        //获取可以读到的位置
        int var6 = var1.limit();

        assert var5 <= var6;
        //申请一个原buffer可读大小的DirectByteBuffer
        int var7 = var5 <= var6 ? var6 - var5 : 0;
        ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);

        int var10;
        try {

            var8.put(var1);
            var8.flip();
            var1.position(var5);
            //通过DirectBuffer写,将堆外缓存的数据拷贝到内核缓存中进行发送
            int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
            if (var9 > 0) {
                var1.position(var5 + var9);
            }

            var10 = var9;
        } finally {
            //回收分配的DirectByteBuffer
            Util.offerFirstTemporaryDirectBuffer(var8);
        }

        return var10;
    }
}  

代码逻辑我们就不再讲了,代码注释已经很清楚了,这里我们关注一点,我们可以看看我们前面的一个方法 filterOutboundMessage(),将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer

说明到了这一步所有的 var1 意境是直接内存DirectBuffer,就不需要走到else,就不需要write两次了

4.删除该节点

节点的数据已经写入完毕,接下来就需要删除该节点

ChannelOutBoundBuffer


public boolean remove() {
    Entry e = flushedEntry;
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    removeEntry(e);

    if (!e.cancelled) {
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
    }

    // recycle the entry
    e.recycle();

    return true;
}  

首先拿到当前被flush掉的节点(flushedEntry所指),然后拿到该节点的回调对象 ChannelPromise, 调用 removeEntry()方法移除该节点


private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        flushedEntry = null;
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        flushedEntry = e.next;
    }
}  

这里的remove是逻辑移除,只是将flushedEntry指针移到下个节点,调用完毕之后,节点图示如下

writeAndFlush: 写队列并刷新

理解了write和flush这两个过程,writeAndFlush 也就不难了


public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } 
}  

可以看到,最终,通过一个boolean变量,表示是调用 invokeWriteAndFlush,还是 invokeWriteinvokeWrite便是我们上文中的write过程


private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    invokeWrite0(msg, promise);
    invokeFlush0();
}  

可以看到,最终调用的底层方法和单独调用 write 和 flush 是一样的


private void invokeWrite(Object msg, ChannelPromise promise) {
        invokeWrite0(msg, promise);
}

private void invokeFlush(Object msg, ChannelPromise promise) {
        invokeFlush0(msg, promise);
}  

由此看来,invokeWriteAndFlush基本等价于write方法之后再来一次flush

总结

1.pipeline中的编码器原理是创建一个ByteBuf,将java对象转换为ByteBuf,然后再把ByteBuf继续向前传递
2.调用write方法并没有将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出
3.writeAndFlush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功
4.netty中的缓冲区中的ByteBuf为DirectByteBuf

 

 

 

转载于:https://www.cnblogs.com/java-chen-hao/p/11477385.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty源码分析 (八)----- write过程 源码分析 的相关文章

  • netty源码:(24)EventExecutorChooserFactory类

    该类定义了一个内部接口EventExecutorChooser 该接口有一个方法 next EventExecutorChooserFactory类有个默认实现类 DefaultEventExecutorChooserFactory 该类有
  • netty源码:(24)EventExecutorChooserFactory类

    该类定义了一个内部接口EventExecutorChooser 该接口有一个方法 next EventExecutorChooserFactory类有个默认实现类 DefaultEventExecutorChooserFactory 该类有
  • 在 netty 通道上设置套接字超时

    我有一个 netty 通道 我想在底层套接字上设置超时 默认设置为 0 超时的目的是 如果 15 分钟内没有发生任何事情 则未使用的通道将被关闭 虽然我没有看到任何配置可以这样做 而且套接字本身也对我隐藏 Thanks 如果使用ReadTi
  • 使用 Netty 的多线程 UDP 服务器

    我正在尝试使用 Netty 实现 UDP 服务器 这个想法是只绑定一次 因此只创建一个Channel This Channel仅使用一个处理程序进行初始化 该处理程序通过一个线程在多个线程之间分派传入数据报的处理ExecutorServic
  • Java NIO 窗口实现

    在使用 NIO 2 AIO 功能进行项目时 我查看了 旧的 NIO 选择器实现 发现在 Windows 上使用了默认的选择函数 由于内部实现不良 该函数在 Windows 上根本无法扩展 大家都知道 在 Windows 上 IOCP 是唯一
  • Netty的并发编码

    编码器的encode方法会并发执行吗 我观察到编码方法可能是由不同线程并发的 管道定义为 Channels pipeline idleHandler new AmfDecoder
  • 分析 Netty 性能

    我正在编写一个 Netty 应用程序 该应用程序在 64 位八核 Linux 机器上运行 Netty 应用程序是一个简单的路由器 它接受请求 传入管道 从请求中读取一些元数据并将数据转发到远程服务 传出管道 该远程服务将向传出管道返回一个或
  • Netty Channel.write 线程安全吗?

    我有一个 Netty 应用程序 我希望有多个线程写入通道 我只是想知道 Channel write 是否线程安全 从代码中可以看出 ChannelOutboundBuffer addMessage 方法本身不是线程安全的 然而 写入通道是
  • 泄漏:ByteBuf.release() 在被垃圾收集之前没有被调用。 Spring Reactor TcpServer

    我正在使用reactor core 1 1 0 RELEASE reactor net 1 1 0 RELEASE 正在使用netty all 4 0 18 Final reactor spring context 1 1 0 RELEAS
  • netty中非阻塞通道中的SO_TIMEOUT

    如果通道在超时毫秒内未收到读取 响应 SO TIMEOUT 是否会使非阻塞通道过期 bootstrap group workerGroup channel NioSocketChannel class handler channelInit
  • 使用 Netty 的异步 HTTP 客户端

    我是 Netty 新手 仍在努力寻找自己的方法 我正在寻找创建一个异步工作的 http 客户端 http的netty例子只展示了如何等待IO操作 并没有展示如何使用添加监听器 所以最近几天我一直在努力解决这个问题 我正在尝试创建一个请求类
  • 如何知道Netty ByteBuf中是否没有数据可读取?

    我是 Netty 新手 文件传输的问题让我困惑了好几天 我想发送image文件从客户端到服务器 下面的代码是可执行的 但只有我shutdown服务器强制我可以正常打开收到的图像文件 否则 显示 您似乎没有查看此文件的权限 检查权限并重试 所
  • 如何阻止netty在服务器套接字上监听和接受

    有没有办法告诉 netty 停止侦听和接受套接字上的新连接 但完成当前连接上任何正在进行的工作 你可以直接关闭ServerSocketChannel由创建的ChannelFactory 通常 ServerSocketChannel由返回Se
  • 为什么JDK NIO使用这么多anon_inode文件描述符?

    我正在使用 Sun 的 JDK 1 6 0 26 和 NIO 带有 Netty 在 lsof 中我看到数百个文件描述符anon inode lsof np 11225 fgrep w anon inode java 11225 nobody
  • Netty:如何处理从 ChunkedFile 接收到的块

    我是 netty 新手 我正在尝试将分块文件从服务器传输到客户端 发送块工作得很好 问题在于如何处理接收到的块并将它们写入文件 我尝试的两种方法都会给我带来直接缓冲区错误 任何帮助将不胜感激 Thanks Override protecte
  • Netty 和字节顺序

    由于文档不完善并且缺乏 Netty 经验 我遇到了一些问题 我不知道如何设置默认的 ByteOrder 我需要一个小尾数法默认设置 如果有人能给我一些关于这方面的提示 我会很高兴 你可以使用Bootstrap setOption 去做这个
  • 如何关闭 Netty 库调试输出?

    我正在使用 Netty 通过 Ning async HTTPlibrary http www ning com code 2010 03 introducing nings asynchronous http client library
  • 不同的 Netty 版本及其用途

    我现在使用Netty有一段时间了 但永远无法解决这个问题 一个人可以下载四个不同的版本 其中三个正在积极开发中 3 x 4 0 x 4 1 x 5 x 据我了解 3 x 适用于 JRE 1 5 而 JRE 的其他所有版本都高于此版本 我使用
  • Netty:正确关闭 WebSocket

    如何从服务器端正确关闭 WebSocket 通道 连接 如果我使用一个ctx getChannel close the onerror在浏览器 Firefox 9 中抛出 页面加载时与 ws localhost 8080 websocket
  • 了解 netty 通道缓冲区和水印

    我正在尝试了解网络缓冲区和水印 作为一个测试用例 我有一个 netty 服务器 它向客户端写入数据 客户端被阻止 基本上每次读取之间有 10 秒的睡眠时间 在正常 I O 下 如果接收方被阻塞 TCP 发送方将受到限制 由于流量控制 发送速

随机推荐

  • 解决jenkins master挂载slave SSH Key Exchange not finished的问题

    1 报错日志 span class token punctuation span span class token number 01 span span class token operator span span class token
  • 11 个 Linux 上最佳的图形化Git 客户端

    Linux用户主要可以通过命令行来管理Git xff0c 不过外面有几种图形化用户界面 xff08 GUI xff09 Git客户软件 xff0c 它们便于用户在Linux桌面上高效 可靠地使用Git xff0c 即便提供不了所有命令行操作
  • yb3防爆电机型号含义_煤矿用防爆电机常用防爆电机型号

    煤矿用防爆电机概述 煤矿用防爆电机一般指在矿井下作业的防爆电机 xff0c 运行环境比较恶劣 xff0c 而且运作安全性较高 是一种具有防爆性能的电动机 xff0c 煤矿用防爆电机的构造主要针对外壳进行特别的加固 xff0c 一般用防爆电机
  • ARM架构授权和IP核授权有什么不一样啊?

    比如 xff0c 华为分别拿到这2个授权 xff0c 能做的有什么区别啊 xff1f 匿名 浏览 2976 次 推荐于2016 06 09 02 43 35 最佳答案 一个公司若想使用ARM的内核来做自己的处理器 xff0c 比如苹果三星T
  • 无人机目标定位C++程序

    针对动态背景下的目标检测定位 include lt opencv2 core core hpp gt include lt opencv2 highgui highgui hpp gt include lt opencv2 imgproc
  • gvim配置默认字体、配色等

    gvim配置默认字体 配色等 1 打开软件 xff0c 选择编辑 gt 启动设定 2 在其中添加自己的配置命令 xff0c 例如 xff1a filetype on 34 关闭自动备份 set noundofile set nobackup
  • Pixhawk原生PX4固件中的坑

    作为一名飞控开发的小学生 xff1a xff09 xff0c 最近入坑Pixhawk 43 PX4了 基于Pixhawk硬件平台进行二次开发 xff0c 有两套固件可以选择 xff1a Ardupilot系列也就是常说的APM固件 xff0
  • Linux(CentOS 6.3)设置VNC远程桌面连接

    刚研究Linux xff0c 选的是CentOS6 3的系统 xff0c 由于刚开始研究Linux xff0c 为了这个远程桌面连接走了不少弯路 xff0c 让大家见笑了 为了弄这个VNC远程连接 xff0c 网上找了很多资料 xff0c
  • python中的库和模块有什么区别_Python中模块(Module)和包(Package)的区别详解 python中的模块、库、包有什么区别?...

    python中的模块 xff0c 库 xff0c 包有什么区别 python中的模块 库 包有什么区别 python里面module package library三者有什么不同功能 安装 使用方法上有什么不同 python中的模块 库 包
  • 《大数据时代》读书笔记

    大数据时代 英国人Viktor Mayer Schonberger的著作 最重要的一点是介绍了一种思维模式的变化 主要观点 xff1a 大数据是指获取全部数据样本 xff0c 分析全部数据 xff0c 而不是只做抽样分析 大数据分析更关注相
  • power design初步使用01

    来自大佬 xff1a 别先生 点击即可查看原文 1 xff1a 入门级使用PowerDesigner软件创建数据库 xff08 直接上图怎么创建 xff0c 其他的概念知识可自行学习 xff09 我的PowerDesigner版本是16 5
  • http服务器demo,简单学习 vs下可以运行

    以下是使用C 43 43 在VS环境下编写的一个简单的HTTP服务器示例代码 xff1a include lt iostream gt include lt string gt include lt WS2tcpip h gt includ
  • power design初步使用02

    概念数据模型 逻辑数据模型 物理数据模型详解 出自 xff1a https www cnblogs com joechinochl articles 5252518 html 数据模型所描述的内容包括三个部分 xff1a 数据结构 数据操作
  • power design综合应用

    出自大佬宋辉 xff1a https www cnblogs com dfsxh articles 1295087 html Power Designer是Sybase公司的CASE 工具集 xff0c 使用它可以方便地对管理信息系统进行
  • LTE中layer的概念以及rank的概念

    原帖地址 xff1a https www mscbsc com bbs thread 293293 1 1 html https www mscbsc com askpro question83176 MIMO 表示多输入多输出 MIMO系
  • Endnote--在参考文献列表中添加DOI

    参考了此网站的内容 xff1a https www jianshu com p 11411c1c8495 1 在Endnote中给参考文献列表添加DOI的方法 xff1a Edit gt Output styles gt Eidt AJTR
  • t检验中的t值和p值是什么关系_t检验和p值的关系

    t检验中的t值和p值是什么关系 t检验和p值的关系 t检验 中通过样本均值 总体均值 样本标准差 样本量 可以计算出一个t值 xff0c 这个t值和p值有什么关系 xff1f 根据界值表又会查出一个数 xff0c 这个数和t值比较 xff0
  • ORACLE 之 标识符无效 问题总结及解决方案

    今天自己在家里做毕业设计 xff0c 遇到了ORACLE数据库的一些问题 xff0c 所以来总结一下 自己在上班的时候也遇到客户过提过这样的问题 xff0c 当时自己在百度上查了 xff0c 给客户解决完 自己也没有在意 xff0c 这次又
  • 数据结构总结

    本文目录 xff1a 数据结构分类1 数组2 栈3 队列4 链表5 树6 散列表7 堆8 图 数据结构分类 数据结构是指相互之间存在着一种或多种关系的数据元素的集合和该集合中数据元素之间的关系组成 常用的数据结构有 xff1a 数组 xff
  • Netty源码分析 (八)----- write过程 源码分析

    上一篇文章主要讲了netty的read过程 xff0c 本文主要分析一下write和writeAndFlush 主要内容 本文分以下几个部分阐述一个java对象最后是如何转变成字节流 xff0c 写到socket缓冲区中去的 pipelin