Netty核心源码分析(一),Netty的Server端启动过程源码分析

2023-05-16

文章目录

  • 系列文章目录
  • 一、Netty的Server端启动过程源码分析
    • 1、NioEventLoopGroup的创建
      • (1)构造方法
    • 2、ServerBootstrap的创建
      • (1)构造方法
      • (2)group方法
      • (3)channel方法
      • (4)option方法
      • (5)handler方法
      • (6)childHandler方法
      • (7)bind方法
      • (8)bind方法中——initAndRegister方法
      • (9)bind方法中——initAndRegister方法中Channel创建逻辑
      • (10)bind方法中——initAndRegister方法中init方法
      • (11)Pipeline的addLast方法
      • (13)bind方法中——dobind0方法
    • 3、启动完毕的事件循环

系列文章目录

Netty核心源码分析(一),Netty的Server端启动过程源码分析
Netty核心源码分析(二),Netty的Server端接收请求过程源码分析
Netty核心源码分析(三)业务请求执行关键——ChannelPipeline、ChannelHandler、ChannelHandlerContext源码分析
Netty核心源码分析(四)心跳检测源码分析
Netty核心源码分析(五)核心组件EventLoop源码分析

一、Netty的Server端启动过程源码分析

1、NioEventLoopGroup的创建

Server端会创建两个EventLoopGroup,我们一般使用NioEventLoopGroup:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

这两个EventLoopGroup是整个Netty的核心对象。boosGroup用于接收Tcp请求,他会将请求交给workerGroup,workerGroup会获取真正的连接,然后和连接进行通信,比如读写解码编码等操作。

EventLoopGroup 是事件循环组(线程组),内涵多个EventLoop,可以注册channel,用于在事件循环中进行选择(和select相关)。

(1)构造方法

在NioEventLoopGroup的构造方法中,如果不传参的话,默认创建cpu核心数*2个线程:

// io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
public NioEventLoopGroup() {
    this(0);
}
// ... 一直往上追
// io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

我们发现在父类中,如果nThreads传参为0,就会赋值为DEFAULT_EVENT_LOOP_THREADS :

// MultithreadEventLoopGroup静态代码块
private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); // 获取cpu核心数 * 2

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

最终是获取了CPU核心数 * 2,并创建这些线程,我本机CPU是20核,所以创建了40个workerGroup线程。
在这里插入图片描述
我们在这个构造方法继续往上追:

// nThreads表示使用的线程数
// executor表示执行器如果为null就采用Netty默认的线程工厂和默认的执行器ThreadPerTaskExecutor
// chooserFactory是上一步传入的DefaultEventExecutorChooserFactory
// args表示在创建执行器时传入的固定参数
// io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) { // 为空的话,会创建默认的执行器
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
	// 创建指定线程数的执行器数组
    children = new EventExecutor[nThreads]; // 40
	// 初始化线程数组
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 创建NioEventLoop
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
            	// 如果启动失败,会关闭线程并且停止EventExecutor ,优雅关闭
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
    	// 每个EventExecutor添加监听器
        e.terminationFuture().addListener(terminationListener);
    }
	// 将所有单例线程池添加到HashSet中
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

此时我们知道了,在NioEventLoopGroup的children中,就是包含着这些EventExecutor,而我们默认的就是使用NioEventLoop,

在这里插入图片描述
NioEventLoop的父类SingleThreadEventLoop,包含着很多子类:
在这里插入图片描述
而在我们定义的new NioEventLoopGroup()中,就相当于已经定义好了该实现,通常我们使用NioEventLoop或者EpollEventLoop(linux需支持epoll,提高性能)。

2、ServerBootstrap的创建

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         if (sslCtx != null) {
             p.addLast(sslCtx.newHandler(ch.alloc()));
         }
         p.addLast(new LoggingHandler(LogLevel.INFO));
         //p.addLast(new EchoServerHandler());
     }
 });

(1)构造方法

创建的ServerBootstrap是一个引导类,用于启动服务器和引导整个程序的初始化,它和ServerChannel关联,而ServerChannel继承了Channel。

ServerBootstrap包含着ChannelHandler信息以及EventLoopGroup等信息:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);

    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;

    public ServerBootstrap() { }

    private ServerBootstrap(ServerBootstrap bootstrap) {
        super(bootstrap);
        childGroup = bootstrap.childGroup;
        childHandler = bootstrap.childHandler;
        synchronized (bootstrap.childOptions) {
            childOptions.putAll(bootstrap.childOptions);
        }
        synchronized (bootstrap.childAttrs) {
            childAttrs.putAll(bootstrap.childAttrs);
        }
    }

