Netty入门-Channel

2023-11-03

目录

Channel详解

Channel的特点

Channel接口方法

ChannelOutboundInvoker接口

AttributeMap接口

ChannelHandler接口

ChannelInboundHandler接口

ChannelOutboundHandler接口

ChannelHandlerAdapter

ChannelInboundHandlerAdapor

ChannelOutboundHandlerAdapter

适配器的作用

ChannelPipeline接口

创建ChannelPipeline

ChannelHandlerContext接口


Channel详解

        Channel代表网络socket或能够进行IO操作的组件的连接关系。这些IO操作包括读、写、连接和绑定。Netty中的Channel提供了如下功能:

  • 查询当前Channel的状态。例如,是打开还是已连接状态等。
  • 提供Channel的参数配置。如接收缓冲区大小。
  • 提供支持的IO操作。如读、写连接和绑定
  • 提供ChannelPipeline。ChannelPipelin用于处理所有与Channel关联的IO事件和请求。

Channel的特点

1. 所有IO操作都是异步的

IO调用将立即返回,返回一个ChannelFuture实例。

2. Channel是分层的

3. 向下转型以下访问特定于传输的操作

4. 释放资源

Channel接口方法

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    ChannelId id();
    EventLoop eventLoop();
    Channel parent();
    ChannelConfig config();
    boolean isOpen();
    boolean isRegistered();
    boolean isActive();
    ChannelMetadata metadata();
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    ChannelFuture closeFuture();
    boolean isWritable();
    long bytesBeforeUnwritable();
    long bytesBeforeWritable();
    Channel.Unsafe unsafe();
    ChannelPipeline pipeline();
    ByteBufAllocator alloc();
    Channel read();
    Channel flush();
    public interface Unsafe {
        Handle recvBufAllocHandle();
        SocketAddress localAddress();
        SocketAddress remoteAddress();
        void register(EventLoop var1, ChannelPromise var2);
        void bind(SocketAddress var1, ChannelPromise var2);
        void connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);
        void disconnect(ChannelPromise var1);
        void close(ChannelPromise var1);
        void closeForcibly();
        void deregister(ChannelPromise var1);
        void beginRead();
        void write(Object var1, ChannelPromise var2);
        void flush();
        ChannelPromise voidPromise();
        ChannelOutboundBuffer outboundBuffer();
    }
}
  • id()方法返回全局唯一的ChannelId
  • eventLoop()方法返回分配给该Channel的EventLoop,一个EventLoop就是一个线程,用来处理连接的生命周期中所发生的事件
  • parent()方法返回该Channel的父Channel
  • config()方法返回该Channel的ChannelConfig,其中包含了该Channel的所有配置设置支持热更新
  • pipeline()方法返回该Channel对应的ChannelPipeline
  • alloc方法返回分配给该Channel的ByteBufAllocator,可以用来分配ByteBuf

ChannelOutboundInvoker接口

声明了所有出站的网络操作:

package io.netty.channel;

import java.net.SocketAddress;

public interface ChannelOutboundInvoker {
    ChannelFuture bind(SocketAddress var1);
    ChannelFuture connect(SocketAddress var1);
    ChannelFuture connect(SocketAddress var1, SocketAddress var2);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    ChannelFuture bind(SocketAddress var1, ChannelPromise var2);
    ChannelFuture connect(SocketAddress var1, ChannelPromise var2);
    ChannelFuture connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);
    ChannelFuture disconnect(ChannelPromise var1);
    ChannelFuture close(ChannelPromise var1);
    ChannelFuture deregister(ChannelPromise var1);
    ChannelOutboundInvoker read();
    ChannelFuture write(Object var1);
    ChannelFuture write(Object var1, ChannelPromise var2);
    ChannelOutboundInvoker flush();
    ChannelFuture writeAndFlush(Object var1, ChannelPromise var2);
    ChannelFuture writeAndFlush(Object var1);
    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable var1);
    ChannelPromise voidPromise();
}

