• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Netty拾遺(五)——Netty中的channel,Handler和pipeline

    標簽: # Netty  netty

    前言

    之前總結過NIO的三個組件,其實對于Netty來說,如果要理解后續的編解碼器已經實際的Netty應用場景,還有幾個Netty的組件需要了解。這篇博客依舊參考《Netty、Redis、Zookeeper高并發實戰》一書進行總結。

    channel

    Netty中,channel是核心概念,表示一個網絡連接通道,類似于網絡的socket,Netty關于channel有一個頂層沖虛向類——AbstractChannel,其中有一個構造函數如下:

    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;//父通道
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();//每一個通道,其實都維護一個類似于流水線的東西,這個后面會總結
    }
    

    這里有兩個屬性需要說明一下,一個是parent,另一個是pipeline

    parent表示通道的父通道,對于連接建立的通道(比如NioServerScoketChannel)來說,其父通道為null。而對于每一條傳輸數據的通道(比如NioSocketChannel),其父通道就是建立連接的通道。

    如果簡單一點理解,channel就是連接網絡的通道。

    EmbeddedChannel

    這里說一下EmbeddedChannel,我們在開發Netty的時候,大部分時候都是在開發Inbound入站處理器,開發完成之后,如果需要測試相關邏輯,我們需要將處理器加入到通道流水線中,然后啟動服務端和客戶端。每次修改一個業務,都要啟動服務端和客戶端,這個過程非常繁瑣并且浪費時間。于是Netty就給我們提供了一個專用的通道——EmbeddedChannel。

    Handler

    在經典的反應堆模式中,我們知道每一個注冊在Selector上的網絡事件都會被分發給指定的handler去處理,在Netty中也是一樣,Netty中的Handler就是處理不同的網絡事件。針對入站和出站,有著不同的處理器。

    入站處理——數據流動方向為Netty的內部channel到ChannelInboundHandler。

    出站處理——數據流動方向為ChannelOutboundHandler到Netty的channel。

    但是實際過程中,Netty給我們提供了很多默認的handler實現,一般不需要我們直接去繼承ChannelHandler,畢竟ChannelHandler是一個接口,如果我們每次新增一個Handler,都需要實現ChannelHandler中的每個方法,這樣很麻煩,于是Netty就提供了一個適配器,提供了ChannelHandler的默認實現,更多的時候,我們可以直接繼承ChannelHandlerAdapter即可。具體的類結構如下所示:

    在這里插入圖片描述

    方法名稱作用
    channelRegistered通道注冊完成之后,Netty底層會調用fireChannelRegistered,觸發通道注冊事件,通道就會啟動該入站操作的流水線處理。在通道注冊過的Handler處理器中的channelRegistered方法就會被調用。
    channelActive通道**的時候,在通道注冊過的Handler中的channelActive方法就會被調用
    channelRead通道可讀的時候,在通道注冊過的Handler中的channelRead方法就會被調用
    channelReadComplete通道讀取結束的時候,在通道注冊過的Handler中的channelReadComplete就會被調用
    channelInactive通道連接不可用的時候,在通道注冊的Handler中的channelInactive就會被調用
    exceptionCaught通道處理過程發生異常的時候,在通道注冊的Handler中的exceptionCaught就會被調用。

    ChannelInitializer通道初始化處理器

    其實每一條channel都有一條Handler業務處理流水線,這就是我們后續要總結的pipeline ,但是負責裝配自己的Handler處理器這個工作是發生在通道開始工作之前。通道的初始化類,就可以向流水線中裝配業務處理類。

    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(requestLength));
            socketChannel.pipeline().addLast(new FixLengthTimeServerHandler());
        }
    });
    

    上述的代碼中就在SocketChannel初始化的時候,增加了兩個處理器。

    pipeline

    Netty的業務處理器流水線ChannelPipeline是基于責任鏈設計模式來設計的。針對每一個channel,都會有一個業務流水線與其綁定,這個流水線中包含了所有的入站和出站處理器,這個業務流水線就是pipeline,pipeline,channel,ChannelHandler三者的關系如下所示。

    在這里插入圖片描述

    每個處理器被加入到pipeline的時候,都會有一個ChannelHandlerContext與之綁定。

    總體來說,Channel、Handler、pipeline、ChannelHandlerContext幾者的關系為,Channel通道擁有一個ChannelPipeline流水線,每個流水線節點為一個ChannelHandlerContext通道處理器上下對象,每一個上下文中包含了一個ChannelHandler處理器。在ChannelHandler通道處理器的入站/出站處理方法中,Netty會傳遞ChannelHandlerContext,通過ChannelHandlerContext可以獲取pipeline的實例或者Channel的實例。

    流水線的傳遞與截斷

    Inbound的事件處理順序

    在每次初始化增加channel處理器的時候,這些處理器會自動成為pipeline中的節點,這里直接上實例代碼

    /**
     * autor:liman
     * createtime:2020/9/19
     * comment:simpleInHandlerA
     */
    @Slf4j
    public class SimpleInHandlerA extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("this is A simpleInHandler read method,message:{}",msg);
            super.channelRead(ctx, msg);
        }
    }
    
    /**
     * autor:liman
     * createtime:2020/9/19
     * comment:simpleInHandlerB
     */
    @Slf4j
    public class SimpleInHandlerB extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("this is B simpleInHandler read method message:{}",msg);
            super.channelRead(ctx, msg);
        }
    }
    
    /**
     * autor:liman
     * createtime:2020/9/19
     * comment:simpleInHandlerC
     */
    @Slf4j
    public class SimpleInHandlerC extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("this is C simpleInHandler read method message:{}",msg);
            super.channelRead(ctx, msg);
        }
    }
    

    使用EmbeddedChannel進行測試

    /**
     * autor:liman
     * createtime:2020/9/19
     * comment: InboundHandler的處理器測試
     */
    @Slf4j
    public class TestPipelineInbound {
    
        public static void main(String[] args) {
            ChannelInitializer<EmbeddedChannel> channelInitializer = new ChannelInitializer<EmbeddedChannel>() {
                @Override
                protected void initChannel(EmbeddedChannel ch) throws Exception {
                    ch.pipeline().addLast(new SimpleInHandlerA());
                    ch.pipeline().addLast(new SimpleInHandlerB());
                    ch.pipeline().addLast(new SimpleInHandlerC());
                }
            };
    
            EmbeddedChannel embeddedChannel = new EmbeddedChannel(channelInitializer);
            ByteBuf buf = Unpooled.buffer();
            buf.writeInt(10);
            embeddedChannel.writeInbound(10);//向通道中寫一個報文
        }
    }
    

    運行結果:

    在這里插入圖片描述

    可以看到調用順序為A->B->C

    Outbound的事件處理順序

    3個OutboundHandler的業務代碼

    /**
     * autor:liman
     * createtime:2020/9/19
     * comment:SimpleOutHandlerA
     */
    @Slf4j
    public class SimpleOutHandlerA extends ChannelOutboundHandlerAdapter {
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            log.info("出站處理器A,this is simple out handler A,write method:{}",msg);
            super.write(ctx, msg, promise);
        }
    }
    
    /**
     * autor:liman
     * createtime:2020/9/19
     * comment:
     */
    @Slf4j
    public class SimpleOutHandlerB extends ChannelOutboundHandlerAdapter {
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            log.info("出站處理器B,this is simple out handler B,write method:{}",msg);
            super.write(ctx, msg, promise);
        }
    }
    
    /**
     * autor:liman
     * createtime:2020/9/19
     * comment:
     */
    @Slf4j
    public class SimpleOutHandlerC extends ChannelOutboundHandlerAdapter {
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            log.info("出站處理器C,this is simple out handler C,write method:{}",msg);
            super.write(ctx, msg, promise);
        }
    }
    

    測試代碼

    /**
     * autor:liman
     * createtime:2020/9/19
     * comment:
     */
    @Slf4j
    public class TestPipelineOutBound {
    
        public static void main(String[] args) {
            ChannelInitializer<EmbeddedChannel> channelInitializer = new ChannelInitializer<EmbeddedChannel>() {
                @Override
                protected void initChannel(EmbeddedChannel ch) throws Exception {
                    ch.pipeline().addLast(new SimpleOutHandlerA());
                    ch.pipeline().addLast(new SimpleOutHandlerB());
                    ch.pipeline().addLast(new SimpleOutHandlerC());
                }
            };
    
            EmbeddedChannel embeddedChannel = new EmbeddedChannel(channelInitializer);
            ByteBuf buf = Unpooled.buffer();
            buf.writeInt(10);
            embeddedChannel.writeOutbound(10);//向通道中寫一個報文
        }
    }
    

    運行結果

    在這里插入圖片描述

    可以看到調用順序是C->B->A。

    如果需要截斷流水線的處理,只需要在Handler中去掉調用super的read/write方法。具體實例可以直接注釋相關代碼然后測試即可。

    總結

    簡單總結了Channel,ChannelHandler,ChannelHandlerContext,Pipeline的關系。同時需要注意的是channelHandler出站和入站處理器的調用方向是不同的。

    版權聲明:本文為liman65727原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/liman65727/article/details/108679593

    智能推薦

    HTML拾遺

    目錄 一 鏈接標簽 二 行內元素和塊元素 三 視頻和音頻 四 頁面結構分析 五 iframe內聯框架 一 鏈接標簽 1 點睛 <a href="path" target="目標窗口位置"></a> path:鏈接路徑 target:鏈接在哪個窗口打開。常用值:_se...

    拾遺增補

    拾遺增補 1.1 線程的狀態 狀態 描述 NEW 尚未啟動的線程處于這種狀態 RUNNABLE 在Java虛擬機中正在執行的線程處于此狀態 BLOCKED 補阻塞等待對象鎖的線程處于此狀態 WAITING 正在等待另一個線程執行特定動作的線程處于h此x狀態 TIME_WAITING 正在等待另一個線程執行達到指定的等待時間的線程處于此狀態 TERMINATED 已經退出運行的線程處于此狀態 如圖所...

    HTML中常用操作關于:頁面跳轉,空格

    1.頁面跳轉 2.空格的代替符...

    freemarker + ItextRender 根據模板生成PDF文件

    1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    猜你喜歡

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    Linux C系統編程-線程互斥鎖(四)

    互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...

    統計學習方法 - 樸素貝葉斯

    引入問題:一機器在良好狀態生產合格產品幾率是 90%,在故障狀態生產合格產品幾率是 30%,機器良好的概率是 75%。若一日第一件產品是合格品,那么此日機器良好的概率是多少。 貝葉斯模型 生成模型與判別模型 判別模型,即要判斷這個東西到底是哪一類,也就是要求y,那就用給定的x去預測。 生成模型,是要生成一個模型,那就是誰根據什么生成了模型,誰就是類別y,根據的內容就是x 以上述例子,判斷一個生產出...

    styled-components —— React 中的 CSS 最佳實踐

    https://zhuanlan.zhihu.com/p/29344146 Styled-components 是目前 React 樣式方案中最受關注的一種,它既具備了 css-in-js 的模塊化與參數化優點,又完全使用CSS的書寫習慣,不會引起額外的學習成本。本文是 styled-components 作者之一 Max Stoiber 所寫,首先總結了前端組件化樣式中的最佳實踐原則,然后在此基...

    精品国产乱码久久久久久蜜桃不卡