ServerBootstrap的父类还额外包含一些address等信息:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    volatile EventLoopGroup group;
    @SuppressWarnings("deprecation")
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private volatile ChannelHandler handler;

    AbstractBootstrap() {
        // Disallow extending from a different package.
    }

    AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
        group = bootstrap.group;
        channelFactory = bootstrap.channelFactory;
        handler = bootstrap.handler;
        localAddress = bootstrap.localAddress;
        synchronized (bootstrap.options) {
            options.putAll(bootstrap.options);
        }
        synchronized (bootstrap.attrs) {
            attrs.putAll(bootstrap.attrs);
        }
    }

(2)group方法

b.group(bossGroup, workerGroup)

将boosGroup和workerGroup传入参数中:

// io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup); // 将bossGroup放入父类AbstractBootstrap
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup; // workerGroup放入ServerBootstrap 中
    return this;
}

// io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup)
public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    }
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group; // 将bossGroup放入AbstractBootstrap
    return self();
}

我们发现,ServerBootstrap和其父类AbstractBootstrap对于EventLoopGroup似乎有着层级关系,其中bossGroup放入了父类中,workerGroup放入了子类。

(3)channel方法

.channel(NioServerSocketChannel.class)

添加了一个Channel的class对象,引导类将通过这个Class对象反射创建ChannelFactory

// io.netty.bootstrap.AbstractBootstrap#channel
public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    // 创建ReflectiveChannelFactory
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

在ReflectiveChannelFactory中,重写着newChannel方法,通过反射创建Channel:

// io.netty.channel.ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
    try {
        return clazz.getConstructor().newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}

注意!Channel的创建在bind方法中,调用了ChannelFactory的newChannel方法。

(4)option方法

.option(ChannelOption.SO_BACKLOG, 100)

设置一些配置选项:

// io.netty.bootstrap.AbstractBootstrap#option
public <T> B option(ChannelOption<T> option, T value) {
    if (option == null) {
        throw new NullPointerException("option");
    }
    if (value == null) {
        synchronized (options) {
            options.remove(option);
        }
    } else {
        synchronized (options) {
            options.put(option, value);
        }
    }
    return self();
}

(5)handler方法

.handler(new LoggingHandler(LogLevel.INFO))

添加一些服务请求专用的处理器:

// io.netty.bootstrap.AbstractBootstrap#handler(io.netty.channel.ChannelHandler)
public B handler(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
    return self();
}

实际上handler方法中传入的Handler,是交给boosGroup处理的Handler,因为handler方法是ServerBootstrap的父类AbstractBootstrap中的。

(6)childHandler方法

// io.netty.bootstrap.ServerBootstrap#childHandler(io.netty.channel.ChannelHandler)
public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler == null) {
        throw new NullPointerException("childHandler");
    }
    this.childHandler = childHandler;
    return this;
}

添加workerGroup的处理类,该方法是ServerBootstrap的,添加的handler也是为workerGroup服务的。

(7)bind方法

ChannelFuture f = b.bind(PORT).sync();

绑定端口并进行阻塞,bind方法执行成功之后,server端就算启动成功了。

// io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}
// io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

// io.netty.bootstrap.AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
	// 初始化和注册,Channel的创建和初始化pipeline就是在这做的
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 完成对端口的绑定
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

(8)bind方法中——initAndRegister方法

// io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
    	// 真正创建Channel的方法,ServerBootstrap的channel方法传入的class,在此处通过工厂进行了实例化
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

(9)bind方法中——initAndRegister方法中Channel创建逻辑

在initAndRegister调用了channelFactory.newChannel();之后,实际是调用了ReflectiveChannelFactory中,newChannel方法,通过反射创建Channel:

// io.netty.channel.ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
    try {
        return clazz.getConstructor().newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}

NioServerSocketChannel的构造方法,做了许多NioServerSocketChannel的初始化工作:

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