ChannelFuture用于获取异步的结果,ChannelPromise是对ChannelFuture的扩展,支持写的操作。ChannelPromise也被称为可写的ChannelFuture。

AttributeMap接口

package io.netty.util;
public interface AttributeMap {
    <T> Attribute<T> attr(AttributeKey<T> var1);
    <T> boolean hasAttr(AttributeKey<T> var1);
}

AttributeMap就是类似于Map的键值对,键就是AttributeKey类型,值是Attribute类型。

Netty提供了AttributeMap的默认实现类DefaultAttributeMap,与JDK中的ConcurrentHashMap相比,在高并发下DefaultAttributeMap可以更加节省内存。

package io.netty.util;

import io.netty.util.internal.ObjectUtil;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
 * Default {@link AttributeMap} implementation which not exibit any blocking behaviour on attribute lookup while using a
 * copy-on-write approach on the modify path.<br> Attributes lookup and remove exibit {@code O(logn)} time worst-case
 * complexity, hence {@code attribute::set(null)} is to be preferred to {@code remove}.
 */
public class DefaultAttributeMap implements AttributeMap {

    private static final AtomicReferenceFieldUpdater<DefaultAttributeMap, DefaultAttribute[]> ATTRIBUTES_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultAttributeMap.class, DefaultAttribute[].class, "attributes");
    private static final DefaultAttribute[] EMPTY_ATTRIBUTES = new DefaultAttribute[0];

    /**
     * Similarly to {@code Arrays::binarySearch} it perform a binary search optimized for this use case, in order to
     * save polymorphic calls (on comparator side) and unnecessary class checks.
     */
    private static int searchAttributeByKey(DefaultAttribute[] sortedAttributes, AttributeKey<?> key) {
        int low = 0;
        int high = sortedAttributes.length - 1;

        while (low <= high) {
            int mid = low + high >>> 1;
            DefaultAttribute midVal = sortedAttributes[mid];
            AttributeKey midValKey = midVal.key;
            if (midValKey == key) {
                return mid;
            }
            int midValKeyId = midValKey.id();
            int keyId = key.id();
            assert midValKeyId != keyId;
            boolean searchRight = midValKeyId < keyId;
            if (searchRight) {
                low = mid + 1;
            } else {
                high = mid - 1;
            }
        }

        return -(low + 1);
    }

    private static void orderedCopyOnInsert(DefaultAttribute[] sortedSrc, int srcLength, DefaultAttribute[] copy,
                                            DefaultAttribute toInsert) {
        // let's walk backward, because as a rule of thumb, toInsert.key.id() tends to be higher for new keys
        final int id = toInsert.key.id();
        int i;
        for (i = srcLength - 1; i >= 0; i--) {
            DefaultAttribute attribute = sortedSrc[i];
            assert attribute.key.id() != id;
            if (attribute.key.id() < id) {
                break;
            }
            copy[i + 1] = sortedSrc[i];
        }
        copy[i + 1] = toInsert;
        final int toCopy = i + 1;
        if (toCopy > 0) {
            System.arraycopy(sortedSrc, 0, copy, 0, toCopy);
        }
    }

    private volatile DefaultAttribute[] attributes = EMPTY_ATTRIBUTES;

    @SuppressWarnings("unchecked")
    @Override
    public <T> Attribute<T> attr(AttributeKey<T> key) {
        ObjectUtil.checkNotNull(key, "key");
        DefaultAttribute newAttribute = null;
        for (;;) {
            final DefaultAttribute[] attributes = this.attributes;
            final int index = searchAttributeByKey(attributes, key);
            final DefaultAttribute[] newAttributes;
            if (index >= 0) {
                final DefaultAttribute attribute = attributes[index];
                assert attribute.key() == key;
                if (!attribute.isRemoved()) {
                    return attribute;
                }
                // let's try replace the removed attribute with a new one
                if (newAttribute == null) {
                    newAttribute = new DefaultAttribute<T>(this, key);
                }
                final int count = attributes.length;
                newAttributes = Arrays.copyOf(attributes, count);
                newAttributes[index] = newAttribute;
            } else {
                if (newAttribute == null) {
                    newAttribute = new DefaultAttribute<T>(this, key);
                }
                final int count = attributes.length;
                newAttributes = new DefaultAttribute[count + 1];
                orderedCopyOnInsert(attributes, count, newAttributes, newAttribute);
            }
            if (ATTRIBUTES_UPDATER.compareAndSet(this, attributes, newAttributes)) {
                return newAttribute;
            }
        }
    }

    @Override
    public <T> boolean hasAttr(AttributeKey<T> key) {
        ObjectUtil.checkNotNull(key, "key");
        return searchAttributeByKey(attributes, key) >= 0;
    }

    private <T> void removeAttributeIfMatch(AttributeKey<T> key, DefaultAttribute<T> value) {
        for (;;) {
            final DefaultAttribute[] attributes = this.attributes;
            final int index = searchAttributeByKey(attributes, key);
            if (index < 0) {
                return;
            }
            final DefaultAttribute attribute = attributes[index];
            assert attribute.key() == key;
            if (attribute != value) {
                return;
            }
            final int count = attributes.length;
            final int newCount = count - 1;
            final DefaultAttribute[] newAttributes =
                    newCount == 0? EMPTY_ATTRIBUTES : new DefaultAttribute[newCount];
            // perform 2 bulk copies
            System.arraycopy(attributes, 0, newAttributes, 0, index);
            final int remaining = count - index - 1;
            if (remaining > 0) {
                System.arraycopy(attributes, index + 1, newAttributes, index, remaining);
            }
            if (ATTRIBUTES_UPDATER.compareAndSet(this, attributes, newAttributes)) {
                return;
            }
        }
    }

    @SuppressWarnings("serial")
    private static final class DefaultAttribute<T> extends AtomicReference<T> implements Attribute<T> {

        private static final AtomicReferenceFieldUpdater<DefaultAttribute, DefaultAttributeMap> MAP_UPDATER =
                AtomicReferenceFieldUpdater.newUpdater(DefaultAttribute.class,
                                                       DefaultAttributeMap.class, "attributeMap");
        private static final long serialVersionUID = -2661411462200283011L;

        private volatile DefaultAttributeMap attributeMap;
        private final AttributeKey<T> key;

        DefaultAttribute(DefaultAttributeMap attributeMap, AttributeKey<T> key) {
            this.attributeMap = attributeMap;
            this.key = key;
        }

        @Override
        public AttributeKey<T> key() {
            return key;
        }

        private boolean isRemoved() {
            return attributeMap == null;
        }

        @Override
        public T setIfAbsent(T value) {
            while (!compareAndSet(null, value)) {
                T old = get();
                if (old != null) {
                    return old;
                }
            }
            return null;
        }

        @Override
        public T getAndRemove() {
            final DefaultAttributeMap attributeMap = this.attributeMap;
            final boolean removed = attributeMap != null && MAP_UPDATER.compareAndSet(this, attributeMap, null);
            T oldValue = getAndSet(null);
            if (removed) {
                attributeMap.removeAttributeIfMatch(key, this);
            }
            return oldValue;
        }

        @Override
        public void remove() {
            final DefaultAttributeMap attributeMap = this.attributeMap;
            final boolean removed = attributeMap != null && MAP_UPDATER.compareAndSet(this, attributeMap, null);
            set(null);
            if (removed) {
                attributeMap.removeAttributeIfMatch(key, this);
            }
        }
    }
}

