網絡編程之BIO、NIO、AIO
TCP直連Socket與ServerSocket通信
Server.java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
final static int PROT = 8765;
public static void main(String[] args) {
ServerSocket server = null;
try {
server = new ServerSocket(PROT);
System.out.println(" server start .. ");
//進行阻塞
Socket socket = server.accept();
//新建一個線程執行客戶端的任務
new Thread(new ServerHandler(socket)).start();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(server != null){
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
server = null;
}
}
}
ServerHandler.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class ServerHandler implements Runnable{
private Socket socket ;
public ServerHandler(Socket socket){
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
/*Parameters:
out An output stream
autoFlush A boolean; if true, the println, printf, or format methods will flush the output buffer
*/
out = new PrintWriter(this.socket.getOutputStream(), true);
String body = null;
while(true){
body = in.readLine();
if(body == null) break;
System.out.println("Server :" + body);
out.println("服務器端返回給客戶端的響應數據.");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
socket = null;
}
}
}
啟動Server
Client.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class Client {
final static String ADDRESS = "127.0.0.1";
final static int PORT = 8765;
public static void main(String[] args) {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket(ADDRESS, PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
//向服務器端發送數據
out.println("客戶端發送的的請求測試數據");
String response = in.readLine();
System.out.println("Client: " + response);
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
socket = null;
}
}
}
Eclispe的client、server輸出如下:
每次建立連接都要新啟動一個線程,而線程會占用一定的資源。如果Client與Server建立的連接很多,就會創建很多的線程,ServerSocket所在的機器可能會出現資源逐步趨于耗盡的問題。
TCP建立連接三次握手:
第一次握手:建立連接時,客戶端發送syn包(syn=j)到服務器,并進入SYN_SENT狀態,等待服務器確認;SYN:同步序列編號(Synchronize Sequence Numbers)。
第二次握手:服務器收到syn包,必須確認客戶的SYN(ack=j+1),同時自己也發送一個SYN包(syn=k),即SYN+ACK包,此時服務器進入SYN_RECV狀態;
第三次握手:客戶端收到服務器的SYN+ACK包,向服務器發送確認包ACK(ack=k+1),此包發送完畢,客戶端和服務器進入ESTABLISHED(TCP連接成功)狀態,完成三次握手。
三次握手完成后,客戶端與服務器開始傳送數據。
網絡編程的基本模型是Client/Server模型,即Client進程與Server進程直接進行相互通信。服務器端綁定某個端口并進行監聽,而客戶端通過指定IP、端口號向指定的Server發出連接請求,通過三次握手建立連接,若連接成功則客戶端與服務器端即可進行相互通信。
BIO同步阻塞
在JDK1.5之前,采用偽異步的方式避免Server Socket建立過多的線程來處理客戶端的請求,其內部維護著一個線程池,將客戶端請求建立的Socket封裝成一個任務Task對象(任務Task類實現Runnable接口),把任務對象交給線程池處理,并配置相應的阻塞隊列BlockingQueue用于緩沖任務對象。在線程池中可以設置,用于處理Client建立連接Socket的線程池最大線程數,這樣就避免了Server Socket端無限制的創建子線程去處理每一個Client建立的連接而導致系統資源耗盡,機器宕機的問題。
Client.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
public class Client {
final static String ADDRESS = "127.0.0.1";
final static int PORT =8765;
public static void main(String[] args) {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket(ADDRESS, PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("Client request");
String response = in.readLine();
System.out.println("Client:" + response);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (Exception e3) {
e3.printStackTrace();
}
}
socket = null;
}
}
}
Server.java
在Server Socket端使用自定義線程池HandlerExecutorPool,參數50是線程池的最大線程數,100為ArrayBlockingQueue排隊等待的緩沖隊列長度。針對監聽并建立連接的Socket,經過自定義的ServerHandler包裝后,交給自定義線程池進行處理,Server Socket繼續處于accept狀態,監聽來自Client的連接請求。
import java.io.BufferedReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
final static int PORT = 8765;
public static void main(String[] args) {
ServerSocket server = null;
BufferedReader in = null;
PrintWriter out = null;
try {
server = new ServerSocket(PORT);
System.out.println("server start");
Socket socket = null;
HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
while(true){
socket = server.accept();
executorPool.execute(new ServerHandler(socket));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if(server != null){
try {
server.close();
} catch (Exception e3) {
e3.printStackTrace();
}
}
server = null;
}
}
}
HandlerExecutorPool.java
由于在Server Socket中傳遞的參數maxPoolSize=50, queueSize=100。創建的ThreadPoolExecutor對象初始化線程池時就創建的線程數為Runtime.getRuntime().availableProcessors()即JVM可用的處理器數,線程池的最大線程數為50,空閑時間為120秒,即線程池中的某個線程若空閑時間超過120秒仍未有新的任務分配給這個線程,則這個線程會停止,其占用的資源會被回收。ArrayBlockingQueue是一個基于數組的阻塞隊列,是一個有界隊列,其內部維護著一個定長數組,以便緩沖隊列中數據對象,隊列的讀寫未實現分離,因此數據的生產和消費不能完全并行。由于queueSize=100,則該有界隊列的長度為100。
在下面代碼中,使用的是ArrayBlockingQueue有界隊列,當有新的Socket交給線程池處理時,若線程池的實際線程數小于Runtime.getRuntime().availableProcessors()時,則優先創建線程;若當前線程數大于Runtime.getRuntime().availableProcessors()則將任務加入到ArrayBlockingQueue隊列中。在隊列已滿情況下,若在線程池的總線程數不大于50的前提下,創建新的線程處理當前這個新任務;若線程池的線程數已達到50個,則對新任務執行拒絕策略。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class HandlerExecutorPool {
private ExecutorService executor;
public HandlerExecutorPool(int maxPoolSize, int queueSize){
this.executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
maxPoolSize,
120L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task){
this.executor.execute(task);
}
}
ServerHandler.java
在上述Server.java中存在代碼executorPool.execute(new ServerHandler(socket));,將經過ServerHandler包裝的Socket交給線程池中線程處理。ServerHandler實現了Runnable接口,在run()方法中獲取Client端傳遞給來的數據流,經過處理轉換后輸出,并使用out.println()方法給Client回傳Server Socket端的響應信息。
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler (Socket socket){
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body = null;
while(true){
body = in.readLine();
if(body == null) break;
System.out.println("Server:" + body);
out.println("Server response");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (Exception e3) {
e3.printStackTrace();
}
}
socket = null;
}
}
}
先啟動Server.java
再啟動Client.java,此時Server對應的console內容如下
Client對應的console內容如下
NIO同步非阻塞
NIO是非阻塞IO,在傳統TCP點對點直接連接的基礎上做了一層封裝,并不是Client與Server直接建立連接,而是Client先到Server端進行管道注冊。在Server端創建一個Selector多路復用器,啟動一個線程輪詢注冊到Selector上所有Channel的狀態,根據通道的狀態,執行相關操作。通道的狀態包括:Connect連接狀態、Accept阻塞狀態、Read可讀狀態、Write可寫狀態。NIO編程中有3個重要部分:Buffer緩沖區、Channel管道、Selector多路復用器
Buffer緩沖區
在NIO類庫中加入了Buffer對象,它包含一些需要寫入或讀取的數據。在面向流的IO中,可以將數據直接寫入或讀取到Stream對象中。在NIO庫中,所有數據的讀取與寫入都是用緩沖區處理的。緩沖區實際上是一個數組,這個數組為緩沖區提供了數據的訪問讀寫等操作屬性,如位置、容量、上限等。通常為一個字節數組(ByteBuffer),也可以是其它java基本類型(Boolean除外)的數組,如:ByteBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer、CharBuffer。
NIO編程中,在get()、put(value)、put(array)之后,注意執行Buffer對象的flip()方法,將position復位為0
import java.nio.IntBuffer;
public class TestBuffer {
public static void main(String[] args) {
//創建指定長度的緩沖區
IntBuffer intBuffer = IntBuffer.allocate(10);
intBuffer.put(1);//pos=0值為1
intBuffer.put(2);//pos=1值為2
intBuffer.put(3);//pos=2值為3
intBuffer.put(4);//pos=3值為4
intBuffer.put(5);//pos=4值為5
System.out.println("intBuffer:" + intBuffer);
//位置pos復位為0
intBuffer.flip();
System.out.println("intBuffer執行flip()后:" + intBuffer);
System.out.println("pos:" + intBuffer.position());
System.out.println("lim:" + intBuffer.limit());//intBuffer中已放置元素的個數
System.out.println("cap:" + intBuffer.capacity());//intBuffer容量
//get(index)方法,pos不改變
System.out.println("intBuffer.get(3):" + intBuffer.get(3));
System.out.println("intBuffer執行get(3)后:" + intBuffer);
//put(index, change)方法,pos不改變
intBuffer.put(2, 33);
System.out.println("intBuffer執行put(2, 33)后:" + intBuffer);;
//get()方法,pos改變,pos值加1
for (int i = 0; i < intBuffer.limit(); i++) {
System.out.print(intBuffer.get() + "\t");
}
System.out.println();
System.out.println("intBuffer使用for循環遍歷之后: " + intBuffer);
System.out.println("pos:" + intBuffer.position());
System.out.println("lim:" + intBuffer.limit());//intBuffer中已放置元素的個數
System.out.println("cap:" + intBuffer.capacity());//intBuffer容量
//wrap包裹數組
System.out.println("------wrap包裹數組------");
int[] array = new int[]{6,7,8,9,10};
IntBuffer wrapIntBuffer = IntBuffer.wrap(array);
System.out.println("wrapIntBuffer:"+wrapIntBuffer);
for (int i = 0; i < wrapIntBuffer.limit(); i++) {
System.out.print(wrapIntBuffer.get() + "\t");
}
System.out.println();
System.out.println("wrapIntBuffer使用for循環遍歷之后: " + wrapIntBuffer);
//pos復位為0
wrapIntBuffer.flip();
//修改wrapIntBuffer下標2位置處的8為88
wrapIntBuffer.put(2,88);
System.out.println("pos:" + wrapIntBuffer.position());
System.out.println("lim:" + wrapIntBuffer.limit());//intBuffer中已放置元素的個數
System.out.println("cap:" + wrapIntBuffer.capacity());//intBuffer容量
System.out.print("wrapIntBuffer使用for循環遍歷:");
for (int i = 0; i < wrapIntBuffer.limit(); i++) {
System.out.print(wrapIntBuffer.get() + "\t");
}
System.out.println();
System.out.print("被wrap包裹的array內容發生了改變:");
for(int j=0;j<array.length;j++){
System.out.print(array[j]+"\t");
}
//復制方法
System.out.println();
System.out.println("------復制方法------");
IntBuffer intBufferOne = IntBuffer.allocate(10);
intBufferOne.put(array);//pos發生變化
System.out.println("intBufferOne:"+intBufferOne);
intBufferOne.flip();//pos復位
System.out.print("intBufferOne使用for循環遍歷:");
for (int i = 0; i < intBufferOne.limit(); i++) {
System.out.print(intBufferOne.get() + "\t");
}
//duplicate()復制
intBufferOne.flip();//pos復位
IntBuffer intBufferTwo = intBufferOne.duplicate();
System.out.println();
System.out.println("intBufferTwo:"+intBufferTwo);
System.out.println("可讀數據為:" + intBufferTwo.remaining());//limit - position
intBufferTwo.position(2);
System.out.println("intBufferTwo:"+intBufferTwo);
System.out.println("可讀數據為:" + intBufferTwo.remaining());//limit - position
}
}
Eclipse的console輸出如下:
Channel通道
網絡數據通過Channel通道讀取和寫入,通道與流不同之處在于通道是雙向的,而流(InputStream或OutputStream的子類)只能在一個方向上移動。通道可以用于讀、寫或者兩者同時進行。Channel通道可以與多路復用器結合起來,有多種狀態位,方便多路復用器識別并執行相應操作。
Channel通道分為兩大類:一類是網絡讀寫的SelectableChannel,一類是用于文件操作的FileChannel。SocketChannel和ServerSocketChannel都是SelectableChannel的子類。
Selector多路復用器
它是NIO編程的基礎,提供選擇已經就緒任務的能力。當IO事件(管道)注冊到選擇器以后,Selector會分配給每個管道一個key值。Selector會不斷輪詢注冊在其上的通道Channel,如果某個通道發生了讀寫操作,這個通道就處于就緒狀態,會被Selector輪詢出來,然后通過SelectionKey可以取得就緒的Channel集合,從而進行后續的IO操作,從管道中讀取或者寫入數據,寫到數據緩沖區Buffer中。一個多路復用器Selector可以負責成千上萬的Channel通道,JDK使用epoll代替了傳統的select實現,使得獲取連接句柄沒有限制。只需要一個線程負責Selector輪詢,就可以接入成千上萬的客戶端。
下面代碼,在Server類的構造方法中,創建ServerSocketChannel對象,將該對象注冊到多路復用器Selector上,并處于阻塞accept狀態。由于Server類實現了Runnable接口,在run()方法中存在while(true)循環,在while循環體中不論客戶端Channel還是服務器Channel,都在多路復用器的輪詢的范圍。在輪詢過程中,獲取所有注冊到多路復用器Selector上的key,在這個while(true)首次執行的時候,獲取到的處于阻塞狀態的Channel為服務器Channel,這個服務器端Channel執行accept()方法,監聽處于就緒狀態的客戶端Channel,將客戶端Channel通道注冊到多路復用器Selector上,并監聽其讀標示位。在存在客戶端Channel注冊到Selector的情況下,在while(true)循環體中,若客戶端key處于key.isReadable()為true時,就會執行read()方法。在read方法中,首先將緩沖區清空,獲取調用read()方法的客戶端Channel,讀取客戶端Channel中的數據到緩沖區Buffer。
綜合使用Buffer、Channel、Selector的Client端與Server端雙向通信示例
Server.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Server implements Runnable{
//多路復用器(管理所有的通道)
private Selector selector;
//建立讀緩沖區,緩存空間大小1024
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//建立寫緩沖區,緩存空間大小1024
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public Server(int port){
try {
//打開多路復用器
this.selector = Selector.open();
//打開服務器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//設置服務器通道為非阻塞模式
ssc.configureBlocking(false);
//綁定監聽端口
ssc.bind(new InetSocketAddress(port));
//把服務器通道注冊到多路復用器上,并且監聽阻塞事件
ssc.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
try {
//要讓多路復用器開始監聽
this.selector.select();
//返回多路復用器已經選擇的結果集
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
//進行遍歷
while(keys.hasNext()){
//獲取一個選擇的元素
SelectionKey key = keys.next();
//直接從容器中移除就可以了
keys.remove();
//如果是有效的
if(key.isValid()){
//如果為阻塞狀態
if(key.isAcceptable()){
this.accept(key);
}
//如果為可讀狀態
if(key.isReadable()){
this.read(key);
}
//如果為可 寫狀態
if(key.isWritable()){
this.write(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/*向SocketChannel中寫數據*/
private void write(SelectionKey key){
SocketChannel sc = (SocketChannel) key.channel();
try {
//定義一個字節數組
byte[] bytes = new byte[1024];
//使用系統錄入功能,等待用戶輸入數據并回車
System.in.read(bytes);
//把數據放到緩沖區中
writeBuf.put(bytes);
//對緩沖區進行復位
writeBuf.flip();
//寫出數據給Client端
sc.write(writeBuf);
//清空緩沖區數據
writeBuf.clear();
//因已經執行了向SocketChannel的寫操作,這里向selector注冊sc通道的讀事件狀態
sc.register(this.selector, SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
}
}
private void read(SelectionKey key) {
try {
//清空緩沖區舊的數據
this.readBuf.clear();
//獲取之前注冊的socket通道對象
SocketChannel sc = (SocketChannel) key.channel();
//讀取數據到緩沖區
int count = sc.read(this.readBuf);
//如果沒有數據
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//有數據則進行讀取 讀取之前需要進行復位方法(把position 和limit進行復位)
/*Flips this buffer. The limit is set to the current position and then
the position is set to zero. If the mark is defined then it is discarded.*/
this.readBuf.flip();
//根據緩沖區的數據長度創建相應大小的byte數組,接收緩沖區的數據
byte[] bytes = new byte[this.readBuf.remaining()];//this.readBuf.remaining()可用數據個數
//接收緩沖區數據到字節數組
this.readBuf.get(bytes);
//打印結果
String body = new String(bytes).trim();
System.out.println("服務器端接收到客戶端發送的信息 : " + body);
//因已經執行了向SocketChannel的讀操作,這里向selector注冊sc通道的寫事件狀態
sc.register(this.selector,SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key) {
try {
//服務通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//阻塞方法
SocketChannel sc = ssc.accept();
//阻塞模式
sc.configureBlocking(false);
//將客戶端通道注冊到多路復用器上,并設置讀取標識
sc.register(this.selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new Server(8765)).start();;
}
}
在Server.java中,因為ServerSocketChannel對象在Selector上僅僅注冊了SelectionKey.OP_ACCEPT事件狀態,因此Server端創建的一個線程,在輪詢Selector過程中,獲取處于就緒狀態的所有Channel通道的集合。Selector分配給ServerSocketChannel對象的唯一key,這個key.isAcceptable()為true則執行accept(key)方法,使這個key對應的服務器端Channel一直處于accept監聽狀態。
Client.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Client implements Runnable{
//多路復用器(管理所有的通道)
private Selector selector;
//建立寫緩沖區
private ByteBuffer bufferWrite = ByteBuffer.allocate(1024);
//建立讀緩沖區
private ByteBuffer bufferRead = ByteBuffer.allocate(1024);
//創建連接的地址
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);
public Client(){
try {
//打開多路復用器
this.selector = Selector.open();
//打開客戶端通道
SocketChannel sc = SocketChannel.open();
//客戶端通道為非阻塞模式
sc.configureBlocking(false);
//多路復用器Selector上,給sc注冊connect事件狀態
sc.register(selector, SelectionKey.OP_CONNECT);
//進行連接
sc.connect(address);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
SocketChannel socketChannel;
while(true){
try {
//要讓多路復用器開始監聽
this.selector.select();
//返回多路復用器已經選擇的結果集
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
//進行遍歷
while(keys.hasNext()){
//獲取一個選擇的元素
SelectionKey key = keys.next();
//直接從容器中移除就可以了
keys.remove();
//如果是有效的
if(key.isValid()){
//如果為連接狀態
if(key.isConnectable()){
System.out.println("client connect");
socketChannel =(SocketChannel)key.channel();
/*Returns:
true if, and only if, a connection operation has been initiated on
this channel but not yet completed by invoking the finishConnect method*/
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
System.out.println("客戶端完成連接操作!");
//把數據放到緩沖區中
bufferWrite.put("Hello,Server".getBytes());
//對緩沖區進行復位
bufferWrite.flip();
//寫出數據給Server端
socketChannel.write(bufferWrite);
//清空寫緩沖區
bufferWrite.clear();
}
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 如果為可讀狀態
if(key.isReadable()){
this.read(key);
}
// 如果為可寫狀態
if(key.isWritable()){
this.write(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void write(SelectionKey key){
try {
SocketChannel sc = (SocketChannel) key.channel();
byte[] bytes = new byte[1024];
System.in.read(bytes);
//把數據放到緩沖區中
bufferWrite.put(bytes);
//對緩沖區進行復位
bufferWrite.flip();
//寫出數據給Server端
sc.write(bufferWrite);
//清空緩沖區數據
bufferWrite.clear();
sc.register(this.selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
private void read(SelectionKey key) {
try {
//清空緩沖區舊的數據
this.bufferRead.clear();
//獲取之前注冊的socket通道對象
SocketChannel sc = (SocketChannel) key.channel();
//讀取數據到緩沖區
int count = sc.read(this.bufferRead);
//如果沒有數據
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//有數據則進行讀取 讀取之前需要進行復位方法(把position 和limit進行復位)
this.bufferRead.flip();
//根據緩沖區的數據長度創建相應大小的byte數組,接收緩沖區的數據
byte[] bytes = new byte[this.bufferRead.remaining()];//this.readBuf.remaining()可用數據個數
//接收緩沖區數據到字節數組
this.bufferRead.get(bytes);
// 打印結果
String body = new String(bytes).trim();
System.out.println("客戶端接收到服務器端返回的信息 : " + body);
sc.register(this.selector, SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
Client client=new Client();
new Thread(client).start();//單獨啟動一個線程,去輪詢注冊到多路復用器上的所有通道
} catch (Exception e) {
e.printStackTrace();
}
}
}
run as --java application,首先啟動Server,Eclipse的console輸出如下:
再啟動Client,此時Server對應的console如下:
Client對應的console如下:
Client與Server進行交互通信,互相發送聊天消息后,Eclipse的console輸入如下:
AIO
傳統的BIO建立連接需要三次握手,并且在服務器端創建一個線程去處理客戶端請求。在NIO中,客戶端Channel通道注冊到多路復用器Selector上,減少了三次握手的過程,在服務器端只需要一個線程去輪詢注冊到多路復用器上的Channel的狀態位,根據不同狀態位執行不同的操作。
JDK1.7之后,AIO在之前NIO的基礎上引入異步通道的概念,并提供了異步文件和異步套接字通道的實現,實現了異步非阻塞。AIO不需要通過多路復用器來對注冊的通道進行輪詢操作,即可實現異步讀寫,簡化了NIO編程模型。相對于NIO中使用的SocketChannel、ServerSocketChannel,AIO中使用的是AsynchronousSocketChannel、AsynchronousServerSocketChannel。
Server.java
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.Channel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Server {
//線程池
private ExecutorService executorService;
//通道group
private AsynchronousChannelGroup channelGroup;
//異步服務器通道
public AsynchronousServerSocketChannel assc;
public Server(int port){
try {
//創建一個緩存池,注意不要使用FixedThreadPool,否則只能接受指定數量的并發客戶端請求
executorService = Executors.newCachedThreadPool();
//創建異步channel group,1代表初始化線程的數量
/*Creates an asynchronous channel group with a given thread
pool that creates new threads as needed.*/
channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
//創建異步服務器通道
/*Opens an asynchronous server-socket channel.*/
assc = AsynchronousServerSocketChannel.open(channelGroup);
//進行綁定監聽端口
assc.bind(new InetSocketAddress(port));
System.out.println("server start , port : " + port);
//此處不是阻塞,而是繼續向下執行。進行通信的相關處理操作在ServerCompletionHandler
assc.accept(this, new ServerCompletionHandler());//this指的是Server類的對象
//一直休眠 不讓服務器線程停止
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Server server = new Server(8765);
}
}
在Server端創建一個緩存線程池,服務器端使用的是AsynchronousServerSocketChannel,使用bind()方法綁定監聽端口,使用如上述代碼Server.java中accept(this, new ServerCompletionHandler())接收和處理客戶端請求,但是這個accept是一個異步操作,交給線程池去異步的處理當前這個客戶端操作,而Server.java對應的主線程繼續向下執行,所以在代碼中使用了Thread.sleep(Integer.MAX_VALUE);保持Server對應的線程非關閉。
ServerCompletionHandler.java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
@Override
public void completed(AsynchronousSocketChannel asc, Server attachment) {
//當有一個客戶端接入的時候 直接調用Server的accept方法
attachment.assc.accept(attachment, this);//this指的是ServerCompletionHandler對象
read(asc);
}
//AsynchronousSocketChannel為客戶端通道
private void read(final AsynchronousSocketChannel asc) {
//讀取數據
ByteBuffer buf = ByteBuffer.allocate(1024);
//異步方法,不會阻塞在這,主程序繼續執行下面操作
/*This method initiates an asynchronous read operation to read a sequence of bytes
from this channel into the given buffer. */
asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer resultSize, ByteBuffer attachment) {
//進行讀取之后,重置標識位
attachment.flip();
//獲得讀取的字節數
System.out.println("Server接收內容字節數:" + resultSize);
//獲取讀取的數據
String resultData = new String(attachment.array()).trim();
System.out.println("Server接收到的內容:" + resultData);
String response = "收到數據" + resultData;
write(asc, response);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
private void write(AsynchronousSocketChannel asc, String response) {
try {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(response.getBytes());
buf.flip();
/*This method initiates an asynchronous write operation to write a sequence of bytes
to this channel from the given buffer. */
//使用到多線程設計模式中的Future,先返回一個Future代理對象。后臺新啟動一個線程A,進行數據的寫操作。調用get()方法時才真正獲取線程A執行任務的結果
asc.write(buf).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Server attachment) {
exc.printStackTrace();
}
}
Client.java
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
public class Client implements Runnable{
private AsynchronousSocketChannel asc ;
public Client() throws Exception {
asc = AsynchronousSocketChannel.open();
}
public void connect(){
asc.connect(new InetSocketAddress("127.0.0.1", 8765));
}
public void write(String content){
try {
asc.write(ByteBuffer.wrap(content.getBytes())).get();//調用get()方法異步寫
read();
} catch (Exception e) {
e.printStackTrace();
}
}
private void read() {
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
asc.read(buf).get();
buf.flip();
byte[] respByte = new byte[buf.remaining()];
buf.get(respByte);
System.out.println("客戶端接收到的反饋信息:"+new String(respByte,"utf-8").trim());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
}
}
public static void main(String[] args) throws Exception {
Client c1 = new Client();
c1.connect();
Client c2 = new Client();
c2.connect();
Client c3 = new Client();
c3.connect();
new Thread(c1, "c1").start();
new Thread(c2, "c2").start();
new Thread(c3, "c3").start();
Thread.sleep(1000);
c1.write("this is c1");
c2.write("this is c2");
c3.write("this is c3");
}
}
啟動Server,在Eclipse的console輸出如下:
啟動Client,此時服務器端和客戶端對應的console輸出如下:
阻塞與非阻塞、同步與異步
阻塞:應用程序在獲取網絡數據的時候,如果網絡傳輸數據很慢,那么程序就一直等待,直到數據傳輸完畢為止。
非阻塞:應用程序直接可以獲取已經準備就緒好的數據,無須等待。
IO(BIO)為同步阻塞形式,NIO為同步非阻塞形式。在JDK1.7之后,升級了NIO庫包,支持異步非阻塞通信模型NIO2.0(AIO)
同步與異步一般是面向操作系統與應用程序對IO操作的層面上來區別的
同步:應用程序會直接參與IO讀寫操作,并直接阻塞到某一個方法上,直到數據準備就緒;或者采用輪詢的策略實時檢查數據的就緒狀態,如果就緒則獲取數據。
異步:所有的IO讀寫操作交給操作系統處理,與應用程序沒有直接關系。當操作系統完成了IO讀寫操作時,會給應用程序發通知,應用程序直接拿走數據即可。
BIO、NIO、AIO三者區別
BIO:它屬于傳統的Socket編程,客戶端與服務器端連接的建立需要經過TCP3次握手的過程。服務器端ServerSocket首先啟動,指定端口并執行accept()進行阻塞,監聽客戶端的連接請求。若接收到客戶端的連接請求并成功建立連接后,客戶端與服務器端通過Socket套接字中的數據流進行相互之間的數據通信。針對每一個成功建立連接的客戶端,服務器端都會創建一個線程去處理這個客戶端的請求,若建立連接的客戶端規模很大的時候,對服務器端資源是一種嚴重浪費。
NIO:在NIO中引入了Channel通道、Buffer緩沖區、Selector多路復用器的概念,客戶端SocketChannel與服務器端ServerSocketChannel都需要在Selector多路復用器上進行注冊。在服務器端會創建一個線程對注冊到Selector多路復用器上的所有Channel進行輪詢,輪詢出處于就緒狀態的Channel集合,根據為每個Channel分配的唯一key,獲取具體的某個Channel,并根據其狀態標志位,進行處理,從Channel中讀取或者寫入數據,寫到Buffer數據緩沖區中。每個管道都會對Selector進行注冊不同的事件狀態,方便Selector查找,事件狀態包括:SelectionKey.OP_CONNECT連接狀態、SelectionKey.OP_ACCEPT阻塞狀態、SelectionKey.OP_READ可讀狀態、SelectionKey.OP_WRITE可寫狀態。
AIO:使用線程池中的線程來處理客戶端的請求,針對每一個客戶端的請求,會創建一個處理該任務的對象,如上面ServerCompletionHandler類的對象,來完成讀、寫任務。AIO真正實現了異步非阻塞。
智能推薦
結合代碼詳細聊聊 Java 網絡編程中的 BIO、NIO 和 AIO
本文從操作系統的角度來解釋BIO,NIO,AIO的概念,含義和背后的那些事。本文主要分為3篇。 第一篇 講解BIO和NIO以及IO多路復用 第二篇 講解磁盤IO和AIO 第三篇 講解在這些機制上的一些應用的實現方式,比如nginx,nodejs,Java NIO等 到底什么是“IO Block” 很多人說BIO不好,會“block&rdquo...
【Java網絡編程】基于BIO/NIO/AIO的多人聊天室(五):AIO聊天室
課程《一站式學習Java網絡編程 全面理解BIO/NIO/AIO》的學習筆記(五): 異步調用機制 & AIO編程模型 & 基于AIO的多人聊天室實現 源碼地址:https://github.com/NoxWang/web-program 【Java網絡編程】基于BIO/NIO/AIO的多人聊天室(一):java IO與內核IO 【Java網絡編程】基于BIO/NIO/AIO的多人...
【Java】---BIO、NIO、AIO
1.同步與異步 同步:是指發出一個請求,在沒有得到結果之前該請求就不會 返回結果,直到請求返回時; 比如燒水,一直在看,直到水燒好之后,才去干別的事。 異步:發出一個請求后,立刻得到了回應,但沒有返回結果,在此階段可以去處理別的事情 比如燒水時,你可以設置一個定時,期間你可以去做別的事情 2.阻塞與非阻塞 阻塞: 是指請求結果返回之前當前的線程會被掛起,此時的線程什么也不會去做 非阻塞:是指請求結...
BIO、NIO、AIO
一:BIO 1、網絡編程的基本模型是C/S模型,即兩個進程間的通信。 2、服務端提供IP和監聽端口,客戶端通過連接操作想服務端監聽的地址發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進行通信。 3、傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功后,雙方通過輸入和輸出流進行同步阻塞式通信。 ...
BIO,NIO,AIO總結
BIO,NIO,AIO總結 通過對Linux系統的網絡IO模型映射到java的IO實現.從而解釋為什么BIO是同步阻塞,NIO是同步非阻塞,AIO是異步. Linux系統I/O基礎 Linux系統用戶態與內核態 系統內存分為: 用戶態內存和內核態內存 如上圖所示,從宏觀上來看,Linux操作系統的體系架構分為用戶態和內核態(或者用戶空間和內核)。內核從本質上看是一種軟件——...
猜你喜歡
BIO、NIO、AIO 講解
一、前沿 在通信框架中經常使用到的三種通信模式,即 BIO、NIO 和 AIO,它們也是面試中經常被問到的,如果學會了它們將會給你帶來薪資的變化哦。下面分別對三者介紹一下,通過示例理解其用法 下面先通過一張圖來簡單了解一下三者,如下所示: 同步阻塞IO : 用戶進程發起一個IO操作以后,必須等待IO操作的真正完成后,才能繼續運行 同步非阻塞IO: 用戶進程發起一個IO操作以后,可做其它事情,但用戶...
BIO, NIO, AIO
同步,異步,阻塞,非阻塞 同步:a 事件必須等到 b 事件完成才可以繼續執行/返回 異步:a 事件可以先執行/返回,不需要等待 b 事件的完成,而是通過回調處理 b 事件的返回結果 阻塞:當發起一次請求時,調用者一直等待結果的返回,只有當條件滿足時,才繼續處理后續的工作 非阻塞:當發起一次請求時,調用者不用一直等待結果的返回,可以先去做其他的事情 1、BIO(Blocking IO) 同步阻塞 i...
freemarker + ItextRender 根據模板生成PDF文件
1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...
電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!
Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...