• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • 從零開始學Netty (一)-- Linux網絡IO模型 及 Java實現BIO、NIO、AIO

    標簽: 從零開始學Netty  netty  nio  aio

    下面準備做一個Netty的系列教程,適合初次接觸Netty或網絡通訊的同學了解閱讀。本文主要介紹Linux網絡IO模型、BIO、NIO、AIO及代碼實現。至于更基礎的OSI、TCP/IP、HTTP等,相信大家都不陌生就不一一贅述了。由于Netty5停止維護了,所以后面所涉及的Netty都是指Netty4這個版本,故也不支持AIO。

    Linux網絡IO模型

    一般服務器都運行在Linux環境下,故這里也只討論Linux環境下的。

    阻塞I/O模型

    應用程序調用一個IO函數,導致應用程序阻塞,等待數據準備好。 如果數據沒有準備好,一直等待。當數據準備好了,從內核拷貝到用戶空間,IO函數返回成功指示。當調用recv()函數時,系統首先查是否有準備好的數據。如果數據沒有準備好,那么系統就處于等待狀態。當數據準備好后,將數據從系統緩沖區復制到用戶空間,然后該函數返回。在套接應用程序中,當調用recv()函數時,未必用戶空間就已經存在數據,那么此時recv()函數就會處于等待狀態。

     

     

     

     

    非阻塞IO模型 

    當所請求的I/O操作無法完成時,不要將進程睡眠,而是返回一個錯誤。這樣我們的I/O操作函數將不斷的測試數據是否已經準備好,如果沒有準備好,繼續測試,直到數據準備好為止。在這個不斷測試的過程中,會大量的占用CPU的時間。所以這個模型不被推薦。
       

    IO復用模型

    I/O復用模型會用到select、poll、epoll函數,這幾個函數也會使進程阻塞,但是和阻塞I/O所不同的的,這兩個函數可以同時阻塞多個I/O操作。而且可以同時對多個讀操作,多個寫操作的I/O函數進行檢測,直到有數據可讀或可寫時,才真正調用I/O操作函數。當用戶進程調用了select,那么整個進程會被block;而同時,kernel會“監視”所有select負責的socket;當任何一個socket中的數據準備好了,select就會返回。這個時候,用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。


    這個圖和blocking IO的圖其實并沒有太大的不同,事實上還更差一些。因為這里需要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。但是,用select的優勢在于它可以同時處理多個connection。(select/epoll的優勢并不是對于單個連接能處理得更快,而是在于能處理更多的連接。)

    信號驅動IO

    首先我們允許套接口進行信號驅動I/O,并安裝一個信號處理函數,進程繼續運行并不阻塞。當數據準備好時,進程會收到一個SIGIO信號,可以在信號處理函數中調用I/O操作函數處理數據。

    異步IO模型

     

    當一個異步過程調用發出后,調用者不能立刻得到結果。實際處理這個調用的部件在完成后,通過狀態、通知和回調來通知調用者的輸入輸出操作。

     

    JDK實現的BIO、NIO、AIO

    BIO(Blocking I/O):同步阻塞IO模型,數據的讀取寫入必須阻塞在一個線程內等待其完成。
    NIO(New I/O):IO復用模型,同時支持阻塞與非阻塞模式。這里我們以其同步非阻塞I/O模式來說明,服務端與客戶端通過Channel通信,NIO在Channel上讀寫,Channel被注冊到Selector多路復用器上。Selector通過一個線程輪詢這些Channel,找到已經就緒的Channel。
    AIO(Asynchronous I/O):異步阻塞IO模型。無需一個線程去輪詢所有IO操作的狀態改變,在相應的狀態改變后,系統會通知對應的線程來處理。

    BIO

    該模該模型最大的問題就是缺乏彈性伸縮能力,當客戶端并發訪問量增加后,服務端的線程個數和客戶端并發訪問數呈1:1的正比關系,Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹后,系統的性能將急劇下降,隨著訪問量的繼續增大,系統最終就死掉了。
    為了改進這種一連接一線程的模型,我們可以使用線程池來管理這些線程,實現1個或多個線程處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。
    我們知道,如果使用CachedThreadPool線程池,其實除了能自動幫我們管理線程(復用),看起來也就像是1:1的客戶端:線程數模型,而使用FixedThreadPool我們就有效的控制了線程的最大數量,保證了系統有限的資源的控制,實現了N:M的偽異步I/O模型。

    服務端

    public class BioServer {
    
        private static int DEFAULT_PORT = 8888;
        private static ServerSocket server;
    
        //線程池,處理每個客戶端的請求
        private static ExecutorService executorService = 
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
        private static void main(String[] args) throws IOException{
            try{
                server = new ServerSocket(DEFAULT_PORT);
                System.out.println("ServerSocket start,port:" + DEFAULT_PORT);
                while(true){
                    Socket socket= server.accept();
                    System.out.println("==========" );
                    //當有新的客戶端接入時,打包成一個任務,投入線程池
                    executorService.execute(new BioServerHandler(socket));
                }
            }finally{
                if(server!=null){
                    server.close();
                }
            }
        }
    }
    public class BioServerHandler implements Runnable{
        private Socket socket;
        public BioServerHandler(Socket socket) {
            this.socket = socket;
        }
        public void run() {
            try(//負責socket讀寫的輸出、輸入流
                BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream(),
                        true)){
                String message;
                String result;
                //通過輸入流讀取客戶端傳輸的數據
                //如果已經讀到輸入流尾部,返回null,退出循環
                //如果得到非空值,就將結果進行業務處理
                while((message = in.readLine())!=null){
                    System.out.println("Server accept message:"+message);
                    result = Ch01Const.response(message);
                    //將業務結果通過輸出流返回給客戶端
                    out.println(result);
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                if(socket != null){
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    socket = null;
                }
            }
        }
    }

    客戶端

    public class BioClient {
    
        private static String DEFAULT_SERVER_IP = "127.0.0.1";
    
        private static int DEFAULT_PORT = 8888;
        
        public static void main(String[] args) throws InterruptedException,
                IOException {
            Socket socket =  new Socket(DEFAULT_SERVER_IP, DEFAULT_PORT);
            System.out.println("input:");
            
            //啟動讀取服務端輸出數據的線程
            new ReadMsg(socket).start();
            PrintWriter pw = null;
            
            //允許客戶端在控制臺輸入數據,然后送往服務器
            while(true){
                pw = new PrintWriter(socket.getOutputStream());
                pw.println(new Scanner(System.in).next());
                pw.flush();
            }
        }
    
        //讀取服務端輸出數據的線程
        private static class ReadMsg extends Thread {
            Socket socket;
    
            public ReadMsg(Socket socket) {
                this.socket = socket;
            }
    
            @Override
            public void run() {
                //負責socket讀寫的輸入流
                try (BufferedReader br = new BufferedReader(
                        new InputStreamReader(socket.getInputStream()))) {
                    String line = null;
                    //通過輸入流讀取服務端傳輸的數據
                    //如果已經讀到輸入流尾部,返回null,退出循環
                    //如果得到非空值,就將結果進行業務處理
                    while ((line = br.readLine()) != null) {
                        System.out.printf("%s\n", line);
                    }
                } catch (SocketException e) {
                    System.out.printf("%s\n", "connection closed");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    NIO

    NIO主要有三大核心部分:Channel(通道),Buffer(緩沖區), Selector(選擇器)。傳統IO是基于字節流和字符流進行操作(基于流),而NIO基于Channel和Buffer(緩沖區)進行操作,數據總是從通道讀取到緩沖區中,或者從緩沖區寫入到通道中。Selector(選擇區)用于監聽多個通道的事件(比如:連接打開,數據到達)。因此,單個線程可以監聽多個數據通道。

    • Buffer:Buffer(緩沖區)是一個用于存儲特定基本類型數據的容器。除了boolean外,其余每種基本類型都有一個對應的buffer類。Buffer類的子類有ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer 。
    • Channel:Channel(通道)表示到實體,如硬件設備、文件、網絡套接字或可以執行一個或多個不同 I/O 操作(如讀取或寫入)的程序組件的開放的連接。Channel接口的常用實現類有FileChannel(對應文件IO)、DatagramChannel(對應UDP)、SocketChannel和ServerSocketChannel(對應TCP的客戶端和服務器端)。Channel和IO中的Stream(流)是差不多一個等級的。只不過Stream是單向的,譬如:InputStream, OutputStream.而Channel是雙向的,既可以用來進行讀操作,又可以用來進行寫操作。
    • Selector:Selector(選擇器)用于監聽多個通道的事件(比如:連接打開,數據到達)。因此,單個的線程可以監聽多個數據通道。即用選擇器,借助單一線程,就可對數量龐大的活動I/O通道實施監控和維護。

    操作類型SelectionKey

    NIO共定義了四種操作類型:OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT(定義在SelectionKey中),分別對應讀、寫、請求連接、接受連接等網絡Socket操作。ServerSocketChannel和SocketChannel可以注冊自己感興趣的操作類型,當對應操作類型的就緒條件滿足時OS會通知channel,下表描述各種Channel允許注冊的操作類型,Y表示允許注冊,N表示不允許注冊,其中服務器SocketChannel指由服務器ServerSocketChannel.accept()返回的對象。

     

     

    OP_READ

    OP_WRITE

    OP_CONNECT

    OP_ACCEPT

    服務器ServerSocketChannel

    N

    N

    N

    Y

    服務器SocketChannel

    Y

    Y

    N

    N

    客戶端SocketChannel

    Y

    Y

    Y

    N

     

    服務器啟動ServerSocketChannel,關注OP_ACCEPT事件。
    客戶端啟動SocketChannel,連接服務器,關注OP_CONNECT事件。
    服務器接受連接,啟動一個服務器的SocketChannel,這個SocketChannel可以關注OP_READ、OP_WRITE事件,一般連接建立后會直接關注OP_READ事件。
    客戶端這邊的客戶端SocketChannel發現連接建立后,可以關注OP_READ、OP_WRITE事件,一般是需要客戶端需要發送數據了才關注OP_READ事件。
    連接建立后客戶端與服務器端開始相互發送消息(讀寫),根據實際情況來關注OP_READ、OP_WRITE事件。

    服務端

    public class NioServer {
    
        private static int DEFAULT_PORT = 8888;
    
        private static NioServerHandle nioServerHandle;
    
        public static void main(String[] args){
            if(nioServerHandle !=null)
                nioServerHandle.stop();
            nioServerHandle = new NioServerHandle(DEFAULT_PORT);
            new Thread(nioServerHandle,"Server").start();
        }
    }
    

     服務端處理器

    public class NioServerHandle implements Runnable{
        private Selector selector;
        private ServerSocketChannel serverChannel;
        private volatile boolean started;
    
        /**
         * 構造方法
         * @param port 指定要監聽的端口號
         */
        public NioServerHandle(int port) {
    
            try {
                selector = Selector.open();
                serverChannel = ServerSocketChannel.open();
                serverChannel.configureBlocking(false);
                serverChannel.socket().bind(new InetSocketAddress(port));
                serverChannel.register(selector,SelectionKey.OP_ACCEPT);
                started = true;
                System.out.println("服務器已啟動,端口號:"+port);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
        public void stop(){
            started = false;
        }
        @Override
        public void run() {
            //循環遍歷selector
            while(started){
                try{
                    //阻塞,只有當至少一個注冊的事件發生的時候才會繼續.
    				selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    SelectionKey key = null;
                    while(it.hasNext()){
                        key = it.next();
                        it.remove();
                        try{
                            handleInput(key);
                        }catch(Exception e){
                            if(key != null){
                                key.cancel();
                                if(key.channel() != null){
                                    key.channel().close();
                                }
                            }
                        }
                    }
                }catch(Throwable t){
                    t.printStackTrace();
                }
            }
            //selector關閉后會自動釋放里面管理的資源
            if(selector != null)
                try{
                    selector.close();
                }catch (Exception e) {
                    e.printStackTrace();
                }
        }
    
        private void handleInput(SelectionKey key) throws IOException{
            if(key.isValid()){
                //處理新接入的請求消息
                if(key.isAcceptable()){
                    //獲得關心當前事件的channel
                    ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                    //通過ServerSocketChannel的accept創建SocketChannel實例
                    //完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立
                    SocketChannel sc = ssc.accept();
                    System.out.println("======socket channel 建立連接" );
                    //設置為非阻塞的
                    sc.configureBlocking(false);
                    //連接已經完成了,可以開始關心讀事件了
                    sc.register(selector,SelectionKey.OP_READ);
                }
                //讀消息
                if(key.isReadable()){
                    System.out.println("======socket channel 數據準備完成," +
                            "可以去讀==讀取=======");
                    SocketChannel sc = (SocketChannel) key.channel();
                    //創建ByteBuffer,并開辟一個1M的緩沖區
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    //讀取請求碼流,返回讀取到的字節數
                    int readBytes = sc.read(buffer);
                    //讀取到字節,對字節進行編解碼
                    if(readBytes>0){
                        //將緩沖區當前的limit設置為position=0,
                        // 用于后續對緩沖區的讀取操作
                        buffer.flip();
                        //根據緩沖區可讀字節數創建字節數組
                        byte[] bytes = new byte[buffer.remaining()];
                        //將緩沖區可讀字節數組復制到新建的數組中
                        buffer.get(bytes);
                        String message = new String(bytes,"UTF-8");
                        System.out.println("服務器收到消息:" + message);
                        //處理數據
                        String result = Ch01Const.response(message) ;
                        //發送應答消息
                        doWrite(sc,result);
                    }
                    //鏈路已經關閉,釋放資源
                    else if(readBytes<0){
                        key.cancel();
                        sc.close();
                    }
                }
    
            }
        }
    
        //發送應答消息
        private void doWrite(SocketChannel channel,String response)
                throws IOException {
            //將消息編碼為字節數組
            byte[] bytes = response.getBytes();
            //根據數組容量創建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //將字節數組復制到緩沖區
            writeBuffer.put(bytes);
            //flip操作
            writeBuffer.flip();
            //發送緩沖區的字節數組
            channel.write(writeBuffer);
        }
    }

    客戶端

    public class NioClient {
        
        private static String DEFAULT_SERVER_IP = "127.0.0.1";
        private static int DEFAULT_PORT = 8888;
        
        private static NioClientHandle nioClientHandle;
        
        //向服務器發送消息
        public static boolean sendMsg(String msg) throws Exception{
            nioClientHandle.sendMsg(msg);
            return true;
        }
        
        public static void main(String[] args) throws Exception {
            if(nioClientHandle !=null)
                nioClientHandle.stop();
            nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP, DEFAULT_PORT);
            new Thread(nioClientHandle,"Client").start();
            Scanner scanner = new Scanner(System.in);
            while(NioClient.sendMsg(scanner.next()));
        }
    }

    客戶端處理器

    public class NioClientHandle implements Runnable{
        private String host;
        private int port;
        private Selector selector;
        private SocketChannel socketChannel;
    
        private volatile boolean started;
    
        public NioClientHandle(String ip, int port) {
            this.host = ip;
            this.port = port;
            try {
                //創建選擇器
                selector = Selector.open();
                //打開通道
                socketChannel = SocketChannel.open();
                //如果為 true,則此通道將被置于阻塞模式;
                // 如果為 false,則此通道將被置于非阻塞模式
                socketChannel.configureBlocking(false);
                started = true;
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
        public void stop(){
            started = false;
        }
        @Override
        public void run() {
            try {
                doConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
            //循環遍歷selector
            while(started){
                try {
                    //阻塞,只有當至少一個注冊的事件發生的時候才會繼續
                    selector.select();
                    //獲取當前有哪些事件可以使用
                    Set<SelectionKey> keys = selector.selectedKeys();
                    //轉換為迭代器
                    Iterator<SelectionKey> it = keys.iterator();
                    SelectionKey key = null;
                    while(it.hasNext()){
                        key = it.next();
                        it.remove();
                        try {
                            handleInput(key);
                        } catch (IOException e) {
                            e.printStackTrace();
                            if(key!=null){
                                key.cancel();
                                if(key.channel()!=null){
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            //selector關閉后會自動釋放里面管理的資源
            if(selector!=null){
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        //具體的事件處理方法
        private void handleInput(SelectionKey key) throws IOException{
            if(key.isValid()){
                //獲得關心當前事件的channel
                SocketChannel sc = (SocketChannel)key.channel();
                if(key.isConnectable()){//連接事件
                    if(sc.finishConnect()){
                        socketChannel.register(selector,SelectionKey.OP_READ);
                    }
                    else{System.exit(1);}
                }
                //有數據可讀事件
                if(key.isReadable()){
                    //創建ByteBuffer,并開辟一個1M的緩沖區
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    //讀取請求碼流,返回讀取到的字節數
                    int readBytes = sc.read(buffer);
                    //讀取到字節,對字節進行編解碼
                    if(readBytes>0){
                        //將緩沖區當前的limit設置為position,position=0,
                        // 用于后續對緩沖區的讀取操作
                        buffer.flip();
                        //根據緩沖區可讀字節數創建字節數組
                        byte[] bytes = new byte[buffer.remaining()];
                        //將緩沖區可讀字節數組復制到新建的數組中
                        buffer.get(bytes);
                        String result = new String(bytes,"UTF-8");
                        System.out.println("accept message:"+result);
                    }else if(readBytes<0){
                        key.cancel();
                        sc.close();
                    }
                }
            }
        }
    
        //發送消息
        private void doWrite(SocketChannel channel,String request)
                throws IOException {
            //將消息編碼為字節數組
            byte[] bytes = request.getBytes();
            //根據數組容量創建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //將字節數組復制到緩沖區
            writeBuffer.put(bytes);
            //flip操作
            writeBuffer.flip();
            //發送緩沖區的字節數組
            channel.write(writeBuffer);
        }
    
        private void doConnect() throws IOException {
            /*如果此通道處于非阻塞模式,
            則調用此方法將啟動非阻塞連接操作。
            如果立即建立連接,就像本地連接可能發生的那樣,則此方法返回true。
            否則,此方法返回false,
            稍后必須通過調用finishConnect方法完成連接操作。*/
            if(socketChannel.connect(new InetSocketAddress(host,port))){}
            else{
                //連接還未完成,所以注冊連接就緒事件,向selector表示關注這個事件
                socketChannel.register(selector,SelectionKey.OP_CONNECT);
            }
        }
    
        //寫數據對外暴露的API
        public void sendMsg(String msg) throws Exception{
            //socketChannel.register(selector,SelectionKey.OP_READ);
            doWrite(socketChannel,msg);
        }
    }

    AIO

    Java從JDK1.7開始支持AIO。核心類有AsynchronousSocketChannel、AsynchronousServerSocketChannel。核心接口CompletionHandler,需要自己實現對應的回調方法。
    異步的Channel調用accept()方法后,當前線程不會阻塞。接受來自客戶端請求,連接成功或失敗后,回調傳入的CompletionHandler的completed()或failed()實現。每次對Channel操作后都會傳入對應的CompletionHandler實現,等待結果的回調,實現異步IO。

    服務端

    public class AioServer {
        private static AioServerHandler serverHandle;
    
        public static void main(String[] args){
            if(serverHandle!=null)
                return;
            serverHandle = new AioServerHandler(Ch01Const.DEFAULT_PORT);
            new Thread(serverHandle,"Server").start();
        }
    }
    public class AioServerHandler implements Runnable {
    
        // 通過CountDownLatch阻塞當前線程,防止程序退出
        public CountDownLatch latch;
        /*進行異步通信的通道*/
        public AsynchronousServerSocketChannel channel;
    
        public AioServerHandler(int port) {
            try {
                channel = AsynchronousServerSocketChannel.open();
                channel.bind(new InetSocketAddress(port));
                System.out.println("Server is start,port:"+port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void run() {
            latch = new CountDownLatch(1);
            // 需要實現了CompletionHandler接口的處理器處理和客戶端的連接操作
            channel.accept(this,new AioAcceptHandler());
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    處理用戶連接

    public class AioAcceptHandler
            implements CompletionHandler<AsynchronousSocketChannel,
            AioServerHandler> {
        @Override
        public void completed(AsynchronousSocketChannel channel,
                              AioServerHandler serverHandler) {
            // 重新注冊監聽,讓別的客戶端也可以連接
            serverHandler.channel.accept(serverHandler,this);
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            // 連接完成后,給Channel開啟一個讀事件,同時結束當前線程
            channel.read(readBuffer,readBuffer,
                    new AioReadHandler(channel));
        }
    
        @Override
        public void failed(Throwable exc, AioServerHandler serverHandler) {
            exc.printStackTrace();
            serverHandler.latch.countDown();
        }
    }

    處理讀

    public class AioReadHandler
            implements CompletionHandler<Integer, ByteBuffer> {
        private AsynchronousSocketChannel channel;
        public AioReadHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }
        //讀取到消息后的處理
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            //如果條件成立,說明客戶端主動終止了TCP套接字,這時服務端終止就可以了
            if(result == -1) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return;
            }
            //flip操作
            attachment.flip();
            byte[] message = new byte[attachment.remaining()];
            attachment.get(message);
            try {
                System.out.println(result);
                String msg = new String(message,"UTF-8");
                System.out.println("server accept message:"+msg);
                String responseStr = Ch01Const.response(msg);
                //向客戶端發送消息
                doWrite(responseStr);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        //發送消息
        private void doWrite(String result) {
            byte[] bytes = result.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            //異步寫數據
            channel.write(writeBuffer, writeBuffer,
                    new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    if(attachment.hasRemaining()){
                        channel.write(attachment,attachment,this);
                    }else{
                        //讀取客戶端傳回的數據
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        //異步讀數據
                        channel.read(readBuffer,readBuffer,
                                new AioReadHandler(channel));
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                this.channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

     

    客戶端代碼類似。可見AIO主要就是實現監聽、讀、寫等各種CompletionHandler,代碼較BIO、NIO更為復雜。

    由于Linux本身的AIO模型并不完善,包括AIO接收數據需要預先分配緩存, 而不是NIO那種需要接收時才需要分配緩存, 容易造成內存浪費。所以在我們實際工作中,AIO使用場景也比較少,包括Netty4也是基于NIO實現的。對于AIO,我們更需要理解這種設計模式,在處理一些其他的高并發場景時,可以借鑒這種思想,解決一些實際問題。

     

     

     

     

     

     

     

    版權聲明:本文為zwt122755527原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/zwt122755527/article/details/109242094

    智能推薦

    深入了解Netty【一】BIO、NIO、AIO簡單介紹

    引言 在Java中提供了三種IO模型:BIO、NIO、AIO,模型的選擇決定了程序通信的性能。 1.1、使用場景 BIO BIO適用于連接數比較小的應用,這種IO模型對服務器資源要求比較高。 NIO BIO適用于連接數目多、連接時間短的應用,比如聊天、彈幕、服務器間通訊等應用。 AIO AIO適用于連接數目多、連接時間長的應用,比如相冊服務器。 1.2、BIO 同步并阻塞模型,服務器會為每一個連接...

    Netty拾遺(一)——BIO、NIO和AIO簡介

    前言 本篇博客打算總結一下BIO,NIO和AIO的簡單區別,其中對于AIO不會重點介紹,只會簡單提及部分,不做實例演示。本文博客中的實例參照《Netty 權威指南》一書,關于BIO與NIO的理論描述,參照了《Netty、Redis、Zookeeper高并發實戰》一書。 IO基礎知識 底層IO的讀寫,均會調用底層的read和write兩大系統調用(不同系統中IO讀寫的系統調用名稱不同,但基本功能都差...

    從IO-BIO-NIO-AIO-到Netty

    文章目錄 IO 操作系統層面 IO的多路復用 epoll BIO NIO NIO單線程模型 NIO-reactor模式 AIO Netty 同步-異步-阻塞-非阻塞 IO 操作系統層面 一個應用程序進行IO時,需要系統內核的參與,發送syscall指令產生中斷。 發生中斷意味著需要操作系統介入,開展管理工作。由于操作系統的管理工作(比如切換線程、分配I/O設備等),需要使用特權指令,因此CPU要從...

    Netty一:Java bio nio

    Java支持的I/O模型 Java共支持3種網絡編程模型/IO模式:BIO、NIO、AIO JavaBIO:同步并阻塞(傳統阻塞型),服務器實現模式為一個連接一個線程,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理,如果這個連接不做任何事情會造成不必要的線程開銷 JavaNIO:同步非阻塞,服務器實現模式為一個線程處理多個請求(連接),即客戶端發送的連接請求都會注冊到多路復用器上,多路復用...

    HTML中常用操作關于:頁面跳轉,空格

    1.頁面跳轉 2.空格的代替符...

    猜你喜歡

    freemarker + ItextRender 根據模板生成PDF文件

    1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    Linux C系統編程-線程互斥鎖(四)

    互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...

    精品国产乱码久久久久久蜜桃不卡