ChannelHandler接口

package io.netty.channel;

import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


public interface ChannelHandler {

    /**
     * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
     */
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
     * anymore.
     */
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     *
     * @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
     * implement the method there.
     */
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    /**
     * Indicates that the same instance of the annotated {@link ChannelHandler}
     * can be added to one or more {@link ChannelPipeline}s multiple times
     * without a race condition.
     * <p>
     * If this annotation is not specified, you have to create a new handler
     * instance every time you add it to a pipeline because it has unshared
     * state such as member variables.
     * <p>
     * This annotation is provided for documentation purpose, just like
     * <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
     */
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}


 Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in
 its ChannelPipeline

ChannelHandler本身没有提供什么方法,可以使用其子类:

  • ChannelInboundHandler:处理入站IO事件
  • ChannelOutboundHandler:处理出站IO事件
  • ChannelHandlerAdapter:采用适配器模式的ChannelHandler适配器

ChannelInboundHandler接口

package io.netty.channel;

/**
 * {@link ChannelHandler} which adds callbacks for state changes. This allows the user
 * to hook in to state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Invoked when the current {@link Channel} has read a message from the peer.
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

ChannelOutboundHandler接口

package io.netty.channel;

import java.net.SocketAddress;


public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * Called once a bind operation is made.
     *
     * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
     * @param localAddress  the {@link SocketAddress} to which it should bound
     * @param promise       the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception    thrown if an error occurs
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a connect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
     * @param remoteAddress     the {@link SocketAddress} to which it should connect
     * @param localAddress      the {@link SocketAddress} which is used as source on connect
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a disconnect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a close operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a deregister operation is made from the current registered {@link EventLoop}.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Intercepts {@link ChannelHandlerContext#read()}.
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
    * Called once a write operation is made. The write operation will write the messages through the
     * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
     * {@link Channel#flush()} is called
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
     * @param msg               the message to write
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
     * that are pending.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
     * @throws Exception        thrown if an error occurs
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

ChannelHandlerAdapter

package io.netty.channel;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerMask.Skip;
import io.netty.util.internal.InternalThreadLocalMap;
import java.util.Map;

public abstract class ChannelHandlerAdapter implements ChannelHandler {
    boolean added;
    public ChannelHandlerAdapter() {
    }
    protected void ensureNotSharable() {
        if (this.isSharable()) {
            throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");
        }
    }
    public boolean isSharable() {
        Class<?> clazz = this.getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = (Boolean)cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }
    /** @deprecated */
    @Skip
    @Deprecated
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

