• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Netty拾遺(一)——BIO、NIO和AIO簡介

    標簽: # Netty  netty

    前言

    本篇博客打算總結一下BIO,NIO和AIO的簡單區別,其中對于AIO不會重點介紹,只會簡單提及部分,不做實例演示。本文博客中的實例參照《Netty 權威指南》一書,關于BIO與NIO的理論描述,參照了《Netty、Redis、Zookeeper高并發實戰》一書。

    IO基礎知識

    底層IO的讀寫,均會調用底層的read和write兩大系統調用(不同系統中IO讀寫的系統調用名稱不同,但基本功能都差不多)。

    read——把數據從內核緩沖區復制到進程緩沖區(這里涉及一個概念,內核緩沖區和進程緩沖區,可以參看這篇博客: 用戶進程緩沖區和內核緩沖區)。

    例如:如果read操作是從一個socket中讀取數據,則分為如下兩個階段:

    第一個階段:等待數從網絡中達到網卡。當所等待的分組到達時,數據會從socket中復制到內核緩沖區中,這個操作由操作系統底層自己完成,對應用程序是無感知的。

    第二個階段:這個就是把數據從內核緩沖區復制到進程緩沖區。

    總的來說read操作,分為兩個步驟,第一個步驟就是內核等待外部文件的數據達到,如果數據完整之后,會自動復制到內核緩沖區。第二個步驟,就是數據從內核緩沖區復制到進程緩沖區。

    write——把數據從進程緩沖區讀到內核緩沖區。

    操作系統只有一個內核緩沖區,每個進程有自己獨立的緩沖區,這個就是進程緩沖區。基本的通信交互模型比較簡單,如下所示

    在這里插入圖片描述

    在正式介紹各個IO類型之前,還是先說一下阻塞IO和異步IO的區別

    阻塞IO與非阻塞IO

    阻塞IO,其實指的是應用程序需要內核的IO操作完成之后,才能開始處理用戶的操作。阻塞指的是用戶程序的執行狀態。在Java中,默認創建的Socket都是阻塞的。

    非阻塞IO,與阻塞IO相對而言,非阻塞IO指的是用戶空間的程序(應用程序)不需要等待內核IO操作完成,可以立即返回用戶空間執行用戶的操作,與此同時內核會立即返回用戶程序一個狀態。

    同步IO與異步IO

    關于同步IO與異步IO,《Netty、Redis、Zookeeper高并發實戰》一書中介紹的是應用程序與內核空間IO發起方式不同。但是我個人并不能理解這個,我個人認為,其實就是普通的同步與異步的概念,只是對象變成了應用程序與操作系統內核而已。

    同步阻塞IO——Blocking IO(BIO)

    默認情況下Socket是同步阻塞的,在阻塞模式中,Java應用程序從IO調用開始,直到系統調用返回,在這段時間內,Java進程都是阻塞的。直到返回成功,程序才能開始下一步操作。總之,阻塞IO的特點是,在內核進行IO執行的兩個階段,整個用戶線程都被阻塞了。

    在這里插入圖片描述

    如果單獨討論網絡編程,BIO絕對是每一個學習網絡編程的Helloworld程序,通過Socket與ServerSocket完成服務器與客戶端的消息通信,這里依舊參照《Netty 權威指南》一書,以一個簡單的時間回顯的實例來進行說明。

    具體代碼如下

    客戶端代碼

    客戶端的代碼相對而言比較簡單,無非就是實例化socket,根據socket獲取輸入輸出流,然后將命令寫入到輸出中,然后從服務端讀取客戶端輸入的數據。

    /**
     * autor:liman
     * createtime:2020/8/11
     * comment:Time client的客戶端
     */
    @Slf4j
    public class TimeClient {
    
        public static void main(String[] args) {
            int port = 8999;
            Socket socket = null;
            BufferedReader in = null;
            PrintWriter out = null;
            try {
                //初始化socket
                socket = new Socket("127.0.0.1", port);
                //構建輸入輸出流
                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                out = new PrintWriter(socket.getOutputStream(), true);
                //將查詢時間的命令寫入到客戶端輸出流中
                out.println("QUERY TIME");
                log.info("send time:{},send order to server succeed",LocalDateTime.now().toString());
                
                //讀取服務端的響應
                String responseMessage = in.readLine();
                log.info("server time is :{}", responseMessage);
            } catch (Exception e) {
    			log.error("error ,error message:{}",e);
            }finally {
                //一些關閉流的操作
                if(out!=null){
                    out.close();
                }
                if(in!=null){
                    try {
                        in.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(socket!=null){
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    服務端代碼

    服務端代碼相對復雜點,通過一個while(true)循環,不斷監聽來自客戶端的連接請求,然后在循環中處理客戶端的數據,并將返回數據通過流的形式返回。

    /**
     * autor:liman
     * createtime:2020/8/11
     * comment:
     * 參照Netty權威指南一書,BIO實例
     */
    @Slf4j
    public class TimeServer {
        public static void main(String[] args) {
            int port = 8999;
            ServerSocket serverSocket = null;
            try{
                serverSocket = new ServerSocket(port);
                log.info("Time server start in port:{}",port);
                Socket socket = null;
                
               	//服務端不斷循環,通過accept建立與客戶端的連接,整個過程是阻塞的。
    			while (true) {
                    socket = serverSocket.accept();//建立連接
                    //構建輸入輸出流
                    BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                    
                    String currentTime = null;
                    String body = null;
                    body = in.readLine();
                    if (body == null)//如果沒有收到客戶端的請求,則退出當前循環。
                        break;
                    log.info("this time server receive order:{}", body);
                    if ("QUERY TIME".equalsIgnoreCase(body)) {
                        Thread.sleep(10000);//睡眠10秒,模擬客戶端的請求。
                        currentTime = LocalDateTime.now().toString();
                    } else {
                        currentTime = "BAD ORDER";
                    }
                    out.println(currentTime);
                }
    
            }catch (Exception e){
                log.error("服務端讀取信息異常,異常信息為:{}",e);
            }finally {
                
                //關閉serverSocket,這里省略其他流的關閉操作。
                if(serverSocket!=null){
                    log.info("the time server close!");
                    try {
                        serverSocket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    運行結果

    啟動一個服務端,然后啟動兩個客戶端,運行日志如下:

    服務端正常收到兩次請求

    在這里插入圖片描述

    第一個啟動的客戶端日志為

    在這里插入圖片描述

    第二個啟動的客戶端日志為

    在這里插入圖片描述

    從兩個客戶端日志可以看出,第二個客戶端處理耗時接近20秒,而我們服務端每次請求會休眠10秒,可以看出,每次服務端處理客戶端請求是阻塞式的,這就是傳說中的阻塞式IO,也就稱為BIO。

    同步非阻塞IO(NIO)

    這里需要說明一下,同步非阻塞IO,這里可以簡稱為NIO,但是并不對應于Java中的NIO,雖然他們的英文縮寫是一樣的,但是并不是同一個事情,Java中的NIO是基于另一種模型——IO多路復用模型。

    Socket連接模式是阻塞模式,在Linux系統下,可以通過設置將Socket變成非阻塞模式。

    在內核緩沖區中沒有數據的情況下,系統調用會立即返回,返回一個調用失敗的信息。

    在內核緩沖區中有數據的情況下,這個時候,客戶端是阻塞的,直到獲取數據的系統調用返回成功,應用程序才會開始處理內核的數據。

    在這里插入圖片描述

    IO多路復用(IO Multiplexing)

    這種模式引入了一種新的系統滴啊用,查詢IO的就緒狀態,在Linux系統中,對應的系統調用為select/epoll系統調用,通過這個系統調用,一個進程可以監視多個文件描述符。一旦其中之一變成可讀,則會將狀態返回給應用程序。通過使用select/epoll系統調用,單個應用程序的線程,可以不斷的輪詢成百上千的socket連接。

    在這里插入圖片描述

    Java中的NIO才對應了該IO模型,關于Java NIO,其實有三個關鍵的主鍵——selector,buffer,channel的部分,其中最為關鍵的其實是buffer的讀寫部分,這一部分在之前的博客中有過總結,可以參考這篇博客——NIO的三個關鍵組件

    客戶端代碼

    啟動類

    /**
     * autor:liman
     * createtime:2020/8/12
     * comment:啟動類
     */
    @Slf4j
    public class TimeClient {
        public static void main(String[] args) {
            int port = 8999;
            new Thread(new TimeClientHandler("127.0.0.1",port),"TimeClientThread-001").start();
        }
    }
    

    真正的handler處理類

    /**
     * autor:liman
     * createtime:2020/8/12
     * comment:
     */
    @Slf4j
    public class TimeClientHandler implements Runnable {
    
        private String host;
        private int port;
        private Selector selector;
        private SocketChannel socketChannel;
        private volatile boolean stop;
    
        //構造函數中需要實例化SocketChannel,只有實例化完成的SocketChannel才能注冊到Selector上。
        public TimeClientHandler(String host, int port) {
            this.host = host;
            this.port = port;
            try {
                selector = Selector.open();
                socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);
            } catch (Exception e) {
                log.error("客戶端初始化異常,異常信息為:{}",e);
            }
        }
    
        @Override
        public void run() {
            try {
                //先建立連接,沒有建立連接,一切都免談,畢竟這里是基于TCP的協議。
                doConnect();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
            //如果線程沒有被中斷,則一直輪詢去遍歷在selector上注冊的事件
            //然后根據不同的事件類型調用不同的處理邏輯。
            while(!stop){
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> selectorKeyIterator = selectionKeys.iterator();
                    SelectionKey key = null;
                    while(selectorKeyIterator.hasNext()){
                        key = selectorKeyIterator.next();
                        try{
                            handleInput(key);
                        }catch (Exception e){
                            if(key!=null){
                                key.cancel();
                                if(key.channel()!=null){
                                    key.channel().close();
                                }
                            }
                        }
                        selectorKeyIterator.remove();
                    }
    
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            if(selector!=null){
                try{
                    selector.close();
                }catch (Exception e){
                    log.error("流關閉異常,異常信息為:{}",e);
                }
            }
        }
    
        //selector上的注冊事件處理邏輯。
        private void handleInput(SelectionKey key) throws IOException {
            //
            if (key.isValid()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                if (key.isConnectable()) {//如果連接建立
                    if (socketChannel.finishConnect()) {
                        //注冊可讀事件
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        log.info("client connect to server ,client time is {}", LocalDateTime.now().toString());
                        //channel中寫入數據
                        doWrite(socketChannel);
                    } else {
                        System.exit(1);
                    }
                }
    
                if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = socketChannel.read(readBuffer);
                    if (readBytes > 0) {
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes,"UTF-8");
                        log.info("receive server message,Now is : {}",body);
                        this.stop = true;
                    }else if(readBytes < 0){
                        key.cancel();
                    }
                    socketChannel.close();
                }
            }
        }
    
        //與客戶端建立連接,連接成功之后,需要將SocketChannel的可讀事件注冊到selector上。
        private void doConnect() throws IOException {
            //如果連接建立,則注冊可讀事件
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            } else {//如果連接未建立,則需要注冊連接事件。
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
        }
    
        //往channel中寫入數據
        private void doWrite(SocketChannel socketChannel) throws IOException {
            byte[] req = "QUERY TIME".getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
            writeBuffer.put(req);
            writeBuffer.flip();
            socketChannel.write(writeBuffer);
            if (!writeBuffer.hasRemaining()) {
                log.info("send order 2 server succeed.");
            }
        }
    }
    

    服務端代碼

    服務端入口代碼

    /**
     * autor:liman
     * createtime:2020/8/12
     * comment:時間服務器
     */
    @Slf4j
    public class TimeServer {
    
        public static void main(String[] args) {
            int port = 8999;
            MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
            new Thread(timeServer,"NIO-Time-HandlerServer-001").start();
        }
    }
    

    服務端業務處理代碼,這個代碼和客戶端的業務處理差不多。

    /**
     * autor:liman
     * createtime:2020/8/12
     * comment:
     */
    @Slf4j
    public class MultiplexerTimeServer implements Runnable {
    
        private Selector selector;
        private ServerSocketChannel serverSocketChannel;
        private volatile boolean stop;
    
        public MultiplexerTimeServer(int port) {
            try {
                selector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                //注冊到選擇器的通道必須是非阻塞模式
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
                //將serverSocketChannel注冊到selector上
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                log.info("The time server is start in port : {}", port);
            } catch (IOException e) {
                log.error("服務啟動出行異常,異常信息為:{}", e);
                System.exit(1);
            }
        }
    
        public void stop() {
            this.stop = true;
        }
    
        @Override
        public void run() {
            while (!stop) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> keysIterator = selectionKeys.iterator();
                    SelectionKey key = null;
                    while (keysIterator.hasNext()) {
                        key = keysIterator.next();
                        try {
                            handleInput(key);
                        } catch (Exception e) {
                            log.error("通道出現異常,異常信息為:{}",e);
                            if(key!=null){
                                key.cancel();
                                if(key.channel()!=null){
                                    key.channel().close();
                                }
                            }
                        }
                        keysIterator.remove();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 處理Selector上的注冊事件
         * @param key
         * @throws IOException
         */
        private void handleInput(SelectionKey key) throws IOException {
            String currentTime = null;
            if (key.isValid()) {
    
                if(key.isAcceptable()){
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                    //服務端的SocketChannel需要通過serverSocketChannel.accept()來獲取。
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector,SelectionKey.OP_READ);
                }
    
                if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = socketChannel.read(readBuffer);
                    if (readBytes > 0) {
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes, "UTF-8");
                        log.info("the time server receive order:{}", body);
                        if ("QUERY TIME".equalsIgnoreCase(body)) {
                            currentTime = LocalDateTime.now().toString();
                        } else {
                            currentTime = "BAD ORDER";
                        }
                        doWrite(socketChannel, currentTime);
                    } else if (readBytes < 0) {
                        key.channel();
    
                    }
                    socketChannel.close();
                }
            }
        }
    
        //往channel中寫入buffer
        private void doWrite(SocketChannel socketChannel, String response) throws IOException {
            if (response != null && response.trim().length() > 0) {
                byte[] bytes = response.getBytes();
                ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
                writeBuffer.put(bytes);
                writeBuffer.flip();
                socketChannel.write(writeBuffer);
            }
        }
    }
    

    上述代碼運行結束,不會出現BIO中阻塞的問題

    AIO

    AIO操作是對用戶程序最友好的IO操作,用戶線程在通過系統調用,向內核注冊某個IO操作,內核在整個IO操作完成之后,主動通知用戶程序。整個過程用戶程序無需關注任何IO的狀態,只需要等待操作系統內核告知IO處理完成即可。

    總結

    Netty依舊使用的是IO多路復用的模型,在5.0左右的版本中曾經打算采用AIO,但是后來發現AIO可維護性并沒有想象中的優秀,于是放棄了AIO的方式,目前Netty依舊采用的是IO的多路復用模型。因此關于AIO本篇博客也沒有做過多的總結。

    版權聲明:本文為liman65727原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/liman65727/article/details/108431831

    智能推薦

    Netty拾遺(八)——編碼與解碼

    文章目錄 前言 解碼器 ByteToMessageDecoder解碼器 實例 MessageToMessageDecoder解碼器 多說一句 編碼器 MessageToByteEncoder編碼器 MessageToMessageEncoder編碼器 總結 前言 走到現在,其實應該有一個意識了,針對數據接收端,需要Netty從底層網絡接口中讀取數據,然后將數據讀取到ByteBuf,這個時候依舊是二...

    BIO、NIO、AIO系列二:Netty

    一、概述 Netty是一個Java的開源框架。提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。 Netty是一個NIO客戶端,服務端框架。允許快速簡單的開發網絡應用程序。例如:服務端和客戶端之間的協議,它簡化了網絡編程規范。 二、NIO開發的問題 1、NIO類庫和API復雜,使用麻煩。 2、需要具備Java多線程編程能力(涉及到Reactor模式...

    Netty序章之BIO NIO AIO演變

    Netty是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠的網絡服務器和客戶端程序。Netty簡化了網絡程序的開發,是很多框架和公司都在使用的技術。更是面試的加分項。Netty并非橫空出世,它是在BIO,NIO,AIO演變中的產物,是一種NIO框架。而BIO,NIO,AIO更是筆試中要考,面試中要問的技術。也是一個很好的加分項,加分就是加工資,你還在等什么?本章帶你細細品味三者的不...

    Netty系列3-BIO、AIO、NIO

    客戶端和服務端通信本質上就是服務端監聽端口,客戶端發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字socket進行通信。 根據通信實現方式的不同又分為BIO、AIO、NIO三種。 1.BIO BIO是同步阻塞模型。通常由一個Acceptor線程監聽客戶端的連接,接收到連接請求后為每個客戶端都創建一個新線程進行處理,最后將響應通過輸出流返回給客戶端,線程銷毀。 BIO最大的缺點...

    Netty實戰一 | Java BIO, NIO 及Netty簡介

    免責聲明:本人最近在研讀《Netty實戰》書籍,對于里面內容頗感興趣,本文旨在于技術學習交流,不存在盈利性目的。 Java 網絡編程 BIO:block input output 早期的網絡編程開發人員,需要花費大量的時間去學習復雜的 C 語言套接字庫,去處理它們在不同的操作系統上出現的古怪問題。早期的 Java API(java.net)只支持由本地系統套接字庫提供的所謂的阻塞函數。Java創建...

    猜你喜歡

    BIO、NIO、AIO系列一:NIO

    一、幾個基本概念 1.同步、異步、阻塞、非阻塞 同步:用戶觸發IO操作,你發起了請求就得等著對方給你返回結果,你不能走,針對調用方的,你發起了請求你等 異步:觸發觸發了IO操作,即發起了請求以后可以做自己的事,等處理完以后會給你返回處理完成的標志,針對調用方的,你發起了請求你不等 阻塞:你調用我,我試圖對文件進行讀寫的時候發現沒有可讀寫的文件,我的程序就會進入等待狀態,等可以讀寫了,我處理完給你返...

    HTML中常用操作關于:頁面跳轉,空格

    1.頁面跳轉 2.空格的代替符...

    freemarker + ItextRender 根據模板生成PDF文件

    1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    精品国产乱码久久久久久蜜桃不卡