/**
 * Create a new instance
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

(1) 通过 NIO 的 SelectorProvider 的 openServerSocketChannel 方法得到JDK 的 channel。目的是让 Netty 包装 JDK 的 channel。
(2) 创建了一个唯一的 ChannelId,创建了一个 NioMessageUnsafe,用于操作消息,创建了个 DefaultChannelPipeline 管道,是个双向链表结构,用于过滤所有的进出的消息。
(3) 创建了一个 NioServerSocketChannelConfig 对象,用于对外展示一些配置。

(10)bind方法中——initAndRegister方法中init方法

init方法是在ServerBootstrap中实现的一个方法:

// io.netty.bootstrap.ServerBootstrap#init
@Override
void init(Channel channel) throws Exception {
	// 获取Options配置的属性
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
    	// 线程不安全,LinkedHashMap,所以需要同步
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) { // 处理attr属性
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
	// 处理pipeline
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

(11)Pipeline的addLast方法

Pipeline的addLast方法是核心。

// io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
    	// 检查handler是否符合标准
        checkMultiplicity(handler);
		// 创建一个AbstractChannelHandlerContext 对象,该对象是ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler添加到Pipeline中时,都会创建COntext。Context的主要功能是管理他所关联的Handler和同一个Pipeline中其他Handler之间的交互。
        newCtx = newContext(group, filterName(name, handler), handler);

		// 将newContext保存
        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
// io.netty.channel.DefaultChannelPipeline#addLast0
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

我们可以看到,addLast方法并不是将Handler放到了最后,而是将我们自定义的Handler放到了tail的前一个,这样tail永远会在最后面,做一些系统的固定工作。

(13)bind方法中——dobind0方法

// io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
            	// 调用channel的bind方法,因为此刻channel已经初始化完成了
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

channel.bind方法我们一步一步追溯:

// io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
// io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
// io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
    	// 执行
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}
// io.netty.channel.AbstractChannelHandlerContext#invokeBind
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
        	// 执行
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}
// io.netty.handler.logging.LoggingHandler#bind
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    if (logger.isEnabled(internalLevel)) {
        logger.log(internalLevel, format(ctx, "BIND", localAddress));
    }
    ctx.bind(localAddress, promise);
}
// 
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
    	// 执行
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}
// io.netty.channel.AbstractChannelHandlerContext#invokeBind
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
        	// 执行
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}
// io.netty.channel.DefaultChannelPipeline.HeadContext#bind
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    // unsafe.bind
    unsafe.bind(localAddress, promise);
}
// io.netty.channel.AbstractChannel.AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
    	// 关键方法
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

debug追了一大顿,终于来到了我们关键了!这里就是执行了NIO的channel的bind方法了:

// io.netty.channel.socket.nio.NioServerSocketChannel#doBind
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
    	// 版本大于jdk7
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
    	// 版本小于jdk7
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

3、启动完毕的事件循环

bind方法执行完毕之后,此时debug一直下去的话,会最终进入到NioEventLoop的run方法中,这是一个死循环:

// io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

此时,就开始接收事件,Netty算是正式启动了。
BoosGroup已经创建完毕并且启动完成,开始下面的Accept接收客户端请求的过程了。
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty核心源码分析(一),Netty的Server端启动过程源码分析 的相关文章

  • Docker使用笔记

    软件安装 https docs docker com engine install ubuntu 下载镜像 span class token function docker span pull ubuntu 创建一个CONTAINER 示例
  • ubuntu编译安装最新的tmux

    通过apt get安装的tmux版本比较旧 xff0c 我喜欢使用最新的版本 那就自己编译安装一下吧 很简单 xff0c 耗时1分钟左右 环境 操作系统 xff1a Ubuntu 20 04 3 LTS 安装的tmux版本 xff1a tm
  • Ubuntu Linux 更改主机名(hostname)

    操作 编辑 etc hostname文件 span class token function vim span etc hostname 文件内容修改成自己想要的名称 修改完后 xff0c 重启机器就生效了 span class token
  • docker文件目录迁移

    docker默认存放路径是 var lib docker xff0c 按理来说没有什么问题 但是在我安装操作系统时 xff0c 分区空间分的太少了 xff08 50G xff09 但是 home目录就非常大了 所以我想把docker的默认路
  • ubuntu开启ssh服务

    环境 我的测试环境是 xff1a docker中的Ubuntu 20 04 3 LTS 安装openssh server span class token function sudo span span class token functi
  • neovim无法中文显示的问题

    场景 中文的语言环境 xff0c 其他支持中文的程序 正常能显示中文 xff0c 比如 date xff0c vim都可以支持中文 就neovim显示的是英文 我下载的neovim是全功能的版本 xff0c 支持中文的 所以排除软件的原因
  • Netty使用Google Protobuf进行编解码

    文章目录 一 概述1 编解码基础2 Netty编解码器3 Protobuf概述 二 Protobuf基本使用1 引入jar包2 下载Protobuf3 编写Student proto4 生成StudentPOJO类5 服务器端6 客户端7
  • ranger没有那个文件或目录: ‘screen‘: ‘screen‘

    背景 首先ranger是正常安装 xff0c 正常环境下也是可以正常使用的 当我在tmux中使用ssh远程 xff0c 然后使用ssh远程的机器中的ranger xff0c 就发现ranger打开报错了 嵌套层级 xff1a tmux ss
  • ubuntu搭建nvim开发环境准备工作

    以下代码 xff0c 仅供方便复制使用的 想了解细节 xff0c 可以参考 xff1a windows10安装子系统Ubuntu 20 04 https blog csdn net lxyoucan article details 1198
  • Failed to execute child process “dbus-launch“

    场景 在ubuntu中搭建vnc桌面环境 xff0c 安装 Minimal Xfce Desktop span class token comment 精简安装 span span class token function sudo spa
  • ubuntu图形界面乱码解决办法

    现象 ubuntu的vnc远程桌面中出现了一些乱码 原因分析 从上图中可以看出英文显示正常 xff0c 那么可以判断应该是中文乱码导致的 应该是系统中没有安装中文字体导致 解决办法 方法1 xff1a 使用英文的界面 xff0c 但是我还是
  • Kylin V10 SP1(ubuntu)编译安装python3新版本

    系统自带的python太旧了 xff0c 所以想编译安装最新版本的python 环境 span class token function cat span etc release span class token assign left v
  • docker容器安装图形桌面

    文章目录 视频教程版本信息创建一个CONTAINERubuntu官方国内源docker镜像unminimize中文环境设置中文环境 安装安装TigerVNC Server安装 xfce4精简版本 配置设置vnc密码 vnc xstartup
  • ubuntu官方国内源

    背景 之前我一直在使用中科大的源 xff0c 还是挺快的 一直也没有感觉有什么问题 直到最近在折腾vnc xff0c 发现中科大的源有一些包会404 xff0c 安装不了 而我在vmware中的正好是默认的cn archive ubuntu
  • iterm2禁用鼠标选择复制

    iterm2默认选中文字就自动复制到剪切板 xff0c 方便是挺方便 但是有时候 xff0c 复制了一段文字想贴到iterm2中 xff0c 结果鼠标一滑 xff0c 不小心选择到了文字了 xff0c 之前复制的内容就被替换了 那么怎么关闭
  • Ant Design for React Native精简笔记

    背景 Ant Design是一套不错的UI组件库 xff0c 功能强大 但是依赖了很多其他的组件 xff0c 在RN6 3以后要自己安装以下组件才能正常使用 span class token function yarn span span
  • yarn使用说明

    yarn优点 速度超快 Yarn 缓存了每个下载过的包 xff0c 所以再次使用时无需重复下载 同时利用并行下载以最大化资源利用率 xff0c 因此安装速度更快 超级安全 在执行代码之前 xff0c Yarn 会通过算法校验每个安装包的完整
  • Netty编解码器,Netty自定义编解码器解决粘包拆包问题,Netty编解码器的执行过程详解

    文章目录 一 编解码器概述1 编解码器概述2 编码器类关系图3 解码器类关系图 二 以编解码器为例理解入站出站1 Server端2 Client端3 编解码器3 执行查看结果4 注意事项 三 Netty其他内置编解码器1 Replaying
  • ERROR Error: Reanimated 2 failed to create a worklet

    报错 To reload the app press span class token string 34 r 34 span To span class token function open span developer menu pr
  • React Native创建一个新的项目常用命令

    创建项目 创建一个typescript项目 npx react native init ywh template react native template typescript 导入库 整合 方便一键安装 以下仅是本人常用的组件 xff0

随机推荐

  • 各操作系统支持图标字体的终端推荐

    软件推荐 操作系统推荐macOSiTerm2windowsWindows TerminallinuxGNOME 终端 等androidTermux或TermiusiOSTermius 不完美 如果你在macOS我推荐你使用iterm2 xf
  • MAME set 4 player

    背景 我本身有一个手柄 xff0c 准备12 12在入手一个 然后小杨同学就把他的一对手柄借给我啦 xff0c 让我试试手感 xff0c 好决定买哪个 那么我现在就有3个手柄可以使用 我就想找个3个人以上的游戏来玩玩 首先我就想到了玩街机
  • [视频教程]macOS运行MAME

    操作视频 https www bilibili com video BV1Nr4y1D7SQ 安装 brew span class token function install span mame ROM 把roms文件夹放到以下目录 xf
  • [视频教程]MAME画质优化hq3x

    关于滤镜 xff0c 萝卜白菜各有所爱 我个人喜欢hq3x的画质 视频教程 https www bilibili com video BV1Ji4y1d7j6 默认画质 hq3x 加了层滤镜 xff0c 显示更平滑了 配置方法 核心配置 搜
  • macOS安装最新MAME 报错dyld: Library not loaded:

    在macOS中安装最新版本的MAME 报错 mame dyld Library not loaded 64 rpath SDL2 framework Versions A SDL2 Referenced from Users itkey m
  • ‘@typescript-eslint/no-shadow‘ was not found

    我新建了一个React Native项目 xff0c 然后IDE报错如下 以前新建的项目是不会有任何报错信息的 报错信息 Definition for rule 64 typescript eslint no shadow was not
  • 纯javascript代码修改react 的输入框的值

    背景 我们有一个老的项目在维护 xff0c 项目是使用react dom 64 16 8 6实现的 也就是react开发的 xff0c 但是因为某种原因 xff0c 暂时找不到源码了 时间紧任务重 xff0c 必须赶紧解决问题 简化需求 x
  • 【视频教程】MAME0.238配置分享

    视频链接 https www bilibili com video BV15q4y1B7Hn 附件下载 百度网盘分享 链接 https pan baidu com s 1rsBdRn99 KWjhRpPrGBmfw 提取码 4ge5CSDN
  • Netty解决粘包拆包问题,Netty使用自定义编解码器解决粘包拆包问题

    文章目录 一 什么是粘包拆包二 粘包拆包实例1 Server端2 Client端3 测试一下 三 解决粘包拆包的方案四 使用自定义编解码器解决粘包拆包问题1 定义协议包2 编解码器3 server端4 client端5 测试一下 一 什么是
  • vim表格格式化插件vim-table-mode

    简介 一个很棒的自动表格创建器和格式化程序 xff0c 允许您在键入时创建整洁的表格 项目主页 https github com dhruvasagar vim table mode 安装 这里以packer为例 packer插件管理器安装
  • React Native打包安卓应用笔记

    前言 大部分内容是复制过来的 xff0c 主要做了一下整理 xff0c 方便自己查阅 打包发布 Android 要求所有应用都有一个数字签名才会被允许安装在用户手机上 xff0c 所以在把应用发布到应用市场之前 xff0c 你需要先生成一个
  • 无需AS通过命令行启动安卓模拟器

    背景 我是做RN开发的 xff0c 要经常启动安卓模拟器来测试 时间久了 xff0c Android Studio 成安卓模拟器启动器了 每次打开AS还是挺久的 xff0c 所以就想直接通过命令行来启动安卓模拟器 xff0c 这样会快很多
  • ssh登录时自动运行命令

    方法一 编辑 nvim ssh rc 里面写上要执行的命令 比如 xff1a span class token builtin class name echo span span class token string 39 command
  • React Native倒计时

    代码 span class token keyword import span span class token operator span span class token keyword as span React span class
  • React Native android 8以上版本闪退踩坑笔记

    背景 我的android的测试机版本是android 8 xff0c 我使用React Native开发的应用 xff0c 自己测试是正常的 iOS上也是正常的 但是一放到android8以上的版本本就出现了闪退的现象 xff0c 让我非常
  • git push send-pack: unexpected disconnect while reading sideband packet

    错误信息 span class token function git span push 枚举对象中 span class token number 200 span 完成 对象计数中 span class token number 100
  • node.js启动静态页面服务

    安装http server span class token function npm span span class token function install span http server g 启动服务 span class to
  • ssh中远程vim自动切本地输入法

    简介 SshIM 是一个解决ssh中使用vim nvim 中文输入法自动切换插件 原理 xff1a 当ssh中使用的vim 进入insert模式或者离开insert模式 xff0c 都会触发一个事件 当触发事件以后 xff0c 通过http
  • mame0.239选定系统所需要的ROM/磁碟映像档为缺少或不正确

    背景 之前发过一个视频 xff0c 讲MAME怎么配置和使用的 结果有网友反馈说有少部分游戏打开报错 xff0c 不会弄 运气比较好 xff0c 我想玩的ROM基本都可以正常使用 我就很奇怪 xff0c 为什么会报错的呢 xff1f 我怎么
  • Netty核心源码分析(一),Netty的Server端启动过程源码分析

    文章目录 系列文章目录一 Netty的Server端启动过程源码分析1 NioEventLoopGroup的创建 xff08 1 xff09 构造方法 2 ServerBootstrap的创建 xff08 1 xff09 构造方法 xff0