ChannelHandlerAdaptor常用的两个子类,分别是ChannelInboundHandlerAdapor、ChannelOutboundHandlerAdatper

ChannelInboundHandlerAdapor

package io.netty.channel;

import io.netty.channel.ChannelHandlerMask.Skip;

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    public ChannelInboundHandlerAdapter() {
    }

    @Skip
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Skip
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    @Skip
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Skip
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Skip
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Skip
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    @Skip
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Skip
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    @Skip
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

ChannelOutboundHandlerAdapter

package io.netty.channel;

import io.netty.channel.ChannelHandlerMask.Skip;
import java.net.SocketAddress;

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
    public ChannelOutboundHandlerAdapter() {
    }
    @Skip
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }
    @Skip
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }
    @Skip
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.disconnect(promise);
    }
    @Skip
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.close(promise);
    }
    @Skip
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }
    @Skip
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }
    @Skip
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }
    @Skip
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

适配器的作用

使用适配器是因为适配器的子类不需要实现父类中的所有方法,按需覆盖适配器的方法即可。

ChannelPipeline接口

ChannelPipeline接口设计采用了责任链模式,底层采用双向链表的数据结构,将链上个各个处理器串联起来。

 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+
public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    ChannelPipeline addFirst(String name, ChannelHandler handler);

    ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);

    ChannelPipeline addLast(String name, ChannelHandler handler);

    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);

    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);

    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    ChannelPipeline addFirst(ChannelHandler... handlers);

    ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);

    ChannelPipeline addLast(ChannelHandler... handlers);

    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

    ChannelPipeline remove(ChannelHandler handler);

    ChannelHandler remove(String name);

    <T extends ChannelHandler> T remove(Class<T> handlerType);

    ChannelHandler removeFirst();

    ChannelHandler removeLast();

    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);

    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

    <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
                                         ChannelHandler newHandler);

    ChannelHandler first();

    ChannelHandlerContext firstContext();

    ChannelHandler last();

    ChannelHandlerContext lastContext();

    ChannelHandler get(String name);

    <T extends ChannelHandler> T get(Class<T> handlerType);

    ChannelHandlerContext context(ChannelHandler handler);

    ChannelHandlerContext context(String name);

    ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);

    Channel channel();

    List<String> names();

    Map<String, ChannelHandler> toMap();

    @Override
    ChannelPipeline fireChannelRegistered();

    @Override
    ChannelPipeline fireChannelUnregistered();

    @Override
    ChannelPipeline fireChannelActive();

    @Override
    ChannelPipeline fireChannelInactive();

    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);

    @Override
    ChannelPipeline fireUserEventTriggered(Object event);

    @Override
    ChannelPipeline fireChannelRead(Object msg);

    @Override
    ChannelPipeline fireChannelReadComplete();

    @Override
    ChannelPipeline fireChannelWritabilityChanged();

    @Override
    ChannelPipeline flush();
}

