• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Netty源碼------Pipeline詳細分析

    標簽: Netty  java  netty

    Netty源碼------Pipeline詳細分析

    目錄

    Netty源碼------Pipeline詳細分析

    1、Channel 與ChannelPipeline

    2、再探ChannelPipeline 的初始化:

    3、ChannelInitializer 的添加

    4、自定義ChannelHandler 的添加過程

    5、ChannelHandler 默認命名規則

    6、Pipeline 的事件傳播機制

    6.1 Outbound 事件傳播方式

    6.2 Inbound 事件傳播方式

    6.3 Pipeline 事件傳播小結

    7、Handler 的各種姿勢

    8、Channel 的生命周期


    1、Channel 與ChannelPipeline

      相信大家都已經知道,在Netty 中每個Channel 都有且僅有一個ChannelPipeline 與之對應,它們的組成關系如下:

      通過上圖我們可以看到, 一個Channel 包含了一個ChannelPipeline , 而ChannelPipeline 中又維護了一個由ChannelHandlerContext 組成的雙向鏈表。這個鏈表的頭是HeadContext,鏈表的尾是TailContext,并且每個ChannelHandlerContext 中又關聯著一個ChannelHandler。圖示給了我們一個對ChannelPipeline 的直觀認識,但是實際上Netty 實現的Channel 是否真的是這樣的呢?我們繼續用源碼說話。在前我們已經知道了一個Channel 的初始化的基本過程,下面我們再回顧一下。下面的代碼是AbstractChannel 構造器:

    protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }

      AbstractChannel 有一個pipeline 字段,在構造器中會初始化它為DefaultChannelPipeline 的實例。這里的代碼就印證了一點:每個Channel 都有一個ChannelPipeline。接著我們跟蹤一下DefaultChannelPipeline 的初始化過程,首先進入到DefaultChannelPipeline 構造器中:

    protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
            tail = new TailContext(this);
            head = new HeadContext(this);
            head.next = tail;
            tail.prev = head;
        }

      在DefaultChannelPipeline 構造器中, 首先將與之關聯的Channel 保存到字段channel 中。然后實例化兩個ChannelHandlerContext:一個是HeadContext 實例head,另一個是TailContext 實例tail。接著將head 和tail 互相指向, 構成一個雙向鏈表。

      特別注意的是:我們在開始的示意圖中head 和tail 并沒有包含ChannelHandler,這是因為HeadContext 和TailContext繼承于AbstractChannelHandlerContext 的同時也實現了ChannelHandler 接口了,因此它們有Context 和Handler的雙重屬性。

    2、再探ChannelPipeline 的初始化:

      前面的學習我們已經對ChannelPipeline 的初始化有了一個大致的了解,不過當時重點沒有關注ChannelPipeline,因此沒有深入地分析它的初始化過程。那么下面我們就來看一下具體的ChannelPipeline 的初始化都做了哪些工作吧。先回顧一下,在實例化一個Channel 時,會伴隨著一個ChannelPipeline 的實例化,并且此Channel 會與這個ChannelPipeline相互關聯,這一點可以通過NioSocketChannel 的父類AbstractChannel 的構造器予以佐證:

    protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }

      當實例化一個NioSocketChannel 是,其pipeline 字段就是我們新創建的DefaultChannelPipeline 對象。可以看到,在DefaultChannelPipeline 的構造方法中,將傳入的channel 賦值給字段this.channel,接著又實例化了兩個特殊的字段:tail 與head,這兩個字段是一個雙向鏈表的頭和尾。其實在DefaultChannelPipeline 中,維護了一個以AbstractChannelHandlerContext 為節點的雙向鏈表,這個鏈表是Netty 實現Pipeline 機制的關鍵。再回顧一下head和tail 的類層次結構:

      從類層次結構圖中可以很清楚地看到,head 實現了ChannelInboundHandler與ChannelOutboundHandler,而tail 實現了ChannelOutboundHandler 接口,并且它們都實現了ChannelHandlerContext 接口, 因此可以說head 和tail 即是一個ChannelHandler,又是一個ChannelHandlerContext。接著看HeadContext與TailContext 構造器中的代碼:

    HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);
                this.unsafe = pipeline.channel().unsafe();
                this.setAddComplete();
    }
    TailContext(DefaultChannelPipeline pipeline) {
                super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, true, false);
                this.setAddComplete();
    }

      我們可以看到,鏈表中head 是一個ChannelOutboundHandler,而tail 則是一個ChannelInboundHandler。它調用了父類AbstractChannelHandlerContext 的構造器,并傳入參數inbound = false,outbound = true。而TailContext 的構造器與HeadContext 的相反,它調用了父類AbstractChannelHandlerContext 的構造器,并傳入參數inbound = true,outbound = false。即header 是一個OutBoundHandler,而tail 是一個InBoundHandler。

    3、ChannelInitializer 的添加

      前面我們已經分析過Channel 的組成,其中我們了解到,最開始的時候ChannelPipeline 中含有兩個ChannelHandlerContext(同時也是ChannelHandler),但是這個Pipeline 并不能實現什么特殊的功能,因為我們還沒有給它添加自定義的ChannelHandler。通常來說,我們在初始化Bootstrap,會添加我們自定義的ChannelHandler,就以我們具體的客戶端啟動代碼片段來舉例:

     

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
    .channel(NioSocketChannel.class)
    .option(ChannelOption.SO_KEEPALIVE, true)
    .handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ChatClientHandler(nickName));
      }
    });

     

      上面代碼的初始化過程,相信大家都不陌生。在調用handler 時,傳入了ChannelInitializer 對象,它提供了一個initChannel()方法給我我們初始化ChannelHandler。最后將這個匿名的Handler保存到AbstractBootstrap中。那么這個初始化過程是怎樣的呢?下面我們來揭開它的神秘面紗。

      ChannelInitializer 實現了ChannelHandler,那么它是在什么時候添加到ChannelPipeline 中的呢?通過代碼跟蹤,我們發現它是在Bootstrap 的init()方法中添加到ChannelPipeline 中的,其代碼如下(以客戶端為例):

     

    void init(Channel channel) throws Exception {
            ChannelPipeline p = channel.pipeline();
            p.addLast(new ChannelHandler[]{this.config.handler()});
         。。。。。。
    }
    //AbstractBootstrapConfig
    public final ChannelHandler handler() {
      return this.bootstrap.handler();
    }
    //AbstractBootstrap
    final ChannelHandler handler() {
            return this.handler;
    }

      從上面的代碼可見,將handler()返回的ChannelHandler 添加到Pipeline 中,而handler()返回的其實就是我們在初始化Bootstrap 時通過handler()方法設置的ChannelInitializer 實例,因此這里就是將ChannelInitializer 插入到了Pipeline的末端。此時Pipeline 的結構如下圖所示:

      這時候,有小伙伴可能就有疑惑了,我明明插入的是一個ChannelInitializer 實例,為什么在ChannelPipeline 中的雙向鏈表中的元素卻是一個ChannelHandlerContext 呢?我們繼續去源碼中尋找答案。

      剛才,我們提到,在Bootstrap 的init()方法中會調用p.addLast()方法,將ChannelInitializer 插入到鏈表的末端:

     

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized(this) {
                checkMultiplicity(handler);
                newCtx = this.newContext(group, this.filterName(name, handler), handler);
           this.addLast0(newCtx);
     }
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
    }

     

      addLast()有很多重載的方法,我們只需關注這個比較重要的方法就行。上面的addLast()方法中,首先檢查ChannelHandler 的名字是否是重復,如果不重復,則調用newContex()方法為這個Handler 創建一個對應的DefaultChannelHandlerContext 實例,并與之關聯起來(Context 中有一個handler 屬性保存著對應的Handler 實例)。為了添加一個handler 到pipeline 中,必須把此handler 包裝成ChannelHandlerContext。因此在上面的代碼中我們可以看到新實例化了一個newCtx 對象,并將handler 作為參數傳遞到構造方法中。那么我們來看一下實例化的DefaultChannelHandlerContext 到底有什么玄機吧。首先看它的構造器:

     

    DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
            super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
            if (handler == null) {
                throw new NullPointerException("handler");
            } else {
                this.handler = handler;
            }
        }

     

      在DefaultChannelHandlerContext 的構造器中,調用了兩個很有意思的方法:isInbound()與isOutbound(),這兩個方法是做什么的呢?從源碼中可以看到,當一個handler 實現了ChannelInboundHandler 接口,則isInbound 返回true;類似地,當一個handler 實現了ChannelOutboundHandler 接口,則isOutbound 就返回true。而這兩個boolean 變量會傳遞到父類AbstractChannelHandlerContext 中,并初始化父類的兩個字段:inbound 與outbound。那么這里的ChannelInitializer 所對應的DefaultChannelHandlerContext 的inbound 與outbound 字段分別是什么呢? 那就看一下ChannelInitializer 到底實現了哪個接口不就行了?如下是ChannelInitializer 的類層次結構圖:

      從類圖中可以清楚地看到,ChannelInitializer 僅僅實現了ChannelInboundHandler 接口,因此這里實例化的DefaultChannelHandlerContext 的inbound = true,outbound = false。兜了一圈,不就是inbound 和outbound 兩個字段嘛,為什么需要這么大費周折地分析一番?其實這兩個字段關系到pipeline 的事件的流向與分類,因此是十分關鍵的,不過我在這里先賣個關子, 后面我們再來詳細分析這兩個字段所起的作用。至此, 我們暫時先記住一個結論:ChannelInitializer 所對應的DefaultChannelHandlerContext 的inbound =true,outbound = false。當創建好Context 之后,就將這個Context 插入到Pipeline 的雙向鏈表中

     

    private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = this.tail.prev;
            newCtx.prev = prev;
            newCtx.next = this.tail;
            prev.next = newCtx;
            this.tail.prev = newCtx;
        }

       添加完ChannelInitializer的Pipeline現在是長這樣的:

     4、自定義ChannelHandler 的添加過程

      前面我們已經分析了ChannelInitializer 是如何插入到Pipeline 中的,接下來就來探討ChannelInitializer 在哪里被調用,ChannelInitializer 的作用以及我們自定義的ChannelHandler 是如何插入到Pipeline 中的。先簡單復習一下Channel 的注冊過程:

    1. 首先在AbstractBootstrap 的initAndRegister()中,通過group().register(channel),調用MultithreadEventLoopGroup 的register()方法。
    2. 在MultithreadEventLoopGroup 的register()中調用next()獲取一個可用的SingleThreadEventLoop,然后調用它的register()方法。
    3. 在SingleThreadEventLoop 的register()方法中,通過channel.unsafe().register(this, promise)方法獲取channel的unsafe()底層IO 操作對象,然后調用它的register()。
    4. 在AbstractUnsafe 的register()方法中,調用register0()方法注冊Channel 對象。
    5. 在AbstractUnsafe 的register0()方法中,調用AbstractNioChannel 的doRegister()方法。
    6. AbstractNioChannel 的doRegister()方法調用javaChannel().register(eventLoop().selector, 0, this)將Channel對應的Java NIO 的SockerChannel 對象注冊到一個eventLoop 的Selector 中,并且將當前Channel 作為attachment。

      而我們自定義ChannelHandler 的添加過程,發生在AbstractUnsafe 的register0()方法中,在這個方法中調用了pipeline.fireChannelRegistered()方法,其代碼實現如下:

     

    private void register0(ChannelPromise promise) {
            boolean firstRegistration = this.neverRegistered;
            AbstractChannel.this.doRegister();
            this.neverRegistered = false;
            AbstractChannel.this.registered = true;
            AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
            this.safeSetSuccess(promise);
            AbstractChannel.this.pipeline.fireChannelRegistered();
    }
    public final ChannelPipeline fireChannelRegistered() {
            AbstractChannelHandlerContext.invokeChannelRegistered(this.head);
            return this;
    }

     

    再看AbstractChannelHandlerContext 的invokeChannelRegistered()方法:

     

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRegistered();
            } else {
                executor.execute(new Runnable() {
                    public void run() {
                        next.invokeChannelRegistered();
                    }
                });
            }
        }

     

      很顯然,這個代碼會從head 開始遍歷Pipeline 的雙向鏈表,然后 findContextInbound()  找到第一個屬性inbound 為true 的ChannelHandlerContext 實例。看代碼:

     

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
                ctx.fireChannelRegistered();
    }
    public ChannelHandlerContext fireChannelRegistered() {
            invokeChannelRegistered(this.findContextInbound());
            return this;
    }

     

      想起來了沒?我們在前面分析ChannelInitializer 時,花了大量的篇幅來分析了inbound和outbound 屬性,現在這里就用上了。回想一下,ChannelInitializer 實現了ChannelInboudHandler,因此它所對應的ChannelHandlerContext 的inbound 屬性就是true,因此這里返回就是ChannelInitializer 實例所對應的ChannelHandlerContext 對象,如下圖所示:

      當獲取到inbound 的Context 后,就調用它的invokeChannelRegistered()方法:

     

    private void invokeChannelRegistered() {
            if (this.invokeHandler()) {
                try {
                    ((ChannelInboundHandler)this.handler()).channelRegistered(this);
                } catch (Throwable var2) {
                    this.notifyHandlerException(var2);
                }
            } else {
                this.fireChannelRegistered();
            }
    }

     

      我們已經知道,每個ChannelHandler 都和一個ChannelHandlerContext 關聯,我們可以通過ChannelHandlerContext獲取到對應的ChannelHandler。因此很明顯,這里handler()返回的對象其實就是一開始我們實例化的ChannelInitializer 對象,并接著調用了ChannelInitializer 的channelRegistered()方法。看到這里, 應該會覺得有點眼熟了。ChannelInitializer 的channelRegistered()這個方法我們在一開始的時候已經接觸到了,但是我們并沒有深入地分析這個方法的調用過程。下面我們來看這個方法中到底有什么玄機,繼續看代碼:

     

    public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
      protected abstract void initChannel(C ch) throws Exception;
        @Override
        @SuppressWarnings("unchecked")
        public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            if (initChannel(ctx)) {
                ctx.pipeline().fireChannelRegistered();
                removeState(ctx);
            } else {
                ctx.fireChannelRegistered();
            }
        }private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
            if (initMap.add(ctx)) { // Guard against re-entrance.
                try {
                    initChannel((C) ctx.channel());
                } catch (Throwable cause) {
                    exceptionCaught(ctx, cause);
                } finally {
                    ChannelPipeline pipeline = ctx.pipeline();
                    if (pipeline.context(this) != null) {
                        pipeline.remove(this);
                    }
                }
                return true;
            }
            return false;
        }
    }

     

      initChannel((C) ctx.channel())這個方法我們也很熟悉,它就是我們在初始化Bootstrap 時,調用handler 方法傳入的匿名內部類所實現的方法:

    protected void initChannel(SocketChannel ch) throws Exception {
          ChannelPipeline pipeline = ch.pipeline();
          pipeline.addLast("handler", new MyClient());
    }

      因此,當調用這個方法之后, 我們自定義的ChannelHandler 就插入到了Pipeline,此時Pipeline 的狀態如下圖所示:

     

      當添加完成自定義的ChannelHandler 后,在finally 代碼塊會刪除自定義的ChannelInitializer,也就是remove(ctx)最終調用ctx.pipeline().remove(this),因此最后的Pipeline 的狀態如下:(這里圖有誤!

      至此,自定義ChannelHandler 的添加過程也分析得差不多了。

    5、ChannelHandler 默認命名規則

      不知道大家注意到沒有,pipeline.addXXX 都有一個重載的方法,例如addLast()它有一個重載的版本是:ChannelPipeline addLast(String name, ChannelHandler handler);第一個參數指定添加的handler 的名字(更準確地說是ChannelHandlerContext 的名字,說成handler 的名字更便于理解)。那么handler 的名字有什么用呢?如果我們不設置name,那么handler 默認的名字是怎樣呢?帶著這些疑問,我們依舊還是去源碼中找到答案。還是以addLast()方法為例:

    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
            return addLast(null, name, handler);
        }

      這個方法會調用重載的addLast()方法:

     

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                checkMultiplicity(handler);
    
                newCtx = newContext(group, filterName(name, handler), handler);
    
                addLast0(newCtx);return this;
        }

     

      第一個參數被設置為null,我們不用關心它。第二參數就是這個handler 的名字。看代碼可知,在添加一個handler之前,需要調用checkMultiplicity()方法來確定新添加的handler 名字是否與已添加的handler 名字重復。

      如果我們調用的是如下的addLast()方法:ChannelPipeline addLast(ChannelHandler... handlers);那么Netty 就會調用generateName()方法為新添加的handler 自動生成一個默認的名字:

     

    private String filterName(String name, ChannelHandler handler) {
            if (name == null) {
                return generateName(handler);
            }
            checkDuplicateName(name);
            return name;
        }
    private String generateName(ChannelHandler handler) {
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }
        if (context0(name) != null) {
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }

      而generateName()方法會接著調用generateName0()方法來實際生成一個新的handler 名字

    private static String generateName0(Class<?> handlerType) {
            return StringUtil.simpleClassName(handlerType) + "#0";
    }

      默認命名的規則很簡單,就是用反射獲取handler 的simpleName 加上"#0",因此我們自定義ChatClientHandler 的名字就是"ChatClientHandler#0"。

    6、Pipeline 的事件傳播機制

      前面章節中,我們已經知道AbstractChannelHandlerContext 中有inbound 和outbound 兩個boolean 變量,分別用于標識Context 所對應的handler 的類型,即:

    1. inbound 為true 是,表示其對應的ChannelHandler 是ChannelInboundHandler 的子類。
    2. outbound 為true 時,表示對應的ChannelHandler 是ChannelOutboundHandler 的子類。

      這里大家肯定還有很多疑惑,不知道這兩個字段到底有什么作用? 這還要從ChannelPipeline 的事件傳播類型說起。Netty 中的傳播事件可以分為兩種:Inbound 事件和Outbound 事件。如下是從Netty 官網針對這兩個事件的說明:

      從上圖可以看出,inbound 事件和outbound 事件的流向是不一樣的,inbound 事件的流行是從下至上,而outbound剛好相反,是從上到下。并且inbound 的傳遞方式是通過調用相應的ChannelHandlerContext.fireIN_EVT()方法,而outbound 方法的的傳遞方式是通過調用ChannelHandlerContext.OUT_EVT()方法。例如:ChannelHandlerContext的fireChannelRegistered()調用會發送一個ChannelRegistered 的inbound 給下一個ChannelHandlerContext,而ChannelHandlerContext 的bind()方法調用時會發送一個bind 的outbound 事件給下一個ChannelHandlerContext。

        Inbound 事件傳播方法有:

     

    public interface ChannelInboundHandler extends ChannelHandler {
        void channelRegistered(ChannelHandlerContext var1) throws Exception;
    
        void channelUnregistered(ChannelHandlerContext var1) throws Exception;
    
        void channelActive(ChannelHandlerContext var1) throws Exception;
    
        void channelInactive(ChannelHandlerContext var1) throws Exception;
    
        void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
    
        void channelReadComplete(ChannelHandlerContext var1) throws Exception;
    
        void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
    
        void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
    
        void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
    }

     

      Outbound 事件傳播方法有:

     

    public interface ChannelOutboundHandler extends ChannelHandler {
        void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
    
        void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
    
        void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
    
        void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
    
        void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
    
        void read(ChannelHandlerContext var1) throws Exception;
    
        void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
    
        void flush(ChannelHandlerContext var1) throws Exception;
    }

      大家應該發現了規律:inbound 類似于是事件回調(響應請求的事件),而outbound 類似于主動觸發(發起請求的事件)。注意,如果我們捕獲了一個事件,并且想讓這個事件繼續傳遞下去,那么需要調用Context 對應的傳播方法 fireXXX,例如:

     

    public class MyInboundHandler extends ChannelInboundHandlerAdapter {
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("連接成功");
        ctx.fireChannelActive();
      }
    }

    6.1 Outbound 事件傳播方式

      Outbound 事件都是請求事件(request event),即請求某件事情的發生,然后通過Outbound 事件進行通知。Outbound 事件的傳播方向是tail -> customContext -> head。我們接下來以connect 事件為例,分析一下Outbound 事件的傳播機制。首先,當用戶調用了Bootstrap 的connect()方法時,就會觸發一個Connect 請求事件,我們就發現AbstractChannel 的connect()其實由調用了DefaultChannelPipeline 的connect()方法:

    public ChannelFuture connect(SocketAddress remoteAddress) {
        return pipeline.connect(remoteAddress);
    }

      而pipeline.connect()方法的實現如下:

    public final ChannelFuture connect(SocketAddress remoteAddress) {
            return tail.connect(remoteAddress);
        }

      可以看到,當outbound 事件(這里是connect 事件)傳遞到Pipeline 后,它其實是以tail 為起點開始傳播的。而tail.connect()其實調用的是AbstractChannelHandlerContext 的connect()方法

     

    public ChannelFuture connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    
            final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeConnect(remoteAddress, localAddress, promise);return promise;
        }

     

      findContextOutbound()方法顧名思義,它的作用是以當前Context 為起點,向Pipeline 中的Context 雙向鏈表的前端尋找第一個outbound 屬性為true 的Context(即關聯ChannelOutboundHandler 的Context),然后返回。findContextOutbound()方法代碼實現如下:

     

    private AbstractChannelHandlerContext findContextOutbound(int mask) {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.prev;
            } while ((ctx.executionMask & mask) == 0);
            return ctx;
        }

     

      當我們找到了一個outbound 的Context 后,就調用它的invokeConnect()方法,這個方法中會調用Context 其關聯的ChannelHandler 的connect()方法

     

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
            if (invokeHandler()) {
                try {
                    ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
                } catch (Throwable t) {
                    notifyOutboundHandlerException(t, promise);
                }
            } else {
                connect(remoteAddress, localAddress, promise);
            }
        }

     

      如果用戶沒有重寫ChannelHandler 的connect()方法,那么會調用ChannelOutboundHandlerAdapter 的connect()實現

    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
                SocketAddress localAddress, ChannelPromise promise) throws Exception {
            ctx.connect(remoteAddress, localAddress, promise);
        }

      我們看到,ChannelOutboundHandlerAdapter 的connect()僅僅調用了ctx.connect(),而這個調用又回到了:Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect這樣的循環中,直到connect 事件傳遞到DefaultChannelPipeline 的雙向鏈表的頭節點,即head 中。為什么會傳遞到head 中呢?回想一下,head 實現了ChannelOutboundHandler,因此它的outbound 屬性是true。因為head 本身既是一個ChannelHandlerContext,又實現了ChannelOutboundHandler 接口,因此當connect()消息傳遞到head 后,會將消息轉遞到對應的ChannelHandler 中處理,而head 的handler()方法返回的就是head 本身:

    public ChannelHandler handler() {
          return this;
    }

      因此最終connect()事件是在head 中被處理。head 的connect()事件處理邏輯如下:

    public void connect(
                    ChannelHandlerContext ctx,
                    SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) {
                unsafe.connect(remoteAddress, localAddress, promise);
            }

      到這里, 整個connect()請求事件就結束了。下圖中描述了整個connect()請求事件的處理過程:

      我們僅僅以connect()請求事件為例,分析了outbound 事件的傳播過程,但是其實所有的outbound 的事件傳播都遵循著一樣的傳播規律,小伙伴們可以試著分析一下其他的outbound 事件,體會一下它們的傳播過程。

    6.2 Inbound 事件傳播方式

      Inbound 事件和Outbound 事件的處理過程是類似的,只是傳播方向不同。Inbound 事件是一個通知事件,即某件事已經發生了,然后通過Inbound 事件進行通知。Inbound 通常發生在Channel的狀態的改變或IO 事件就緒。Inbound 的特點是它傳播方向是head -> customContext -> tail。上面我們分析了connect()這個Outbound 事件,那么接著分析connect()事件后會發生什么Inbound 事件,并最終找到Outbound 和Inbound 事件之間的聯系。當connect()這個Outbound 傳播到unsafe 后,其實是在AbstractNioUnsafe的connect()方法中進行處理的:

     

    public final void connect(
                    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
           boolean wasActive = isActive();
           if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);
           } else {
           }
    }

     

      在AbstractNioUnsafe 的connect()方法中,首先調用doConnect()方法進行實際上的Socket 連接,當連接上后會調用fulfillConnectPromise()方法

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
                if (!wasActive && active) {
                    pipeline().fireChannelActive();
                }
            }

      我們看到,在fulfillConnectPromise()中,會通過調用pipeline().fireChannelActive()方法將通道**的消息(即Socket 連接成功)發送出去。而這里,當調用pipeline.fireXXX 后,就是Inbound 事件的起點。因此當調用pipeline().fireChannelActive()后,就產生了一個ChannelActive Inbound 事件,我們就從這里開始看看這個Inbound事件是怎么傳播的?

    public final ChannelPipeline fireChannelActive() {
            AbstractChannelHandlerContext.invokeChannelActive(head);
            return this;
        }

      果然, 在fireChannelActive()方法中,調用的是head.invokeChannelActive(),因此可以證明Inbound 事件在Pipeline中傳輸的起點是head。那么,在head.invokeChannelActive()中又做了什么呢?

     

    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelActive();
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelActive();
                    }
                });
            }
        }

     

      接下去的調用流程是:

    private void invokeChannelActive() {
            if (this.invokeHandler()) {
                try {
                    ((ChannelInboundHandler)this.handler()).channelActive(this);
                } catch (Throwable var2) {
                    this.notifyHandlerException(var2);
                }
            } else {
                this.fireChannelActive();
            }
    
    }
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelActive();
                this.readIfIsAutoRead();
            }
    public ChannelHandlerContext fireChannelActive() {
            AbstractChannelHandlerContext next = this.findContextInbound();
            invokeChannelActive(next);
            return this;
        }

     

      上面的代碼應該很熟悉了。回想一下在Outbound 事件(例如connect()事件)的傳輸過程中時,我們也有類似的操作:

    1. 首先調用findContextInbound(),從Pipeline 的雙向鏈表中中找到第一個屬性inbound 為true 的Context,然后將其返回。
    2. 調用Context 的invokeChannelActive()方法.

      invokeChannelActive()方法源碼如下:

     

    private void invokeChannelActive() {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelActive(this);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelActive();
            }
        }

     

      這個方法和Outbound 的對應方法(如:invokeConnect()方法)如出一轍。與Outbound 一樣,如果用戶沒有重寫channelActive() 方法,那就會調用ChannelInboundHandlerAdapter 的channelActive()方法:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
        }

      同樣地, 在ChannelInboundHandlerAdapter 的channelActive()中,僅僅調用了ctx.fireChannelActive()方法,因此就會進入Context.fireChannelActive() -> Connect.findContextInbound() -> nextContext.invokeChannelActive() ->nextHandler.channelActive() -> nextContext.fireChannelActive()這樣的循環中。同理,tail 本身既實現了ChannelInboundHandler 接口,又實現了ChannelHandlerContext 接口,因此當channelActive()消息傳遞到tail 后,會將消息轉遞到對應的ChannelHandler 中處理,而tail 的handler()返回的就是tail 本身:

    public ChannelHandler handler() {
                return this;
            }

      因此channelActive Inbound 事件最終是在tail 中處理的,我們看一下它的處理方法:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
            }

      TailContext 的channelActive()方法是空的。如果大家自行查看TailContext 的Inbound 處理方法時就會發現,它們的實現都是空的。可見,如果是Inbound,當用戶沒有實現自定義的處理器時,那么默認是不處理的。下圖描述了Inbound事件的傳輸過程:

    6.3 Pipeline 事件傳播小結

      Outbound 事件總結:

    • Outbound 事件是請求事件(由connect()發起一個請求,并最終由unsafe 處理這個請求)。
    • Outbound 事件的發起者是Channel。
    • Outbound 事件的處理者是unsafe。
    • Outbound 事件在Pipeline 中的傳輸方向是tail -> head。
    • 在ChannelHandler 中處理事件時,如果這個Handler 不是最后一個Handler,則需要調用ctx 的方法(如:ctx.connect()方法)將此事件繼續傳播下去。如果不這樣做,那么此事件的傳播會提前終止。
    • Outbound 事件流:Context.OUT_EVT() -> Connect.findContextOutbound() -> nextContext.invokeOUT_EVT()-> nextHandler.OUT_EVT() -> nextContext.OUT_EVT()

      

    Inbound 事件總結:

    • Inbound 事件是通知事件,當某件事情已經就緒后,通知上層。
    • Inbound 事件發起者是unsafe。
    • Inbound 事件的處理者是Channel,如果用戶沒有實現自定義的處理方法,那么Inbound 事件默認的處理者是TailContext,并且其處理方法是空實現。Inbound 事件在Pipeline 中傳輸方向是head -> tail。
    • 在ChannelHandler 中處理事件時,如果這個Handler 不是最后一個Handler,則需要調用ctx.fireIN_EVT()事件(如:ctx.fireChannelActive()方法)將此事件繼續傳播下去。如果不這樣做,那么此事件的傳播會提前終止。
    • Outbound 事件流:Context.fireIN_EVT() -> Connect.findContextInbound() -> nextContext.invokeIN_EVT() ->nextHandler.IN_EVT() -> nextContext.fireIN_EVT().

      outbound 和inbound 事件設計上十分相似,并且Context 與Handler 直接的調用關系也容易混淆,因此我們在閱讀這里的源碼時,需要特別的注意。

    7、Handler 的各種姿勢

      ChannelHandlerContext

      每個ChannelHandler 被添加到ChannelPipeline 后,都會創建一個ChannelHandlerContext 并與之創建的ChannelHandler 關聯綁定。ChannelHandlerContext 允許ChannelHandler 與其他的ChannelHandler 實現進行交互。ChannelHandlerContext 不會改變添加到其中的ChannelHandler,因此它是安全的。下圖描述了ChannelHandlerContext、ChannelHandler、ChannelPipeline 的關系:

     

    8、Channel 的生命周期:

      Netty 有一個簡單但強大的狀態模型,并完美映射到ChannelInboundHandler 的各個方法。下面是Channel 生命周期中四個不同的狀態:

    1. channelUnregistered() Channel已創建,還未注冊到一個EventLoop上
    2. channelRegistered() Channel已經注冊到一個EventLoop上
    3. channelActive() Channel是活躍狀態(連接到某個遠端),可以收發數據
    4. channelInactive() Channel未連接到遠端

      一個Channel 正常的生命周期如下圖所示。隨著狀態發生變化相應的事件產生。這些事件被轉發到ChannelPipeline中的ChannelHandler 來觸發相應的操作。

     

    版權聲明:本文為qqq3117004957原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/qqq3117004957/article/details/106470684

    智能推薦

    MapReduce詳細分析

    一、MapReduce概述 1、定義 MapReduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在一個Hadoop集群 上。 2、MR進程 一個完整的MapR educe程序在分布式運行時有三類實例進程: **Mr AppMaster:**負責整個程序的過程調度及狀態協調。 MapTask:負責Map階段的整個數據處理流程。 ReduceTask:負...

    HBase 0.92.1 Scan 源碼詳細分析

    從這篇文章開始終于要討論比較正常版本的hbase了---0.92.1~~ Scan是hbase提供的非常重要的功能之一,我們的hbase分析系列就從這兒開始吧。 首先,我們有一些background的知識需要了解: 1.HBase是如何區分不同記錄的,大家可以參考http://punishzhou.iteye.com/blog/1266341,講的比較詳細 2.Region,MemStore,St...

    [詳細分析]Java-ArrayList源碼全解析

    目錄 類圖 屬性 構造方法 帶初始容量的構造方法 無參構造方法 帶一個集合參數的構造方法 插入方法 在列表最后添加指定元素 在指定位置添加指定元素 插入方法調用的其他私有方法 擴容方法 移除方法 移除指定下標元素方法 移除指定元素方法 私有移除方法 查找方法 查找指定元素的所在位置 查找指定位置的元素 序列化方法 反序列化方法 創建子數組 迭代器 創建迭代器方法 Itr屬性 Itr的hasNext...

    HTML中常用操作關于:頁面跳轉,空格

    1.頁面跳轉 2.空格的代替符...

    freemarker + ItextRender 根據模板生成PDF文件

    1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...

    猜你喜歡

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    Linux C系統編程-線程互斥鎖(四)

    互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...

    統計學習方法 - 樸素貝葉斯

    引入問題:一機器在良好狀態生產合格產品幾率是 90%,在故障狀態生產合格產品幾率是 30%,機器良好的概率是 75%。若一日第一件產品是合格品,那么此日機器良好的概率是多少。 貝葉斯模型 生成模型與判別模型 判別模型,即要判斷這個東西到底是哪一類,也就是要求y,那就用給定的x去預測。 生成模型,是要生成一個模型,那就是誰根據什么生成了模型,誰就是類別y,根據的內容就是x 以上述例子,判斷一個生產出...

    精品国产乱码久久久久久蜜桃不卡