• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Netty學習筆記(三):Netty簡介、線程模型、Netty應用實例、Netty核心組件介紹

    標簽: Netty  后端  網絡

    第 4 章 Netty 詳解

    一、Netty簡介

    1、NIO 存在的問題

    • NIO 的類庫和 API 繁雜,使用麻煩:需要熟練掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
    • 需要具備其他的額外技能:要熟悉 Java 多線程編程,因為 NIO 編程涉及到 Reactor 模式,必須對多線程 和網絡編程非常熟悉,才能編寫出高質量的 NIO 程序。
    • 開發工作量和難度都非常大:例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常流
      的處理等等。
    • JDKNIO 的 Bug:例如臭名昭著的 EpollBug,它會導致 Selector 空輪詢,最終導致 CPU100%。直到 JDK1.7
      版本該問題仍舊存在,沒有被根本解決。

    就因為上述原因,所以JBoss基于NIO開發了Netty。

    2、Netty介紹

    • Netty 是由 JBOSS 提供的一個 Java 開源框架。Netty 提供異步的、基于事件驅動的網絡應用程序框架,用以快速開發高性能、高可靠性的網絡 IO 程序
    • Netty 可以幫助你快速、簡單的開發出一個網絡應用,相當于簡化和流程化了 NIO 的開發過程
    • Netty 是目前最流行的 NIO 框架,Netty 在互聯網領域、大數據分布式計算領域、游戲行業、通信行業等獲得了廣泛的應用,知名的 Elasticsearch 、Dubbo 框架內部都采用了 Netty。

    官網:https://netty.io/

    Netty is an asynchronous event-driven network application framework
    for rapid development of maintainable high performance protocol servers & clients

    在這里插入圖片描述

    3、Netty的優點

    Netty 對 JDK 自帶的 NIO 的 API 進行了封裝,解決了上述NIO所帶的問題。

    • 設計優雅:適用于各種傳輸類型的統一 API 阻塞和非阻塞 Socket;基于靈活且可擴展的事件模型,可以清晰地分離關注點;高度可定制的線程模型 - 單線程,一個或多個線程池.
    • 使用方便:詳細記錄的 Javadoc,用戶指南和示例;沒有其他依賴項,JDK 5(Netty 3.x)或 6(Netty 4.x)就足夠了。
    • 高性能、吞吐量更高:延遲更低;減少資源消耗;最小化不必要的內存復制。
    • 安全:完整的 SSL/TLS 和 StartTLS 支持。
    • 社區活躍、不斷更新:社區活躍,版本迭代周期短,發現的 Bug 可以被及時修復,同時,更多的新功能會被加入

    4、Netty版本說明

    • netty版本分為 netty3.x 和 netty4.x、netty5.x
    • 因為Netty5出現重大bug,已經被官網廢棄了,目前推薦使用的是Netty4.x的穩定版本
    • netty 下載地址: https://bintray.com/netty/downloads/netty/

    二、線程模型介紹

    1、線程模型基本介紹

    不同的線程模式,對程序的性能有很大影響,為了搞清 Netty 線程模式,我們來系統的講解下 各個線程模式

    目前存在的線程模型有:

    • 傳統阻塞 I/O 服務模型 。
    • Reactor 模式 。根據 Reactor 的數量和處理資源池線程的數量不同,有 3 種典型的實現
      • 單 Reactor 單線程;
      • 單 Reactor 多線程;
      • 主從 Reactor 多線程

    Netty 主要基于主從 Reactor 多線程模型做了一定的改進,其中主從 Reactor 多線程模型有多 個 Reactor

    2、傳統阻塞 I/O 服務模型

    傳統阻塞 I/O 服務模型工作原理如下圖所示:

    其中:黃色的框表示對象, 藍色的框表示線程,白色的框表示方法(API)

    在這里插入圖片描述

    傳統阻塞 I/O 服務模型特點

    • 采用阻塞 IO 模式獲取輸入的數據
    • 每個連接都需要獨立的線程完成數據的輸入,業務處理, 數據返回

    傳統阻塞 I/O 服務模型存在的問題

    • 當并發數很大,就會創建大量的線程,占用很大系統資源
    • 連接創建后,如果當前線程暫時沒有數據可讀,該線程會阻塞在 read 操作,造成線程資源浪費3、

    3、 Reactor 模式簡介

    Reactor 模式有多種翻譯:1. 反應器模式 2. 分發者模式(Dispatcher)3. 通知者模式(notifier)

    針對上面所述的傳統阻塞 I/O 服務模型的 2 個缺點,Reactor提出了下面的解決方案:

    • 基于 I/O 復用模型:多個連接共用一個阻塞對象,應用程序只需要在一個阻塞對象等待,無需阻塞等待所有連
      。當某個連接有新的數據可以處理時,操作系統通知應用程序,線程從阻塞狀態返回,開始進行業務處理
    • 基于線程池復用線程資源:不必再為每個連接創建線程,將連接完成后的業務處理任務分配給線程進行處理
      在這里插入圖片描述
      I/O復用結合線程池,就是 Reactor 模式基本設計思想,如圖:

    在這里插入圖片描述

    說明:

    • 通過一個或多個輸入同時傳遞給服務處理器(ServiceHandler)的模式(基于事件驅動)
    • 服務器端程序處理傳入的多個請求,并將它們同步分派到相應的處理線程, 因此 Reactor 模式也叫 Dispatcher
      模式
    • Reactor 模式使用IO復用監聽事件, 收到事件后,分發給某個線程(進程), 這點就是網絡服務器高并發處理關鍵

    Reactor 模式中 核心組成:

    • Reactor:Reactor (就是服務處理器(ServiceHandler))在一個單獨的線程中運行,負責監聽和分發事件,分發給適當的處理程序來對 IO 事件做出反應。 它就像公司的電話接線員,它接聽來自客戶的電話并將線路轉移到適當的聯系人;
    • Handlers:Handlers(就是事件處理器(EventHandler)),處理程序執行 I/O 事件要完成的實際事件,類似于客戶想要與之交談的公司中的實際官員。Reactor 通過調度適當的處理程序來響應 I/O 事件,處理程序執行非阻塞操作。

    根據 Reactor 的數量和處理資源池線程的數量不同,有 3 種典型的實現,下面將分別進行介紹:

    • 單 Reactor 單線程
    • 單 Reactor 多線程
    • 主從 Reactor 多線程

    Reactor 模式具有如下的優點:

    • 響應快,不必為單個同步時間所阻塞,雖然 Reactor 本身依然是同步的
    • 可以最大程度的避免復雜的多線程及同步問題,并且避免了多線程/進程的切換開銷
    • 擴展性好,可以方便的通過增加 Reactor 實例個數來充分利用 CPU 資源
    • 復用性好,Reactor 模型本身與具體事件處理邏輯無關,具有很高的復用性

    4、單 Reactor 單線程模式

    原理圖:

    在這里插入圖片描述

    原理說明:

    • Select 是前面 I/O 復用模型介紹的標準網絡編程 API,可以實現應用程序通過一個阻塞對象監聽多路連接請求
    • Reactor 對象通過 select 監控客戶端請求事件, 收到事件后,通過 dispatch 進行分發
    • 如果建立連接請求, 則右 Acceptor 通過 accept 處理連接請求, 然后創建一個 Handler 對象處理完成連接后的各種事件
    • 如果不是連接請求,則由 reactor 分發調用連接對應的 handler 來處理
    • Handler 會完成 Read→業務處理→Send 的完整業務流程

    前面的 NIO 群聊系統就是單 Reactor 單線程模式

    方案優缺點分析:

    • 優點:模型簡單,沒有多線程、進程通信、競爭的問題,全部都在一個線程中完成
    • 缺點:性能問題,只有一個線程,無法完全發揮多核 CPU 的性能。Handler 在處理某個連接上的業務時,整
      個進程無法處理其他連接事件,很容易導致性能瓶頸
    • 缺點:可靠性問題,線程意外終止,或者進入死循環,會導致整個系統通信模塊不可用,不能接收和處理外部
      消息,造成節點故障

    使用場景:客戶端的數量有限,業務處理非常快速,比如 Redis 在業務處理的時間復雜度 O(1) 的情況

    5、單 Reactor 多線程模式

    原理圖:
    在這里插入圖片描述
    原理說明:

    • Reactor 對象通過 select 監控客戶端請求 事件, 收到事件后,通過 dispatch 進行分發
    • 如果建立連接請求, 則右 Acceptor 通過 accept 處理連接請求, 然后創建一個 Handler 對象處理完成連接后的各種事件
    • 如果不是連接請求,則由 reactor 分發調用連接對應的 handler 來處理
    • handler 只負責響應事件,不做具體的業務處理, 通過 read 讀取數據后,會分發給后面的 worker 線程池的某個線程處理業務
    • worker 線程池會分配獨立線程完成真正的業務,并將結果返回給 handler
    • handler 收到響應后,通過 send 將結果返回給 client

    方案優缺點分析:

    • 優點:可以充分的利用多核 cpu 的處理能力
    • 缺點:多線程數據共享和訪問比較復雜
    • 缺點: reactor 處理所有的事件的監聽和響應,在單線程運行, 在高并發場 景容易出現性能瓶頸.

    6、主從 Reactor 多線程

    針對單 Reactor 多線程模型中,Reactor 在單線程中運行,高并發場景下容易成為性能瓶頸,可以讓 Reactor 在多線程中運行

    工作原理圖 :

    在這里插入圖片描述

    原理說明:

    • Reactor 主線程 MainReactor 對象通過 select 監聽連接事件, 收到事件后,通過 Acceptor 處理連接事件

    • 當 Acceptor 處理連接事件后,MainReactor 將連接分配給 SubReactor(有多個)

    • SubReactor 將連接加入到連接隊列進行監聽,并創建 handler 進行各種事件處理

    • 當有新事件發生時, subreactor 就會調用對應的 handler 處理

    • handler 先read 讀取數據,然后分發給后面的 worker 線程池處理

    • worker 線程池分配獨立的 worker 線程進行業務處理,并返回結果

    • handler 收到響應的結果后,再通過 send 將結果返回給 client

    • Reactor 主線程可以對應多個 Reactor 子線程, 即 MainRecator 可以關聯多個 SubReactor

    原理圖的另一種表示,結構和上面是類似的:
    在這里插入圖片描述
    方案優缺點說明:

    • 優點:父線程與子線程的數據交互簡單職責明確,父線程只需要接收新連接,子線程完成后續的業務處理。
    • 優點:父線程與子線程的數據交互簡單,Reactor 主線程只需要把新連接傳給子線程,子線程無需返回數據。
    • 缺點:編程復雜度較高

    這種模型在許多項目中廣泛使用,包括 Nginx 主從 Reactor 多進程模型,Memcached 主從多線程, Netty 主從多線程模型的支持

    三、Netty 模型

    1、模型原理

    Netty 主要基于主從 Reactors 多線程模型(如圖)做了一定的改進,其中主從 Reactor 多線程模型有多個 Reactor

    從整體上看,Netty 模型如圖所示:
    在這里插入圖片描述
    說明:

    • BossGroup 線程維護 Selector, 只關注 Accecpt
    • 當接收到 Accept 事件,獲取到對應的 SocketChannel, 封裝成 NIOScoketChannel 并注冊到 Worker 線程(事件循環), 并進行維護
    • 當 Worker 線程監聽到 selector 中通道發生自己感興趣的事件后,就進行處理(就由 handler), 注意 handler 已經加入到通道

    更加詳細的的模型如下圖所示:
    在這里插入圖片描述
    Netty模型原理說明:

    • Netty 抽象出兩組線程池: BossGroup 專門負責接收客戶端的連接、WorkerGroup 專門負責網絡的讀寫

    • BossGroup 和 WorkerGroup 類型都是 NioEventLoopGroup 。NioEventLoopGroup 相當于一個事件循環組, 這個組中含有多個事件循環 ,每一個事件循環是 NioEventLoop

    • NioEventLoop 表示一個不斷循環的執行處理任務的線程, 每個 NioEventLoop 都有一個 selector, 用于監聽綁 定在其上的 socket 的網絡通訊

    • 每個 BossNioEventLoop 循環執行的步驟有 3 步

      • 輪詢 accept 事件
      • 處理 accept 事件 , 與 client 建立連接 , 生成 NioScocketChannel , 并將其注冊到某個 worker NIOEventLoop 上 的 selector
      • 處理任務隊列的其他任務 , 即 runAllTasks
    • 每個 WorkerNIOEventLoop 循環執行的步驟

      • 輪詢 read,write 事件
      • 在對應 NioScocketChannel上進行 處理 i/o 事件, 即 read,write 事件
      • 處理任務隊列的其他任務 , 即 runAllTasks
    • 每個WorkerNIOEventLoop 處理業務時,會使用pipeline(管道),pipeline 中包含了 channel, 即通過pipeline 可以獲取到對應通道, 管道中維護了很多的 處理器

    2、應用實例1-服務器客戶端通信

    Netty 服務器在 6666 端口監聽,客戶端能發送消息給服務器 “hello, 服務器”。服務器可以回復消息給客戶端 “hello, 客戶端”

    注意運行前需要使用Maven導入Netty的包或者依賴

    服務器端代碼:

    public class NettyServer {
    
        public static void main(String[] args) throws InterruptedException {
    
            //創建 bossGroup 和 workerGroup
            //bossGroup 只處理連接請求,workerGroup 處理客戶端業務
            //兩個都是無限循環
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
    
                //創建一個服務器端啟動對象,并設置參數
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                //使用鏈式編程來設置啟動器
                serverBootstrap.group(bossGroup, workerGroup) //設置兩個線程組
                        .channel(NioServerSocketChannel.class)//設置使用NioServerSocketChannel作為服務器的通道實現
                        .option(ChannelOption.SO_BACKLOG, 128) //設置線程隊列可以得到的連接個數
                        .childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動連接狀態
                        .childHandler(new ChannelInitializer<SocketChannel>() { //創建一個管道測試對象
                            //給pipeline設置處理器
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                //在管道中加入自定義的處理器(NettyServerHandler)
                                ch.pipeline().addLast(new NettyServerHandler());
                            }
                        }); //給workerGroup 的 EventLoop 對應的管道設置處理器
    
                System.out.println("服務器 is ready");
    
                //綁定端口并同步,生成一個 ChannelFuture對象
                //啟動服務器
                ChannelFuture cf = serverBootstrap.bind(6666).sync();
    
                //對關閉通道進行監聽
                cf.channel().closeFuture().sync();
            } finally {
                //關閉線程組
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

    NettyServerHandler:

    /**
     * 該類用于實際處理數據,需要繼承Netty規定好的HandlerAdapter
     *
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 用于讀取(接收)數據
         * @param ctx 上下文對象,其中包含了管道pipeline,通道channel,地址等信息
         * @param msg 客戶端發送的信息
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server ctx = " + ctx);
            //將msg轉為一個ByteBuf對象
            //ByteBuf是 Netty提供的,與NIO的ByteBuffer不同
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("客戶端發送的信息是:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("客戶端地址為:" + ctx.channel().remoteAddress());
        }
    
        /**
         * 定義數據讀取完畢后的操作
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
            //writeAndFlush作用的是將數據寫到緩沖區并發送
            //需要對發送的數據進行編碼
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端", CharsetUtil.UTF_8));
        }
    
        /**
         * 處理異常,一般是關閉通道
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    客戶端代碼:

    public class NettyClient {
    
        public static void main(String[] args) throws InterruptedException {
    
            //創建事件組
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    
            try {
                //創建客戶端啟動對象
                //注意客戶端是Bootstrap,不是ServerBootstrap
                Bootstrap bootstrap = new Bootstrap();
    
                //設置相關的參數
                bootstrap.group(eventLoopGroup) //設置線程組
                        .channel(NioSocketChannel.class) //設置客戶端通道的實現類(反射)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyClientHandler());
                            }
                        });
    
                System.out.println("客戶端 is ok");
    
                //啟動客戶端連接服務器端,異步
                ChannelFuture sync = bootstrap.connect(new InetSocketAddress("127.0.0.1", 6666)).sync();
    
                //對通道關閉進行監聽
                sync.channel().closeFuture().sync();
    
            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    NettyClientHandler:

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        //當通道就緒就會觸發
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client ctx : " + ctx);
            //向服務器端發送消息
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服務器", CharsetUtil.UTF_8));
        }
    
        //接收服務端發送的消息
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("服務器回復:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("服務器地址:" + ctx.channel().remoteAddress());
        }
    
        //處理異常
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    運行結果:

    服務端輸出:

    服務器 is ready
    server ctx = ChannelHandlerContext(NettyServerHandler#0, [id: 0x67909ee8, L:/127.0.0.1:6666 - R:/127.0.0.1:3991])
    客戶端發送的信息是:hello, 服務器
    客戶端地址為:/127.0.0.1:3991

    客戶端輸出:

    客戶端 is ok
    client ctx : ChannelHandlerContext(NettyClientHandler#0, [id: 0xabaaab32, L:/127.0.0.1:3991 - R:/127.0.0.1:6666])
    服務器回復:hello,客戶端
    服務器地址:/127.0.0.1:6666

    注意

    對于服務器端的 bossGroup 和 workerGroup 分別對應模型中的兩個NioEventLoopGroup。其中每個NioEventLoopGroup下面NioEventLoop的數目(線程數目)默認為CPU的核數 * 2。如果需要設定,可以直接在構造函數傳入,如代碼第一行

    //EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    CPU的核數可以通過下面的代碼查看,我的是4核,而創建的NioEventLoop就是8個

    System.out.println(NettyRuntime.availableProcessors());

    可以在debug中看到從看到,如下圖。同時也可以看出多個NioEventLoop是使用EventExecutor管理的。
    在這里插入圖片描述

    任務隊列中的 Task 有 3 種典型使用場景

    • 用戶程序自定義的普通任務
    • 用戶自定義定時任務
    • 非當前 Reactor 線程調用 Channel 的各種方法

    示例:

    /**
     * 該類用于實際處理數據,需要繼承Netty規定好的HandlerAdapter
     *
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 用于讀取(接收)數據
         * @param ctx 上下文對象,其中包含了管道pipeline,通道channel,地址等信息
         * @param msg 客戶端發送的信息
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            //比如這里我們有一個非常耗時長的業務-> 異步執行
            // -> 提交該 channel 對應的 NIOEventLoop 的 taskQueue 中,
    
            ctx.channel().eventLoop().execute(()->{
                try {
                    Thread.sleep(5000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客戶端-1", CharsetUtil.UTF_8));
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            //這個任務和上面的任務放到同一個eventLoop的taskQueue
            //所以這兩個任務是串行執行的
            ctx.channel().eventLoop().execute(()->{
                try {
                    Thread.sleep(10000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客戶端-2", CharsetUtil.UTF_8));
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            //解決方案 2: 用戶自定義定時任務 -》 該任務是提交到 scheduledTaskQueue 中
            ctx.channel().eventLoop().schedule(()->{
                try {
                    Thread.sleep(10000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客戶端-2", CharsetUtil.UTF_8));
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, 5, TimeUnit.SECONDS);
        }
    }

    3、應用實例2-實現HTTP 服務

    Netty 服務器在 8888 端口監聽,瀏覽器發出請求 http://localhost:8888/

    服務器可以回復消息給客戶端 “Hello,客戶端”, 并對特定請求資源進行過濾.

    實例代碼:

    NettyHttpServer(Http服務器)

    public class NettyHttpServer {
    
        public static void main(String[] args) throws InterruptedException {
    
            //創建 bossGroup 和 workerGroup
            //bossGroup 只處理連接請求,workerGroup 處理客戶端業務
            //兩個都是無限循環
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                //創建一個服務器端啟動對象,并設置參數
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new NettyHttpServerInitializer());
    
                ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
    
                System.out.println("服務端 is ready");
    
                channelFuture.channel().closeFuture().sync();
    
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    
    }

    NettyHttpServerInitializer(Channel初始化器),可以向ChannelPipeline加入handler

    public class NettyHttpServerInitializer extends ChannelInitializer<SocketChannel> {
    
        //向管道加入處理器
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
    
            //得到管道
            ChannelPipeline pipeline = ch.pipeline();
    
            //加入netty提供的httpServerCodec =》(code + decode)
            //HttpServerCodec 是netty提供的http編碼-解碼器
            pipeline.addLast(new HttpServerCodec());
    
            //添加自定義handler
            pipeline.addLast(new NettyHttpServerHandler());
        }
    }

    NettyHttpServerHandler(用于處理Http請求)

    /**
     * 說明:
     *      SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter 的子類
     *      HttpObject :客戶端和服務器端通信的數據被封裝成 HttpObject
     */
    public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    
        //讀取客戶端發來的數據
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
    
            //判斷是否是HttpRequest
            if (msg instanceof HttpRequest) {
    
                //對圖標資源獲取進行過濾
                HttpRequest request = (HttpRequest) msg;
                URI uri = new URI(request.uri());
    
                if ("/favicon.ico".equals(uri.getPath())) {
                    System.out.println("請求了 favicon.ico ,不做響應");
                    return;
                }
    
                System.out.println("pipeline = " + ctx.pipeline().hashCode() + " handlder = " + this.hashCode());
                System.out.println("msg 類型:" + msg.getClass());
                System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
    
    
                //回復信息給瀏覽器(Http協議)
                ByteBuf content = Unpooled.copiedBuffer("hello,客戶端", CharsetUtil.UTF_8);
    
                //構造http響應,即httpResponse
                DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                        HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
    
    
                //返回httpResponse
                ctx.writeAndFlush(response);
            }
    
        }
    }

    運行服務器之后,在瀏覽器內輸入地址,可以看到下面的結果:

    在這里插入圖片描述

    服務器端控制臺輸出:

    服務端 is ready
    pipeline = 1328644476 handlder = 5644135
    msg 類型:class io.netty.handler.codec.http.DefaultHttpRequest
    客戶端地址:/0:0:0:0:0:0:0:1:8247
    請求了 favicon.ico ,不做響應

    其中最后第5行的輸出是瀏覽器獲取圖標的請求,被攔截。對應NettyHttpServerHandler類中第12~22行代碼所對應的功能
    在這里插入圖片描述
    另外http是連接不是長連接,每次發送請求都會重新建立連接,而且在Netty中每次處理都會生成pipeline和handlder,下面再次刷新可以看出:

    服務端 is ready
    pipeline = 1328644476 handlder = 5644135
    msg 類型:class io.netty.handler.codec.http.DefaultHttpRequest
    客戶端地址:/0:0:0:0:0:0:0:1:8247
    請求了 favicon.ico ,不做響應
    pipeline = 99933460 handlder = 1089885134
    msg 類型:class io.netty.handler.codec.http.DefaultHttpRequest
    客戶端地址:/0:0:0:0:0:0:0:1:8255
    請求了 favicon.ico ,不做響應

    4、模型方案再說明

    Netty 抽象出兩組線程池,BossGroup 專門負責接收客戶端連接,WorkerGroup 專門負責網絡讀寫操作。

    NioEventLoop 表示一個不斷循環執行處理任務的線程,每個 NioEventLoop 都有一個 selector,用于監聽綁定 在其上的 socket 網絡通道。

    NioEventLoop 內部采用串行化設計(taskQueue),從消息的讀取->解碼->處理->編碼->發送,始終由 IO 線程 NioEventLoop負責

    其中:

    • NioEventLoopGroup 下包含多個 NioEventLoop
    • 每個 NioEventLoop 中包含有一個 Selector,一個 taskQueue
    • 每個 NioEventLoop 的 Selector 上可以注冊監聽多個 NioChannel
    • 每個 NioChannel 只會綁定在唯一的 NioEventLoop 上
    • 每個 NioChannel 都綁定有一個自己的 ChannelPipeline

    四、異步模型

    1、基本介紹

    • 異步的概念和同步相對。當一個異步過程調用發出后,調用者不能立刻得到結果。實際處理這個調用的組件在
      完成后,通過狀態、通知和回調來通知調用者。

    • Netty 中的 I/O 操作是異步的,包括 Bind、Write、Connect 等操作會簡單的返回一個 ChannelFuture。

    • 調用者并不能立刻獲得結果,而是通過 Future-Listener 機制,用戶可以方便的主動獲取或者通過通知機制獲得 IO 操作結果

    • Netty 的異步模型是建立在 future 和 callback 的之上的。callback 就是回調。重點說 Future,它的核心思想 是:假設一個方法 fun,計算過程可能非常耗時,等待 fun 返回顯然不合適。那么可以在調用 fun 的時候,立 馬返回一個 Future,后續可以通過 Future 去監控方法 fun 的處理過程(即 : Future-Listener 機制)

    2、Future -Listener 機制

    • Future 表示異步的執行結果, 可以通過它提供的方法來檢測執行是否完成,比如檢索計算等等.
    • ChannelFuture 是一個接口 : public interface ChannelFuture extends Future<Void>我們可以添加監聽器,當監聽的事件發生時,就會通知到監聽器
    • 當 Future 對象剛剛創建時,處于非完成狀態,調用者可以通過返回的 ChannelFuture 來獲取操作執行的狀態,注冊監聽函數來執行完成后的操作。

    常見有如下操作:

    • 通過 isDone 方法來判斷當前操作是否完成;
    • 通過 isSuccess 方法來判斷已完成的當前操作是否成功;
    • 通過 getCause 方法來獲取已完成的當前操作失敗的原因
    • 通過 isCancelled 方法來判斷已完成的當前操作是否被取消;
    • 通過 addListener 方法來注冊監聽器,如當操作已完成(isDone 方法返回完成),將會通知指定的監聽器;如果 Future 對象已完成,則通知指定的監聽器

    3、應用示例

    舉例說明:綁定端口是異步操作,當綁定操作處理完,將會調用相應的監聽器處理邏輯

    //綁定端口并同步,生成一個 ChannelFuture對象
    //啟動服務器
    ChannelFuture cf = serverBootstrap.bind(6666).sync();
    
    //為ChannelFuture注冊監聽器,監聽關心的事件
    cf.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()){
                System.out.println("服務器綁定 6666 端口成功");
            } else {
                System.out.println("服務器綁定 6666 端口失敗");
            }
        }
    });

    五、Netty 核心模塊組件

    1、 Bootstrap、ServerBootstrap

    Bootstrap 意思是引導,一個 Netty 應用通常由一個 Bootstrap 開始,主要作用是配置整個 Netty 程序,串聯 各個組件,Netty 中 Bootstrap 類是客戶端程序的啟動引導類,ServerBootstrap 是服務端啟動引導類

    常見的方法有 :

    //該方法用于服務器端,用來設置兩個 EventLoop
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup);
    
    //該方法用于客戶端,用來設置一個 EventLoop
    public B group(EventLoopGroup group);
    
    //該方法用來設置一個服務器端的通道實現
    public B channel(Class<? extends C> channelClass);
    
    //用來給 ServerChannel 添加配置
    public <T> B option(ChannelOption<T> option, T value);
    
    //用來給接收到的通道添加配置
    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value);
    
    //該方法用來設置業務處理類(自定義的 handler)
    public ServerBootstrap childHandler(ChannelHandler childHandler);
    
    //該方法用于服務器端,用來設置占用的端口號
    public ChannelFuture bind(int inetPort);
    
    //該方法用于客戶端,用來連接服務器端
    public ChannelFuture connect(String inetHost, int inetPort)

    2、Future、ChannelFuture

    Netty 中所有的 IO 操作都是異步的,不能立刻得知消息是否被正確處理。但是可以過一會等它執行完成或 者直接注冊一個監聽

    具體的實現就是通過 Future 和 ChannelFutures,他們可以注冊一個監聽,當操作執行成功或失敗時監聽會自動觸發注冊的監聽事件

    常見的方法有 :

    Channel channel();//返回當前正在進行 IO 操作的通道
    
    ChannelFuture sync();//等待異步操作執行完畢

    3、Channel

    • Channel 是Netty 網絡通信的組件,能夠用于執行網絡 I/O 操作。

    • 通過 Channel 可獲得當前網絡連接的通道的狀態

    • 通過 Channel 可獲得 網絡連接的配置參數 (例如接收緩沖區大小)

    • Channel 提供異步的網絡 I/O 操作(如建立連接,讀寫,綁定端口),異步調用意味著任何 I/O 調用都將立即返 回,并且不保證在調用結束時所請求的 I/O 操作已完成

    • 調用立即返回一個 ChannelFuture 實例,通過注冊監聽器到 ChannelFuture 上,可以 I/O 操作成功、失敗或取消時回調通知調用方

    • 支持關聯 I/O 操作與對應的處理程序

    • 不同協議、不同的阻塞類型的連接都有不同的 Channel 類型與之對應,常用的 Channel 類型:

      • NioSocketChannel,異步的客戶端 TCPSocket 連接。
      • NioServerSocketChannel,異步的服務器端 TCPSocket 連接。
      • NioDatagramChannel,異步的 UDP 連接。
      • NioSctpChannel,異步的客戶端 Sctp 連接。
      • NioSctpServerChannel,異步的 Sctp 服務器端連接,這些通道涵蓋了 UDP 和 TCP 網絡 IO 以及文件 IO

    4、Selector

    Netty 基于 Selector 對象實現 I/O 多路復用,通過 Selector 一個線程可以監聽多個連接的 Channel 事件。

    當向一個 Selector 中注冊 Channel 后,Selector 內部的機制就可以自動不斷地查詢(Select) 這些注冊的 Channel 是否有已就緒的 I/O 事件(例如可讀,可寫,網絡連接完成等),這樣程序就可以很簡單地使用一個 線程高效地管理多個 Channel

    5、ChannelHandler 及其實現類

    ChannelHandler 是一個接口,處理 I/O 事件或攔截 I/O 操作,并將其轉發到其 ChannelPipeline(業務處理鏈)
    中的下一個處理程序。

    ChannelHandler 本身并沒有提供很多方法,因為這個接口有許多的方法需要實現,方便使用期間,可以繼承它
    的子類

    ChannelHandler 及其實現類一覽圖
    在這里插入圖片描述

    • ChannelInboundHandler 用于處理入站 I/O 事件。
    • ChannelOutboundHandler 用于處理出站 I/O 操作。
    • ChannelInboundHandlerAdapter 用于處理入站 I/O 事件。
    • ChannelOutboundHandlerAdapter 用于處理出站 I/O 操作。
    • ChannelDuplexHandler 用于處理入站和出站事件。

    經常需要自定義一個 Handler 類去繼承 ChannelInboundHandlerAdapter,然后通過重寫相應方法實現業務
    邏輯,我們接下來看看一般都需要重寫哪些方法

    public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    
        public ChannelInboundHandlerAdapter() {
        }
    
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelRegistered();
        }
    
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelUnregistered();
        }
    
        //通道就緒事件
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
        }
    
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelInactive();
        }
    
        //通道讀取數據事件
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }
    	//通道讀取數據完畢事件
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelReadComplete();
        }
    
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            ctx.fireUserEventTriggered(evt);
        }
    
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelWritabilityChanged();
        }
    	//通道發生異常事件
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.fireExceptionCaught(cause);
        }
    }

    6、Pipeline 和 ChannelPipeline

    ChannelPipeline 是一個 Handler 的集合,它負責處理和攔截 inbound 或者 outbound 的事件和操作,相當于 一個貫穿 Netty 的鏈。(也可以這樣理解:ChannelPipeline 是 保存 ChannelHandler 的 List,用于處理或攔截 Channel 的入站事件和出站操作)

    ChannelPipeline 實現了一種高級形式的攔截過濾器模式,使用戶可以完全控制事件的處理方式,以及 Channel 中各個的 ChannelHandler 如何相互交互

    在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應,它們的組成關系如下
    在這里插入圖片描述

    • 一個 Channel 包含了一個 ChannelPipeline,而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext (就是ctx)組成的雙向鏈表,并且每個 ChannelHandlerContext 中又關聯著一個 ChannelHandler
    • 入站事件和出站事件在一個雙向鏈表中,入站事件會從鏈表 head 往后傳遞到最后一個入站的 handler,出站事件會從鏈表 tail 往前傳遞到最前一個出站的 handler,兩種類型的 handler 互不干擾

    常用方法:

    • ChannelPipeline addFirst(ChannelHandler... handlers):把一個業務處理類(handler)添加到鏈中的第一個位置
    • ChannelPipeline addLast(ChannelHandler... handlers):把一個業務處理類(handler)添加到鏈中的最后一個位置

    7、ChannelHandlerContext

    保存 Channel 相關的所有上下文信息,同時關聯一個 ChannelHandler 對象

    ChannelHandlerContext 中 包 含 一 個 具 體 的 事 件 處 理 器 ChannelHandler , 同 時ChannelHandlerContext 中也綁定了對應的 pipeline 和 Channel 的信息,方便對 ChannelHandler進行調用.

    常用方法:

    • ChannelFuture close():關閉通道
    • ChannelOutboundInvoker flush():刷新
    • ChannelFuture writeAndFlush(Object msg): 將 數據寫到 ChannelPipeline 中當前ChannelHandler 的下一個 ChannelHandler 開始處理(出站)

    8、ChannelOption

    Netty 在創建 Channel 實例后,一般都需要設置 ChannelOption 參數。

    ChannelOption 參數如下:

    • ChannelOption.SO_BACKLOG:對應 TCP/IP 協議 listen 函數中的 backlog 參數,用來初始化服務器可連接隊列大小。服務端處理客戶端連接請求是順序處理的,所以同一時間只能處理一個客戶端連接。多個客戶端來的時候,服務端將不能處理的客戶端連接請求放在隊列中等待處理,backlog 參數指定了隊列的大小。
    • ChannelOption.SO_KEEPALIVE:設置一直保持連接活動狀態

    9、EventLoopGroup 和其實現類 NioEventLoopGroup

    EventLoopGroup 是一組 EventLoop 的抽象,Netty 為了更好的利用多核 CPU 資源,一般會有多個 EventLoop 同時工作,每個 EventLoop 維護著一個 Selector 實例。

    EventLoopGroup 提供 next 接口,可以從組里面按照一定規則獲取其中一個 EventLoop來處理任務。

    在 Netty 服務器端編程中,我們一般都需要提供兩個 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。

    通常一個服務端口即一個 ServerSocketChannel對應一個Selector 和一個EventLoop線程。BossEventLoop 負責接收客戶端的連接并將 SocketChannel 交給 WorkerEventLoopGroup 來進行 IO 處理,如下圖所示

    在這里插入圖片描述

    • BossEventLoopGroup 通常是一個單線程的 EventLoop,EventLoop 維護著一個注冊了ServerSocketChannel 的 Selector 實例。BossEventLoop 不斷輪詢, Selector 將連接事件分離出來
    • 連接完成后,將接收到的 SocketChannel 交給 WorkerEventLoopGroup
    • WorkerEventLoopGroup 會由 next 選擇其中一個 EventLoop來將這個 SocketChannel 注冊到其維護的 Selector 并對其后續的 IO 事件進行處理

    常用方法:

    • public NioEventLoopGroup():構造方法
    • public Future<?> shutdownGracefully():斷開連接,關閉線程

    10、Unpooled 類

    Unpooled 類是Netty 提供一個專門用來操作緩沖區(即Netty的數據容器,如ByteBuf)的工具類

    常用方法:

    ////通過給定的數據和字符編碼返回一個 ByteBuf 對象(類似于 NIO 中的 ByteBuffer 但有區別)
    public static ByteBuf copiedBuffer(CharSequence string, Charset charset);
    //使用
    ByteBuf content = Unpooled.copiedBuffer("hello,客戶端", CharsetUtil.UTF_8);

    使用示例:

    public class NettyByteBuf01 {
        public static void main(String[] args) {
    
    
            //創建一個ByteBuf
            //說明
            //1. 創建 對象,該對象包含一個數組arr , 是一個byte[10]
            //2. 在netty 的buffer中,不需要使用flip 進行反轉
            //   底層維護了 readerindex 和 writerIndex
            //3. 通過 readerindex 和  writerIndex 和  capacity, 將buffer分成三個區域
            // 0---readerindex 已經讀取的區域
            // readerindex---writerIndex , 可讀的區域
            // writerIndex -- capacity, 可寫的區域
            ByteBuf buffer = Unpooled.buffer(10);
    
            for(int i = 0; i < 10; i++) {
                buffer.writeByte(i);
            }
    
            System.out.println("capacity=" + buffer.capacity());//10
            //輸出
    //        for(int i = 0; i<buffer.capacity(); i++) {
    //            System.out.println(buffer.getByte(i));
    //        }
            for(int i = 0; i < buffer.capacity(); i++) {
                System.out.println(buffer.readByte());
            }
            System.out.println("執行完畢");
        }
    }
    public class NettyByteBuf02 {
        public static void main(String[] args) {
    
            //創建ByteBuf
            ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));
    
            //使用相關的方法
            if(byteBuf.hasArray()) { // true
    
                byte[] content = byteBuf.array();
    
                //將 content 轉成字符串
                System.out.println(new String(content, Charset.forName("utf-8")));
    
                System.out.println("byteBuf=" + byteBuf);
    
                System.out.println(byteBuf.arrayOffset()); // 0
                System.out.println(byteBuf.readerIndex()); // 0
                System.out.println(byteBuf.writerIndex()); // 12
                System.out.println(byteBuf.capacity()); // 36
    
                //System.out.println(byteBuf.readByte()); //
                System.out.println(byteBuf.getByte(0)); // 104
    
                int len = byteBuf.readableBytes(); //可讀的字節數  12
                System.out.println("len=" + len);
    
                //使用for取出各個字節
                for(int i = 0; i < len; i++) {
                    System.out.println((char) byteBuf.getByte(i));
                }
    
                //按照某個范圍讀取
                System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8")));
                System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8")));
    
    
            }
        }
    }
    版權聲明:本文為han_zhuang原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/han_zhuang/article/details/106695018

    智能推薦

    Netty學習筆記(三)--- 線程模型和EventLoopGroup

    Netty之所以成為高性能NIO框架,其精心設計的高效線程模型功不可沒,Netty線程模型基于了一個著名的模式——Reactor模式。 Reactor模型 Reactor模型是一種經典的線程模型,也叫反應器模型,網上已經由很多對它的介紹,這里就不過多的介紹,只是簡單介紹三種模型的特點。 單線程模型 Reactor單線程模型僅使用一個線程來處理所有的事情,包括服務端和客戶端的...

    Netty學習——Reactor線程模型

    一 netty概述 1.1 為什么有了NIO還有Netty NIO的缺點 JDK 的 NIO 底層由 epoll 實現,該實現飽受詬病的Selector 空輪詢 bug 會導致 cpu 飆升 100% NIO的API繁雜,使用麻煩,必須熟練掌握Selector、Channel、Buffer等相關API。并且需要熟練Java多線程編程和網絡編程,才能寫出高質量的NIO代碼。 開發工作量和難度都非常大...

    HTML中常用操作關于:頁面跳轉,空格

    1.頁面跳轉 2.空格的代替符...

    freemarker + ItextRender 根據模板生成PDF文件

    1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    猜你喜歡

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    Linux C系統編程-線程互斥鎖(四)

    互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...

    統計學習方法 - 樸素貝葉斯

    引入問題:一機器在良好狀態生產合格產品幾率是 90%,在故障狀態生產合格產品幾率是 30%,機器良好的概率是 75%。若一日第一件產品是合格品,那么此日機器良好的概率是多少。 貝葉斯模型 生成模型與判別模型 判別模型,即要判斷這個東西到底是哪一類,也就是要求y,那就用給定的x去預測。 生成模型,是要生成一個模型,那就是誰根據什么生成了模型,誰就是類別y,根據的內容就是x 以上述例子,判斷一個生產出...

    styled-components —— React 中的 CSS 最佳實踐

    https://zhuanlan.zhihu.com/p/29344146 Styled-components 是目前 React 樣式方案中最受關注的一種,它既具備了 css-in-js 的模塊化與參數化優點,又完全使用CSS的書寫習慣,不會引起額外的學習成本。本文是 styled-components 作者之一 Max Stoiber 所寫,首先總結了前端組件化樣式中的最佳實踐原則,然后在此基...

    精品国产乱码久久久久久蜜桃不卡