创建ChannelPipeline

ChannelPipeline数据管道是与Channel通道绑定的,一个Channel通道对应一个ChannelPipeline,ChannelPipeline是在Channel初始化时被创建的。

ChannelHandlerContext接口

ChannelHandlerContext接口是联系ChannelHandler与其ChannelPipeline之间的纽带。

每当有ChannelHandler添加到ChannelPipeline中时,都会常见ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。例如,ChannelHandlerContext可以通知ChannelPipeline中的下一个ChannelHandler开始执行及动态修改其所属的ChannelPipeline。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty入门-Channel 的相关文章

  • 每个连接的 Netty 多线程

    我是 Netty 新手 我想开发一个服务器 旨在接收来自可能少数 假设最多有 2 个 客户端的请求 但是每个客户端都会不断地向服务器发送许多请求 服务器必须处理此类请求并响应客户端 因此 在这里我假设即使我配置了多个工作线程 它也可能没有用
  • 我可以从 Tcl 中的文件句柄中找到文件名吗?

    如同我可以从 Perl 中的文件句柄中找到文件名吗 https stackoverflow com questions 2813092 can i find a filename from a filehandle in perl但在 Tc
  • 直接通道使用与使用代理?

    正如标题所暗示的 我试图理解为什么在 WCF 中有时人们选择 生成代理 而不是使用 ChannelFactory 手动创建新的通道实例 我已经看过每一种的例子 但还没有真正找到任何解释为什么你会选择其中一种 老实说 我只与渠道合作过Chan
  • 如何实现对无缓冲通道的非阻塞写入?

    From 有效的行动 https golang org doc effective go html channels 接收器总是阻塞 直到有数据要接收 如果通道未缓冲 则发送方会阻塞 直到接收方收到该值 But 信号 通知 https go
  • 通道是否通过引用隐式传递

    gotour 有这个频道示例 https tour golang org concurrency 2 https tour golang org concurrency 2 package main import fmt func sum
  • 如何阻止netty在服务器套接字上监听和接受

    有没有办法告诉 netty 停止侦听和接受套接字上的新连接 但完成当前连接上任何正在进行的工作 你可以直接关闭ServerSocketChannel由创建的ChannelFactory 通常 ServerSocketChannel由返回Se
  • 将切片的所有项目添加到通道中

    在 Go 中 是否有比以下更惯用的方法将数组 切片的所有元素添加到通道中 ch make chan string values string lol cat lolcat go func for v range values ch lt v
  • 渠道有什么用?

    在查看一些 Go 代码时 我发现了以下内容 ch make chan int 我在在线教程中查找了 Go Channels 的工作原理 https tour golang org concurrency 2 https tour golan
  • 通过 Django Channels 和 Websockets 将实时更新推送到客户端

    我正在尝试制作一个向客户端显示实时更新数据的页面 该网站的其余部分是使用 Django 构建的 因此我尝试使用 Channels 来实现此目的 我显示的数据保存在 JSON 文件和 MySQL 数据库中 以便在网站的其他部分进行进一步计算
  • 如何关闭 Netty 库调试输出?

    我正在使用 Netty 通过 Ning async HTTPlibrary http www ning com code 2010 03 introducing nings asynchronous http client library
  • 为什么我们真的需要多个 Netty boss 线程?

    我真的很困惑老板组的线程数量 我无法弄清楚我们需要多个老板线程的场景 在Boss 组是否需要多个线程 https stackoverflow com questions 22280916 do we need more than a sin
  • Golang:我可以投射到 chan 接口吗{}

    我正在尝试为订阅编写一个通用包装器 例如 type Subscriber interface Subscribe addr string chan interface 假设有一个我想使用的库 其中有一个 subscribe 方法 但它使用c
  • 如何用Java处理来自客户端的Websocket消息?

    我正在使用 Websocket 用 Ja va 开发客户端 服务器应用程序 目前 所有客户端消息均使用 switch case 进行处理 如下所示 OnMessage public String onMessage String unscr
  • Netty:正确关闭 WebSocket

    如何从服务器端正确关闭 WebSocket 通道 连接 如果我使用一个ctx getChannel close the onerror在浏览器 Firefox 9 中抛出 页面加载时与 ws localhost 8080 websocket
  • 当涉及多个渠道时,select 如何工作?

    我发现在多个非缓冲通道上使用 select 时 例如 select case lt chana case lt chanb 即使两个通道都有数据 但在处理此选择时 case chana 和 case chanb 的跟注不平衡 package
  • 匿名结构和空结构

    http play golang org p vhaKi5uVmm http play golang org p vhaKi5uVmm package main import fmt var battle make chan string
  • 与通道相比,sync.WaitGroup 的优势是什么?

    我正在开发一个并发 Go 库 我偶然发现了 goroutine 之间两种不同的同步模式 其结果相似 等待组 https play golang org p ZYPLlcp16TZ package main import fmt sync t
  • 关闭长度未知的通道

    当不了解频道时我无法关闭频道 length package main import fmt time func gen ch chan int var i int for time Sleep time Millisecond 10 ch
  • Spring WebFlux Netty SSL 与自签名证书错误

    我正在尝试使用服务器端的自签名证书通过本地主机中的 https 访问在 Netty 上运行的 Spring Boot 应用程序 My application properties看起来像这样 server ssl enabled true
  • netty 4.x 中 ServerBootstrap.option() 和 ServerBootstrap.childOption() 有什么区别

    根据文档4 0 中值得注意的新内容 http netty io wiki new and noteworthy in 4 0 html wiki h3 31 netty4提供了新的bootstrap API 文档给出了以下代码示例 publ

