• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Netty拾遺(三)——Reactor設計模式

    標簽: # Netty  netty

    前言

    上一篇博客總結了NIO的三個關鍵組件——NIO的三個組件,但是其中并沒有提到SelectionKey,在reactor設計模式中,SelectionKey起到了關鍵性的作用,這篇博客會說明一下SelectionKey,同時在第一遍Netty系列博客中我們簡單介紹過三種IO模式,并通過簡單實例代碼進行了說明:BIO,NIO與AIO簡介。這篇博客在前兩篇博客的基礎上繼續總結reactor設計模式。依舊會參考《Netty、Redis、Zookeeper高并發實戰》和《Netty權威指南》兩本書。

    說一下SelectionKey

    簡單來說可以理解為SelectionKey為被Selector(選擇器)選中的注冊事件。在上一篇博客中,一個IO事件發生后,如果之前在選擇器中注冊過,就會被選擇器選中。而這里的SelectionKey就可以簡單理解為被選中了的IO事件。在實際代碼中,可以通過SelectionKey獲得通道的IO事件類型,還可以獲得發生IO事件所在的通道,此外還可以獲得選擇鍵對應的選擇器實例。

    主要存在四個事件類型,如下所示

    事件名稱枚舉值
    可讀SelectionKey.OP_READ
    可寫SelectionKey.OP_WRITE
    連接SelectionKey.CONNECT
    接受SelectionKey.OP_ACCEPT

    SelectionKey其中有兩個重要的方法:attach與attachment方法

    void attach(Object o) 這個方法可以將任何的對象作為附件綁定的到SelectionKey實例上,前面說過SelectionKey表示的是具體的IO事件,這里的attach就是給對應的IO事件綁定處理器。

    void attachment() 這個方法是取出SelectionKey對應的事件處理器。

    reactor模式簡介

    reactor模式其實說到底就是一個設計模式,該設計模式其實是在文章 《Scalable IO in Java》中的描述,網上大部分文章關于reactor模式的示例圖,均是來自于這篇文章,這篇博客依舊會采用該示意圖(因為我實在想不出更好的圖來描述reactor模式)。總體來說Reactor模式由Reactor處理線程與Handler處理器兩大角色來組成。

    Reactor處理線程負責響應各種IO事件,并將事件分發到Handlers處理器處理。

    Handlers處理器的職責,就是非阻塞的執行業務處理邏輯。

    單線程的reactor模式

    《Netty、Redis、Zookeeper高并發實戰》一書中的Reactor模式的實例有些混亂,這里簡單梳理一下。

    這里補充一點,在前一篇博客中提到NIO的兩個channel——ServerSocketChannel和SocketChannel,兩者關注的事件類型也不同。

    ServerSocketChannel只需要關注OP_ACCEPT 連接建立事件,并且ServerSocketChannel通過accept獲取SocketChannel與客戶端連接的通道。各個通道關注的事件列表具體如下。

    通道類型OP_READOP_WRITEOP_CONNECTOP_ACCEPT
    服務端ServerSocketChannelY
    服務端SocketChannelYY
    客戶端SocketChannelYYY

    由于服務端會存在ServerSocketChannel和SocketChannel ,因此需要對兩個通道的不同事件使用不同的事件處理器。

    單線程的reactor模式,其實就是只有一個reactor線程,只有一個線程用于響應IO事件并分發事件。這里直接進入代碼實例

    1、簡單定義一個Handler處理的接口

    《Netty、Redis、Zookeeper高并發實戰》一書中直接實現的是runnable接口,導致業務和線程的處理混合在一起

    /**
     * autor:liman
     * createtime:2020/9/9
     * comment:Handler處理器接口
     */
    public interface IHandler {
    
        public void handlerSelectKey();
    
    }
    

    2、連接事件的處理Handler

    /**
     * autor:liman
     * createtime:2020/9/9
     * comment: Acceptor的處理類 處理連接的建立事件
     */
    @Slf4j
    public class AcceptorHandler implements IHandler{
    
        private Selector selector;
        private ServerSocketChannel serverSocketChannel;
    
        public AcceptorHandler(Selector selector, ServerSocketChannel serverSocketChannel) {
            this.selector = selector;
            this.serverSocketChannel = serverSocketChannel;
        }
    
        //這里完成事件的分發
        @Override
        public void handlerSelectKey() {
            try{
                SocketChannel socketChannel = serverSocketChannel.accept();
                if(socketChannel!=null){
                    new KeyEventHandler(selector,socketChannel);
                }
            }catch (Exception e){
                log.error("事件分發異常,異常信息為:{}",e);
            }
        }
    }
    

    3、讀寫事件的處理Handler

    /**
     * autor:liman
     * createtime:2020/9/10
     * comment: 可讀,可寫的事件處理Handler
     */
    @Slf4j
    public class KeyEventHandler implements IHandler {
    
        private Selector selector;
        private SocketChannel socketChannel;
        private SelectionKey selectionKey;
    
        /**
         * 這里需要綁定非連接事件的處理器
         * @param selector
         * @param socketChannel
         */
        public KeyEventHandler(Selector selector, SocketChannel socketChannel) {
            try {
                this.selector = selector;
                this.socketChannel = socketChannel;
                this.socketChannel.configureBlocking(false);
                selectionKey = this.socketChannel.register(this.selector, 0);//這里沒有注冊任何事件到selector上,只是獲取指定的選擇鍵
                selectionKey.attach(this);
                //通過selectionKey注冊可讀事件
                selectionKey.interestOps(SelectionKey.OP_READ);
                selector.wakeup();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void handlerSelectKey() {
            try {
                String currentTime = "";
                if(selectionKey.isValid()) {
                    if (selectionKey.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        int readBytes = socketChannel.read(readBuffer);
                        if (readBytes > 0) {
                            readBuffer.flip();
                            byte[] bytes = new byte[readBuffer.remaining()];
                            readBuffer.get(bytes);
                            String body = new String(bytes, "UTF-8");
                            log.info("the time server receive order:{}", body);
                        }else if(readBytes<0){//如果讀取不到數據,則表示SocketChannel關閉,直接返回
                            socketChannel.close();
                            return;
                        }
                        //讀取事件處理完成之后,需要再一次注冊可寫事件
                        selectionKey.interestOps(SelectionKey.OP_WRITE);
                    } else if (selectionKey.isWritable()) {
                        currentTime = LocalDateTime.now().toString();
                        log.info("send message 2 client ,current time is : {}",currentTime);
                        doWrite(socketChannel, currentTime);
                        //可讀事件處理完成之后,需要再一次注冊可讀事件
                        selectionKey.interestOps(SelectionKey.OP_READ);
                    }
                    selectionKey.attach(this);
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        /**
         * 通過緩沖區,往channel中寫入數據
         * @param socketChannel
         * @param response
         * @throws IOException
         */
        private void doWrite(SocketChannel socketChannel, String response) throws IOException {
            if (response != null && response.trim().length() > 0) {
                byte[] bytes = response.getBytes();
                ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
                writeBuffer.put(bytes);
                writeBuffer.flip();
                socketChannel.write(writeBuffer);
            }
        }
    }
    

    4、分發事件的線程(這個我理解為就是Reactor線程)

    /**
     * autor:liman
     * createtime:2020/9/10
     * comment:分發事件處理任務的線程
     */
    @Slf4j
    public class DispatchThread implements Runnable {
    
        private Selector selector;
    
        public DispatchThread(Selector selector) {
            this.selector = selector;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select(1000);
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        //Reactor負責dispatch收到的事件
                        SelectionKey sk = it.next();
                        dispatch(sk);
                    }
                    selected.clear();
                }
            } catch (Exception ex) {
                log.error("事件分配失敗,異常信息為:{}", ex);
            }
        }
    
        //不同的IO事件處理類分發
        private void dispatch(SelectionKey selectionKey){
            IHandler handler = (IHandler) selectionKey.attachment();
            //調用之前attach綁定到選擇鍵的handler處理器對象
            if (handler != null) {
                handler.handlerSelectKey();
            }
        }
    }
    

    5、主要啟動類

    /**
     * autor:liman
     * createtime:2020/8/12
     * comment:單線程reactor 模式的客戶端
     */
    @Slf4j
    public class TimeReactorServer {
    
        private static final String host = "127.0.0.1";
        private static final int port = 9898;
    
        public static void main(String[] args) {
            //啟動的時候,將服務端的ServerSocketChannel注冊到Selector上,并綁定連接事件的處理方式
            try {
                Selector selector = Selector.open();
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                InetSocketAddress inetAddress =
                        new InetSocketAddress(host,
                                port);
                serverSocketChannel.bind(inetAddress);
                SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                //綁定連接事件的處理Handler
                selectionKey.attach(new AcceptorHandler(selector,serverSocketChannel));
    
                //啟動Reactor線程
                new Thread(new DispatchThread(selector)).start();
    
            } catch (Exception e) {
                log.error("連接建立出現異常,異常信息為:{}",e);
            }
        }
    }
    

    客戶端的實現不需要改變,上一篇博客的TimeClient就可以用于測試單線程的reactor代碼。

    這個時候就可以拿出那個經典的圖片來說道說道了

    在這里插入圖片描述

    我們可以看到,DispatchThread其實就是reactor線程,負責各種IO事件處理的分發,AcceptorHandler我個人理解就是acceptor,只是負責連接事件建立的處理。

    所謂單線程的Reactor模式與多線程Reactor模式的區別最直觀的表現就在于DispatchThread的個數和工作的selector選擇器的個數都不止一個。

    整體而言我們的實現相比圖中有些出入,因為我們的分發操作,并不是在acceptor中處理,但是這樣可以較好的理解reactor的幾個要素。

    多線程的reactor模式

    前面提到過,相比單線程的reactor模式而言,最大的區別在于不止一個選擇器,不止一個線程進行selector注冊事件的掃描與分配。同時處理讀寫請求的線程相比單線程模式也不止一個

    1、統一一個處理類接口,為了區別與單線程的,這里重新給定一個名稱

    /**
     * autor:liman
     * createtime:2020/9/9
     * comment:統一一個處理器接口
     */
    public interface IMultiHandler{
        public void handlerSelectKey();
    }
    

    2、普通IO事件的處理Handler

    /**
     * autor:liman
     * createtime:2020/9/10
     * comment:普通IO讀寫事件的處理器,這里繼承了Thread類,實現了IMultiHandler接口
     */
    @Slf4j
    public class MultiKeyEventHandler extends Thread implements IMultiHandler {
    
        private Selector selector;
        private SocketChannel socketChannel;
        private SelectionKey selectionKey;
    
        /**
         * 這里需要綁定非連接事件的處理器
         *
         * @param selector
         * @param socketChannel
         */
        public MultiKeyEventHandler(Selector selector, SocketChannel socketChannel) {
            try {
                this.selector = selector;
                this.socketChannel = socketChannel;
                socketChannel.configureBlocking(false);
                //這里沒有注冊任何事件到selector上,只是獲取指定的選擇鍵
                selectionKey = socketChannel.register(selector, 0);
                selectionKey.attach(this);
                //通過selectionKey注冊可讀事件
                selectionKey.interestOps(SelectionKey.OP_READ);
                selector.wakeup();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void handlerSelectKey() {
            try {
                String currentTime = "";
                if (selectionKey.isValid()) {
                    if (selectionKey.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        int readBytes = socketChannel.read(readBuffer);
                        if (readBytes > 0) {
                            readBuffer.flip();
                            byte[] bytes = new byte[readBuffer.remaining()];
                            readBuffer.get(bytes);
                            String body = new String(bytes, "UTF-8");
                            log.info("the time server receive order:{}", body);
                        } else if (readBytes < 0) {//如果讀取不到數據,則表示SocketChannel關閉,直接返回
                            socketChannel.close();
                            return;
                        }
                        selectionKey.interestOps(SelectionKey.OP_WRITE);
                    } else if (selectionKey.isWritable()) {
                        currentTime = LocalDateTime.now().toString();
                        log.info("send message 2 client ,current time is : {}", currentTime);
                        doWrite(socketChannel, currentTime);
                        selectionKey.interestOps(SelectionKey.OP_READ);
                    }
                    selector.wakeup();
                }
            } catch (Exception ex) {
                selectionKey.cancel();
                return;
            }
        }
    
        /**
         * 通過緩沖區,往channel中寫入數據
         *
         * @param socketChannel
         * @param response
         * @throws IOException
         */
        private void doWrite(SocketChannel socketChannel, String response) throws IOException {
            if (response != null && response.trim().length() > 0) {
                byte[] bytes = response.getBytes();
                ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
                writeBuffer.put(bytes);
                writeBuffer.flip();
                socketChannel.write(writeBuffer);
            }
        }
    
        @Override
        public void run() {
            handlerSelectKey();
        }
    }
    

    3、連接建立事件的處理器

    /**
     * autor:liman
     * createtime:2020/9/9
     * comment: 多線程的Acceptor的處理器
     */
    @Slf4j
    public class MultiAcceptorHandler implements IMultiHandler {
    
        private Selector[] selectors;
        private ServerSocketChannel serverSocketChannel;
        private int selectorCount;
        private AtomicInteger selectorIndex = new AtomicInteger(0);
        private AtomicInteger totalConnectCount = new AtomicInteger(0);
    
        static ExecutorService pool = Executors.newFixedThreadPool(4);
    
        public MultiAcceptorHandler(Selector[] selectors, ServerSocketChannel serverSocketChannel,int selectorCount) {
            this.selectors = selectors;
            this.serverSocketChannel = serverSocketChannel;
            this.selectorCount = selectorCount;
        }
    
        @Override
        public void handlerSelectKey() {
            //selector的索引處理,如果已經達到了上線,歸零。
            //這里也可以用隨機的方式,隨機獲取一個選擇器都可以
            if(selectorIndex.incrementAndGet() == selectorCount){
                selectorIndex.set(0);
            }
            try{
                SocketChannel socketChannel = serverSocketChannel.accept();
                if(socketChannel!=null){
                    log.info("總共建立的連接事件個數:{}",totalConnectCount.incrementAndGet());
                    //這里將IO讀寫處理的Handler交給線程池去處理
                    MultiKeyEventHandler eventHandler = new MultiKeyEventHandler(this.selectors[selectorIndex.get()], socketChannel);
                    pool.execute(eventHandler);
                }
            }catch (Exception e){
                log.error("事件分發異常,異常信息為:{}",e);
            }
        }
    }
    

    4、服務端主要入口代碼,與單線程相比,主要在于啟動事件掃描線程

    /**
     * autor:liman
     * createtime:2020/8/12
     * comment:單線程reactor 模式的客戶端
     */
    @Slf4j
    public class TimeMulitReactorServer {
    
        private static final String host = "127.0.0.1";
        private static final int port = 10229;
        private static final int selectorCount = 2;
    
        public static void main(String[] args) {
            //啟動的時候,將服務端的ServerSocketChannel注冊到Selector上,并綁定連接事件的處理方式
            try {
                Selector[] selectors = new Selector[selectorCount];
                for(int i = 0;i<selectors.length;i++){
                    selectors[i]=Selector.open();
                }
                //開啟Reactor線程,可以先開啟子線程,反正這個時候也沒有注冊事件需要分配,讓其空載就行
                //后面的連接事件綁定到指定的處理器上,相應的線程會自動輪詢處理
                for(int i = 0;i<selectors.length;i++) {
                    new Thread(new MultiDispatchThread(selectors[i]),"dispatchThread_"+i).start();
                }
    
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                InetSocketAddress inetAddress =
                        new InetSocketAddress(host,
                                port);
                serverSocketChannel.bind(inetAddress);
    
                //這里隨便注冊一個selector都可以
                int index = new Random().nextInt(selectorCount);
                SelectionKey selectionKey = serverSocketChannel.register(selectors[index], SelectionKey.OP_ACCEPT);
                //綁定連接事件的處理Handler
                selectionKey.attach( new MultiAcceptorHandler(selectors, serverSocketChannel, selectorCount));
            } catch (Exception e) {
                log.error("連接建立出現異常,異常信息為:{}",e);
            }
        }
    }
    

    其他客戶端的改造

    為了更好的測試,客戶端需要啟動多個線程,最好引入線程池,具體改造如下

    /**
     * autor:liman
     * createtime:2020/8/12
     * comment:客戶端代碼,其中TimeClientHandler不用變,只是開啟線程的代碼引入線程池
     */
    @Slf4j
    public class TimeMultiReactorClient {
    
        static ExecutorService pool = Executors.newFixedThreadPool(4);
    
        public static void main(String[] args) throws InterruptedException {
            int port = 10229;
            for(int i=0;i<100;i++) {
                Thread thread = new Thread(new TimeClientHandler("127.0.0.1", port), "TimeClientThread-" + i);
                pool.execute(thread);
            }
        }
    }
    

    代碼均能正常運行,這里還是按照俗套寫法,附上一張多線程reactor的圖片,應該都能輕松找到對應的組件
    在這里插入圖片描述

    總結

    在調試過程中,客戶端代碼沒有改造成線程池之前,直接開啟了100個線程,結果調試過程發現服務端只接收到了五十多個線程就阻塞,以為是服務端的問題,后來發現客戶端只發送了五十多個請求就阻塞,后來調整了線程棧的JVM參數,才發現問題,后來直接使用線程池,客戶端才能正常發送100個請求。

    由于關于多線程線程池等問題,我不是很熟悉,因此耗費較久,整體多線程Reactor模式,幾乎在結合了《Netty、Redis、Zookeeper高并發實戰》和《Netty權威指南》兩本書之后,綜合整出了這個實例,《Netty、Redis、Zookeeper高并發實戰》給出過一個完整的實例,但個人覺得其實例部分普通處理的Handler與線程的run方法雜糅在一塊,理解起來較為晦澀,結合《Netty權威指南》的查詢時間的實例,綜合弄出了上述實例,只需要明確一點,連接事件建立的處理與普通IO的處理會交給不同的處理器進行處理。這兩種事件的處理都可以注冊在Selector上,在單線程模式下兩種事件的處理是注冊在同一個selector上,而多線程模式下是隨機的。

    版權聲明:本文為liman65727原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/liman65727/article/details/108560487

    智能推薦

    python拾遺(二)

    21、python list 列表中可以用的swap方法 mylist[i], mylist[j] = mylist[j], mylist[i] 這樣就可以swap兩個元素了 還有 a ^= b b ^= a a ^= b 22、python取整及保留小數 23、Python拷貝(深拷貝deepcopy與淺拷貝copy) Python中的對象之間賦值時是按引用傳遞的,如果需要拷貝對象,需要使用標準...

    Python拾遺(一)

    python扔在一邊很久了,最近剛好有幾天閑,買了本《Head First Python》復習一下。 跟python結緣是因為教C語言的王青老師推薦我們邊學C邊學python,記得說是C結合腳本語言會發揮出比較大的威力,還推薦了一門Coursera上的公開課,名字好像叫how to program,當時只是聽說,因為當時各種狀態,并沒有學。 后來用一個暑假學《learn python the ha...

    C語言拾遺

    short, long short 和 long 用于修飾整型,和 int 一起使用,但使用時 int 關鍵字可以省略。 short, int, long 在不同的機器架構下占用的長度不同,但一般遵循以下限制:short長度小于等于int,int長度小于等于long,一般short長度至少需要16位,long長度至少需要32位。 extern 引用全局變量(外部變量)時需要先使用 extern 關...

    【Practical】Dropout拾遺

    文章目錄 早期:Vanilla-Dropout. 現代:Inverted-Dropout. 模型集成角度理解. Dropout & L2-norm. Dropout & GD. 論文資源. 早期:Vanilla-Dropout. 【Dropout思想提出者Hinton在2014年論文中敘述的版本】 訓練過程中獨立地以概率 p p p 保留下每一個神經元,注意這里的 p p p 代表...

    HTML中常用操作關于:頁面跳轉,空格

    1.頁面跳轉 2.空格的代替符...

    猜你喜歡

    freemarker + ItextRender 根據模板生成PDF文件

    1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    Linux C系統編程-線程互斥鎖(四)

    互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...

    精品国产乱码久久久久久蜜桃不卡