随机推荐

  • 路由交换-华为usg6000防火墙上配置内网外网通过公网ip访问http服务

    源nat是将私网地址转换为公网地址 实现内部网络访问外网 目的dnat是将对公网访问Ip转换为内网ip 实现外部网络访问内网资源 目的nat的实现有多种方式 一对一转换 带端口和不带端口的转换 最常用的就是使用带端口的一对多转换 即我们常说
  • Levinson-Durbin快速递推法功率谱估计(Python实现版)

    Levinson Durbin快速递推法功率谱估计是在Yule Walker方程法之上建立的 如果对于Yule Walker方程法不熟悉的话可以参考我的一篇博客 Yule Walker方程法参数化谱估计 Python实现版 声明 博客原本在
  • 文件上传漏洞upload-libs pass5

    文件上传漏洞upload libs pass4 首先查看源码 无法使用空格和大小写绕过 且黑名单过滤了 htaccess 查看提示 利用readme php文件 因为没有过滤ini文件 创建 text ini和一句话木马文件 内容为 aut
  • HIVE厂牌艺人_Labelwarts Vol. 2:洛杉矶天才厂牌 Odd Future Records 的开始到结束

    We re F kin Radical been F kin Awesome 我们太TMD激进 太TMD耀眼 Talked a lotta sh t so far words you re at a loss 说着一大堆胡话 让你们都不知所
  • 将ant design pro打包的JS分离出去

    通过analyze分析发现其实react dom并不算小 有100多kb 所以就想把它单独引用 于是就在config ts增加 externals react window React react dom window ReactDOM b
  • 利用python3 生成密码本

    一 思路 1 把密码中含有哪些字符串都放入一个迭代器中 2 确定生成的密码是几位数的 3 将生成的所有密码写入一个文件里面 二 代码 import itertools as its 迭代器 words 1234567890 生成密码本的位数
  • 3.2 Python图像的频域图像增强-高通和低通滤波器

    3 2 Python图像的频域图像增强 高通和低通滤波器 文章目录 3 2 Python图像的频域图像增强 高通和低通滤波器 1 算法原理 1 1理想滤波器 1 2巴特沃斯滤波器 1 3指数滤波器 2 代码 3 效果 1 算法原理 高通和低
  • Mongodb笔记六:排序与限制输出

    一 排序 db collectionname find sort key1 1 key 1 这里的1代表升序 1代表降序 如 对所有人按年龄升序排序 降序排序 二 索引 索引是特殊的数据结构 索引存储在一个易于遍历读取的数据集合中 索引是对
  • FFmpeg中RTSP客户端拉流测试代码

    之前在https blog csdn net fengbingchun article details 91355410中给出了通过LIVE555实现拉流的测试代码 这里通过FFmpeg来实现 代码量远小于LIVE555 实现模块在liba
  • 蓝桥杯每日一题——手算题·空间

    本题为填空题 只需要算出结果后 在代码中使用输出语句将所填结果输出即可 小蓝准备用 256MB 的内存空间开一个数组 数组的每个元素都是 3232 位 二进制整数 如果不考虑程序占用的空间和维护内存需要的辅助空间 请问 56MB 的空间可以
  • [阶段二] 4. MySQL的基本操作

    mysql的基本操作 数据插入 INSERT 语句可以向数据表写入数据 可以是一条记录 也可以是多条记录 INSERT INTO 数据表名称 字段1 字段2 VALUES 值1 值2 插入一条记录 INSERT INTO 数据表名称 字段1
  • 分析工具 nvprof简介

    nvprof 是一个可用于Linux Windows和OS X的命令行探查器 使用 nvprof myApp 运行我的应用程序 我可以快速看到它所使用的所有内核和内存副本的摘要 摘要将对同一内核的所有调用组合在一起 显示每个内核的总时间和总
  • 十六进制转二进制

    public static String hexToBinary String hex if hex null hex length 2 0 return null String bString String tmp for int i 0
  • Visual Studio(VS) 编程推荐字体和主题设置

    首先是字体 工具 gt 选项 gt 环境 gt 字体和颜色 具体图如下 选择Consolas的原因 Consolas算是最常见的编码字体了 在很多的编译软件都是这个字体 而且在这个字体下的中英文标点和半角圆角符号也能有比较明显的区别 至于字
  • Java 集合 - Map 接口

    文章目录 1 概述 2 常用 API 3 遍历 Map 集合 4 HashMap 和 Hashtable 5 LinkedHashMap 6 TreeMap 7 Properties 8 Set 集合与 Map 集合的关系 9 总结 1 概
  • C++11/14之模板全特化,偏特化

    目录 模板全特化 偏特化 类模板特化 类模板全特化 a 常规全特化 b 特化成员函数而不是模板 类模板偏特化 局部特化 a 模板参数数量 b 模板参数范围 int const int 比int小 函数模板特化 函数模板全特化 函数模板偏特化
  • LayerNorm的理解

    LayerNorm计算公式 y x E x
  • C语言实现多级反馈队列调度算法

    include
  • java架构师进阶之路

    Java架构师 应该算是一些Java程序员们的一个职业目标了吧 很多码农码了五六年的代码也没能成为架构师 那成为Java架构师要掌握哪些技术呢 总体来说呢 有两方面 一个是基础技术 另一个就是组织能力和提出解决方案能力了 如果你是想成为Ja
  • Netty入门-Channel

    目录 Channel详解 Channel的特点 Channel接口方法 ChannelOutboundInvoker接口 AttributeMap接口 ChannelHandler接口 ChannelInboundHandler接口 Cha