• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Netty網絡編程框架

    標簽: Java  Netty

    一:簡介

           Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。也就是說,Netty 是一個基于NIO的客戶、服務器端編程框架,使用Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。Netty相當簡化和流線化了網絡應用的編程開發過程,例如,TCP和UDP的socket服務開發。

           “快速”和“簡單”并不用產生維護性或性能上的問題。Netty 是一個吸收了多種協議的實現經驗,這些協議包括FTP,SMTP,HTTP,各種二進制,文本協議,并經過相當精心設計的項目,最終,Netty 成功的找到了一種方式,在保證易于開發的同時還保證了其應用的性能,穩定性和伸縮性。

           Netty從4.x版本開始,需要使用JDK1.6及以上版本提供基礎支撐。

           在設計上:針對多種傳輸類型的統一接口 - 阻塞和非阻塞;簡單但更強大的線程模型;真正的無連接的數據報套接字支持;鏈接邏輯支持復用。

           在性能上:比核心 Java API 更好的吞吐量,較低的延時;資源消耗更少,這個得益于共享池和重用;減少內存拷貝。

           在健壯性上:消除由于慢,快,或重載連接產生的 OutOfMemoryError;消除經常發現在 NIO 在高速網絡中的應用中的不公平的讀/寫比。

           在安全上:完整的 SSL / TLS 和 StartTLS 的支持,且已得到大量商業應用的真實驗證,如:Hadoop項目的Avro(RPC框架)、Dubbo、Dubbox等RPC框架。

           Netty的官網是:

    http://netty.io

           有三方提供的中文翻譯Netty用戶手冊(官網提供源信息):

    http://ifeve.com/netty5-user-guide/

    二:Netty架構

    三:線程模型

    1:單線程模型

            在ServerBootstrap調用方法group的時候,傳遞的參數是同一個線程組,且在構造線程組的時候,構造參數為1,這種開發方式,就是一個單線程模型。個人機開發測試使用。不推薦。

    2:多線程模型

            在ServerBootstrap調用方法group的時候,傳遞的參數是兩個不同的線程組。負責監聽的acceptor線程組,線程數為1,也就是構造參數為1。負責處理客戶端任務的線程組,線程數大于1,也就是構造參數大于1。這種開發方式,就是多線程模型。長連接,且客戶端數量較少,連接持續時間較長情況下使用。如:企業內部交流應用。

    3:主從多線程模型

           在ServerBootstrap調用方法group的時候,傳遞的參數是兩個不同的線程組。負責監聽的acceptor線程組,線程數大于1,也就是構造參數大于1。負責處理客戶端任務的線程組,線程數大于1,也就是構造參數大于1。這種開發方式,就是主從多線程模型。長連接,客戶端數量相對較多,連接持續時間比較長的情況下使用。如:對外提供服務的相冊服務器。

     

    四:基礎程序

    1:入門

    /**
     * 1. 雙線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. 綁定服務監聽端口并啟動服務
     */
    package com.hhxy.netty.first;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class Server4HelloWorld {
    	// 監聽線程組,監聽客戶端請求
    	private EventLoopGroup acceptorGroup = null;
    	// 處理客戶端相關操作線程組,負責處理與客戶端的數據通訊
    	private EventLoopGroup clientGroup = null;
    	// 服務啟動相關配置信息
    	private ServerBootstrap bootstrap = null;
    	public Server4HelloWorld(){
    		init();
    	}
    	private void init(){
    		// 初始化線程組,構建線程組的時候,如果不傳遞參數,則默認構建的線程組線程數是CPU核心數量。
    		acceptorGroup = new NioEventLoopGroup();
    		clientGroup = new NioEventLoopGroup();
    		// 初始化服務的配置
    		bootstrap = new ServerBootstrap();
    		// 綁定線程組
    		bootstrap.group(acceptorGroup, clientGroup);
    		// 設定通訊模式為NIO, 同步非阻塞
    		bootstrap.channel(NioServerSocketChannel.class);
    		// 設定緩沖區大小, 緩存區的單位是字節。
    		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    		// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接有效)
    		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    			.option(ChannelOption.SO_RCVBUF, 16*1024)
    			.option(ChannelOption.SO_KEEPALIVE, true);
    	}
    	/**
    	 * 監聽處理邏輯。
    	 * @param port 監聽端口。
    	 * @param acceptorHandlers 處理器, 如何處理客戶端請求。
    	 * @return
    	 * @throws InterruptedException
    	 */
    	public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
    		
    		/*
    		 * childHandler是服務的Bootstrap獨有的方法。是用于提供處理對象的。
    		 * 可以一次性增加若干個處理邏輯。是類似責任鏈模式的處理方式。
    		 * 增加A,B兩個處理邏輯,在處理客戶端請求數據的時候,根據A-》B順序依次處理。
    		 * 
    		 * ChannelInitializer - 用于提供處理器的一個模型對象。
    		 *  其中定義了一個方法,initChannel方法。
    		 *   方法是用于初始化處理邏輯責任鏈條的。
    		 *   可以保證服務端的Bootstrap只初始化一次處理器,盡量提供處理邏輯的重用。
    		 *   避免反復的創建處理器對象。節約資源開銷。
    		 */
    		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(acceptorHandlers);
    			}
    		});
    		// bind方法 - 綁定監聽端口的。ServerBootstrap可以綁定多個監聽端口。 多次調用bind方法即可
    		// sync - 開始監聽邏輯。 返回一個ChannelFuture。 返回結果代表的是監聽成功后的一個對應的未來結果
    		// 可以使用ChannelFuture實現后續的服務器和客戶端的交互。
    		ChannelFuture future = bootstrap.bind(port).sync();
    		return future;
    	}
    	
    	/**
    	 * shutdownGracefully - 方法是一個安全關閉的方法。可以保證不放棄任何一個已接收的客戶端請求。
    	 */
    	public void release(){
    		this.acceptorGroup.shutdownGracefully();
    		this.clientGroup.shutdownGracefully();
    	}
    	
    	public static void main(String[] args){
    		ChannelFuture future = null;
    		Server4HelloWorld server = null;
    		try{
    			server = new Server4HelloWorld();
    			future = server.doAccept(9999,new Server4HelloWorldHandler());
    			System.out.println("server started.");
    			
    			// 關閉連接的。
    			future.channel().closeFuture().sync();
    		}catch(InterruptedException e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
    			if(null != server){
    				server.release();
    			}
    		}
    	}
    	
    }
    
    /**
     * @Sharable注解 - 
     *  代表當前Handler是一個可以分享的處理器。也就意味著,服務器注冊此Handler后,可以分享給多個客戶端同時使用。
     *  如果不使用注解描述類型,則每次客戶端請求時,必須為客戶端重新創建一個新的Handler對象。
     *  如果handler是一個Sharable的,一定避免定義可寫的實例變量。
     *  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(new XxxHandler());
    			}
    		});
     */
    package com.hhxy.netty.first;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    
    @Sharable
    public class Server4HelloWorldHandler extends ChannelHandlerAdapter {
    	
    	/**
    	 * 業務處理邏輯
    	 * 用于處理讀取數據請求的邏輯。
    	 * ctx - 上下文對象。其中包含于客戶端建立連接的所有資源。 如: 對應的Channel
    	 * msg - 讀取到的數據。 默認類型是ByteBuf,是Netty自定義的。是對ByteBuffer的封裝。 不需要考慮復位問題。
    	 */
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		// 獲取讀取的數據, 是一個緩沖。
    		ByteBuf readBuffer = (ByteBuf) msg;
    		// 創建一個字節數組,用于保存緩存中的數據。
    		byte[] tempDatas = new byte[readBuffer.readableBytes()];
    		// 將緩存中的數據讀取到字節數組中。
    		readBuffer.readBytes(tempDatas);
    		String message = new String(tempDatas, "UTF-8");
    		System.out.println("from client : " + message);
    		if("exit".equals(message)){
    			ctx.close();
    			return;
    		}
    		String line = "server message to client!";
    		// 寫操作自動釋放緩存,避免內存溢出問題。
    		ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    		// 注意,如果調用的是write方法。不會刷新緩存,緩存中的數據不會發送到客戶端,必須再次調用flush方法才行。
    		// ctx.write(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    		// ctx.flush();
    	}
    
    	/**
    	 * 異常處理邏輯, 當客戶端異常退出的時候,也會運行。
    	 * ChannelHandlerContext關閉,也代表當前與客戶端連接的資源關閉。
    	 */
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("server exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    
    /**
     * 1. 單線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. connect連接服務,并發起請求
     */
    package com.hhxy.netty.first;
    
    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * 因為客戶端是請求的發起者,不需要監聽。
     * 只需要定義唯一的一個線程組即可。
     */
    public class Client4HelloWorld {
    	
    	// 處理請求和處理服務端響應的線程組
    	private EventLoopGroup group = null;
    	// 客戶端啟動相關配置信息
    	private Bootstrap bootstrap = null;
    	
    	public Client4HelloWorld(){
    		init();
    	}
    	
    	private void init(){
    		group = new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		// 綁定線程組
    		bootstrap.group(group);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioSocketChannel.class);
    	}
    	
    	public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
    		/*
    		 * 客戶端的Bootstrap沒有childHandler方法。只有handler方法。
    		 * 方法含義等同ServerBootstrap中的childHandler
    		 * 在客戶端必須綁定處理器,也就是必須調用handler方法。
    		 * 服務器必須綁定處理器,必須調用childHandler方法。
    		 */
    		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(handlers);
    			}
    		});
    		// 建立連接。
    		ChannelFuture future = this.bootstrap.connect(host, port).sync();
    		return future;
    	}
    	
    	public void release(){
    		this.group.shutdownGracefully();
    	}
    	
    	public static void main(String[] args) {
    		Client4HelloWorld client = null;
    		ChannelFuture future = null;
    		try{
    			client = new Client4HelloWorld();
    			future = client.doRequest("localhost", 9999, new Client4HelloWorldHandler());
    			
    			Scanner s = null;
    			while(true){
    				s = new Scanner(System.in);
    				System.out.print("enter message send to server (enter 'exit' for close client) > ");
    				String line = s.nextLine();
    				if("exit".equals(line)){
    					// addListener - 增加監聽,當某條件滿足的時候,觸發監聽器。
    					// ChannelFutureListener.CLOSE - 關閉監聽器,代表ChannelFuture執行返回后,關閉連接。
    					future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")))
    						.addListener(ChannelFutureListener.CLOSE);
    					break;
    				}
    				future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    				TimeUnit.SECONDS.sleep(1);
    			}
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if(null != client){
    				client.release();
    			}
    		}
    	}
    	
    }
    
    package com.hhxy.netty.first;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class Client4HelloWorldHandler extends ChannelHandlerAdapter {
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try{
    			ByteBuf readBuffer = (ByteBuf) msg;
    			byte[] tempDatas = new byte[readBuffer.readableBytes()];
    			readBuffer.readBytes(tempDatas);
    			System.out.println("from server : " + new String(tempDatas, "UTF-8"));
    		}finally{
    			// 用于釋放緩存。避免內存溢出
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    	/*@Override // 斷開連接時執行
    	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    		System.out.println("channelInactive method run...");
    	}
    
    	@Override // 連接通道建立成功時執行
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		System.out.println("channelActive method run...");
    	}
    
    	@Override // 每次讀取完成時執行
    	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    		System.out.println("channelReadComplete method run...");
    	}*/
    
    }
    

    2:拆包粘包問題解決

           netty使用tcp/ip協議傳輸數據。而tcp/ip協議是類似水流一樣的數據傳輸方式。多次訪問的時候有可能出現數據粘包的問題,解決這種問題的方式如下:

    2.1:定長數據流

           客戶端和服務器,提前協調好,每個消息長度固定。(如:長度10)。如果客戶端或服務器寫出的數據不足10,則使用空白字符補足(如:使用空格)。

    /**
     * 1. 雙線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. 綁定服務監聽端口并啟動服務
     */
    package com.hhxy.netty.fixedlength;
    
    import java.nio.charset.Charset;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server4FixedLength {
    	// 監聽線程組,監聽客戶端請求
    	private EventLoopGroup acceptorGroup = null;
    	// 處理客戶端相關操作線程組,負責處理與客戶端的數據通訊
    	private EventLoopGroup clientGroup = null;
    	// 服務啟動相關配置信息
    	private ServerBootstrap bootstrap = null;
    	public Server4FixedLength(){
    		init();
    	}
    	private void init(){
    		acceptorGroup = new NioEventLoopGroup();
    		clientGroup = new NioEventLoopGroup();
    		bootstrap = new ServerBootstrap();
    		// 綁定線程組
    		bootstrap.group(acceptorGroup, clientGroup);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioServerSocketChannel.class);
    		// 設定緩沖區大小
    		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    		// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接有效)
    		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    			.option(ChannelOption.SO_RCVBUF, 16*1024)
    			.option(ChannelOption.SO_KEEPALIVE, true);
    	}
    	public ChannelFuture doAccept(int port) throws InterruptedException{
    		
    		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
    				// 定長Handler。通過構造參數設置消息長度(單位是字節)。發送的消息長度不足可以使用空格補全。
    				acceptorHandlers[0] = new FixedLengthFrameDecoder(5);
    				// 字符串解碼器Handler,會自動處理channelRead方法的msg參數,將ByteBuf類型的數據轉換為字符串對象
    				acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
    				acceptorHandlers[2] = new Server4FixedLengthHandler();
    				ch.pipeline().addLast(acceptorHandlers);
    			}
    		});
    		ChannelFuture future = bootstrap.bind(port).sync();
    		return future;
    	}
    	public void release(){
    		this.acceptorGroup.shutdownGracefully();
    		this.clientGroup.shutdownGracefully();
    	}
    	
    	public static void main(String[] args){
    		ChannelFuture future = null;
    		Server4FixedLength server = null;
    		try{
    			server = new Server4FixedLength();
    			
    			future = server.doAccept(9999);
    			System.out.println("server started.");
    			future.channel().closeFuture().sync();
    		}catch(InterruptedException e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
    			if(null != server){
    				server.release();
    			}
    		}
    	}
    	
    }
    
    package com.hhxy.netty.fixedlength;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class Server4FixedLengthHandler extends ChannelHandlerAdapter {
    	
    	// 業務處理邏輯
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		String message = msg.toString();
    		System.out.println("from client : " + message.trim());
    		String line = "ok ";
    		ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    	}
    	
    
    	// 異常處理邏輯
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("server exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    
    /**
     * 1. 單線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. connect連接服務,并發起請求
     */
    package com.hhxy.netty.fixedlength;
    
    import java.nio.charset.Charset;
    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client4FixedLength {
    	
    	// 處理請求和處理服務端響應的線程組
    	private EventLoopGroup group = null;
    	// 服務啟動相關配置信息
    	private Bootstrap bootstrap = null;
    	
    	public Client4FixedLength(){
    		init();
    	}
    	
    	private void init(){
    		group = new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		// 綁定線程組
    		bootstrap.group(group);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioSocketChannel.class);
    	}
    	
    	public ChannelFuture doRequest(String host, int port) throws InterruptedException{
    		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ChannelHandler[] handlers = new ChannelHandler[3];
    				handlers[0] = new FixedLengthFrameDecoder(3);
    				// 字符串解碼器Handler,會自動處理channelRead方法的msg參數,將ByteBuf類型的數據轉換為字符串對象
    				handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
    				handlers[2] = new Client4FixedLengthHandler();
    				
    				ch.pipeline().addLast(handlers);
    			}
    		});
    		ChannelFuture future = this.bootstrap.connect(host, port).sync();
    		return future;
    	}
    	
    	public void release(){
    		this.group.shutdownGracefully();
    	}
    	
    	public static void main(String[] args) {
    		Client4FixedLength client = null;
    		ChannelFuture future = null;
    		try{
    			client = new Client4FixedLength();
    			
    			future = client.doRequest("localhost", 9999);
    			
    			Scanner s = null;
    			while(true){
    				s = new Scanner(System.in);
    				System.out.print("enter message send to server > ");
    				String line = s.nextLine();
    				byte[] bs = new byte[5];
    				byte[] temp = line.getBytes("UTF-8");
    				if(temp.length <= 5){
    					for(int i = 0; i < temp.length; i++){
    						bs[i] = temp[i];
    					}
    				}
    				future.channel().writeAndFlush(Unpooled.copiedBuffer(bs));
    				TimeUnit.SECONDS.sleep(1);
    			}
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if(null != client){
    				client.release();
    			}
    		}
    	}
    	
    }
    
    package com.hhxy.netty.fixedlength;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class Client4FixedLengthHandler extends ChannelHandlerAdapter {
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try{
    			String message = msg.toString();
    			System.out.println("from server : " + message);
    		}finally{
    			// 用于釋放緩存。避免內存溢出
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    

    2.2:特殊結束符

           客戶端和服務器,協商定義一個特殊的分隔符號,分隔符號長度自定義。如:‘#’、‘$_$’、‘AA@’。在通訊的時候,只要沒有發送分隔符號,則代表一條數據沒有結束。

    /**
     * 1. 雙線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. 綁定服務監聽端口并啟動服務
     */
    package com.hhxy.netty.delimiter;
    
    import java.nio.charset.Charset;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server4Delimiter {
    	// 監聽線程組,監聽客戶端請求
    	private EventLoopGroup acceptorGroup = null;
    	// 處理客戶端相關操作線程組,負責處理與客戶端的數據通訊
    	private EventLoopGroup clientGroup = null;
    	// 服務啟動相關配置信息
    	private ServerBootstrap bootstrap = null;
    	public Server4Delimiter(){
    		init();
    	}
    	private void init(){
    		acceptorGroup = new NioEventLoopGroup();
    		clientGroup = new NioEventLoopGroup();
    		bootstrap = new ServerBootstrap();
    		// 綁定線程組
    		bootstrap.group(acceptorGroup, clientGroup);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioServerSocketChannel.class);
    		// 設定緩沖區大小
    		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    		// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接有效)
    		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    			.option(ChannelOption.SO_RCVBUF, 16*1024)
    			.option(ChannelOption.SO_KEEPALIVE, true);
    	}
    	public ChannelFuture doAccept(int port) throws InterruptedException{
    		
    		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				// 數據分隔符, 定義的數據分隔符一定是一個ByteBuf類型的數據對象。
    				ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
    				ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
    				// 處理固定結束標記符號的Handler。這個Handler沒有@Sharable注解修飾,
    				// 必須每次初始化通道時創建一個新對象
    				// 使用特殊符號分隔處理數據粘包問題,也要定義每個數據包最大長度。netty建議數據有最大長度。
    				acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
    				// 字符串解碼器Handler,會自動處理channelRead方法的msg參數,將ByteBuf類型的數據轉換為字符串對象
    				acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
    				acceptorHandlers[2] = new Server4DelimiterHandler();
    				ch.pipeline().addLast(acceptorHandlers);
    			}
    		});
    		ChannelFuture future = bootstrap.bind(port).sync();
    		return future;
    	}
    	public void release(){
    		this.acceptorGroup.shutdownGracefully();
    		this.clientGroup.shutdownGracefully();
    	}
    	
    	public static void main(String[] args){
    		ChannelFuture future = null;
    		Server4Delimiter server = null;
    		try{
    			server = new Server4Delimiter();
    			
    			future = server.doAccept(9999);
    			System.out.println("server started.");
    			future.channel().closeFuture().sync();
    		}catch(InterruptedException e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
    			if(null != server){
    				server.release();
    			}
    		}
    	}
    	
    }
    
    package com.hhxy.netty.delimiter;
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class Server4DelimiterHandler extends ChannelHandlerAdapter {
    	
    	// 業務處理邏輯
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		String message = msg.toString();
    		System.out.println("from client : " + message);
    		String line = "server message $E$ test delimiter handler!! $E$ second message $E$";
    		ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    	}
    	
    
    	// 異常處理邏輯
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("server exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    
    /**
     * 1. 單線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. connect連接服務,并發起請求
     */
    package com.hhxy.netty.delimiter;
    
    import java.nio.charset.Charset;
    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client4Delimiter {
    	
    	// 處理請求和處理服務端響應的線程組
    	private EventLoopGroup group = null;
    	// 服務啟動相關配置信息
    	private Bootstrap bootstrap = null;
    	
    	public Client4Delimiter(){
    		init();
    	}
    	
    	private void init(){
    		group = new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		// 綁定線程組
    		bootstrap.group(group);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioSocketChannel.class);
    	}
    	
    	public ChannelFuture doRequest(String host, int port) throws InterruptedException{
    		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				// 數據分隔符
    				ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
    				ChannelHandler[] handlers = new ChannelHandler[3];
    				handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
    				// 字符串解碼器Handler,會自動處理channelRead方法的msg參數,將ByteBuf類型的數據轉換為字符串對象
    				handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
    				handlers[2] = new Client4DelimiterHandler();
    				
    				ch.pipeline().addLast(handlers);
    			}
    		});
    		ChannelFuture future = this.bootstrap.connect(host, port).sync();
    		return future;
    	}
    	
    	public void release(){
    		this.group.shutdownGracefully();
    	}
    	
    	public static void main(String[] args) {
    		Client4Delimiter client = null;
    		ChannelFuture future = null;
    		try{
    			client = new Client4Delimiter();
    			
    			future = client.doRequest("localhost", 9999);
    			
    			Scanner s = null;
    			while(true){
    				s = new Scanner(System.in);
    				System.out.print("enter message send to server > ");
    				String line = s.nextLine();
    				future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    				TimeUnit.SECONDS.sleep(1);
    			}
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if(null != client){
    				client.release();
    			}
    		}
    	}
    	
    }
    
    package com.hhxy.netty.delimiter;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class Client4DelimiterHandler extends ChannelHandlerAdapter {
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try{
    			String message = msg.toString();
    			System.out.println("from server : " + message);
    		}finally{
    			// 用于釋放緩存。避免內存溢出
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    

    2.3:協議

           相對最成熟的數據傳遞方式。有服務器的開發者提供一個固定格式的協議標準。客戶端和服務器發送數據和接受數據的時候,都依據協議制定和解析消息。

    協議格式:
    HEADcontent-length:xxxxHEADBODYxxxxxxBODY
    /**
     * 1. 雙線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. 綁定服務監聽端口并啟動服務
     */
    package com.hhxy.netty.protocol;
    
    import java.nio.charset.Charset;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server4Protocol {
    	// 監聽線程組,監聽客戶端請求
    	private EventLoopGroup acceptorGroup = null;
    	// 處理客戶端相關操作線程組,負責處理與客戶端的數據通訊
    	private EventLoopGroup clientGroup = null;
    	// 服務啟動相關配置信息
    	private ServerBootstrap bootstrap = null;
    	public Server4Protocol(){
    		init();
    	}
    	private void init(){
    		acceptorGroup = new NioEventLoopGroup();
    		clientGroup = new NioEventLoopGroup();
    		bootstrap = new ServerBootstrap();
    		// 綁定線程組
    		bootstrap.group(acceptorGroup, clientGroup);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioServerSocketChannel.class);
    		// 設定緩沖區大小
    		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    		// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接有效)
    		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    			.option(ChannelOption.SO_RCVBUF, 16*1024)
    			.option(ChannelOption.SO_KEEPALIVE, true);
    	}
    	public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
    		
    		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
    				ch.pipeline().addLast(acceptorHandlers);
    			}
    		});
    		ChannelFuture future = bootstrap.bind(port).sync();
    		return future;
    	}
    	public void release(){
    		this.acceptorGroup.shutdownGracefully();
    		this.clientGroup.shutdownGracefully();
    	}
    	
    	public static void main(String[] args){
    		ChannelFuture future = null;
    		Server4Protocol server = null;
    		try{
    			server = new Server4Protocol();
    			future = server.doAccept(9999,new Server4ProtocolHandler());
    			System.out.println("server started.");
    			
    			future.channel().closeFuture().sync();
    		}catch(InterruptedException e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
    			if(null != server){
    				server.release();
    			}
    		}
    	}
    	
    }
    
    /**
     * @Sharable注解 - 
     *  代表當前Handler是一個可以分享的處理器。也就意味著,服務器注冊此Handler后,可以分享給多個客戶端同時使用。
     *  如果不使用注解描述類型,則每次客戶端請求時,必須為客戶端重新創建一個新的Handler對象。
     *  
     */
    package com.hhxy.netty.protocol;
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    @Sharable
    public class Server4ProtocolHandler extends ChannelHandlerAdapter {
    	
    	// 業務處理邏輯
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		String message = msg.toString();
    		System.out.println("server receive protocol content : " + message);
    		message = ProtocolParser.parse(message);
    		if(null == message){
    			System.out.println("error request from client");
    			return ;
    		}
    		System.out.println("from client : " + message);
    		String line = "server message";
    		line = ProtocolParser.transferTo(line);
    		System.out.println("server send protocol content : " + line);
    		ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    	}
    
    	// 異常處理邏輯
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("server exceptionCaught method run...");
    		cause.printStackTrace();
    		ctx.close();
    	}
    	
    	static class ProtocolParser{
    		public static String parse(String message){
    			String[] temp = message.split("HEADBODY");
    			temp[0] = temp[0].substring(4);
    			temp[1] = temp[1].substring(0, (temp[1].length()-4));
    			int length = Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1));
    			if(length != temp[1].length()){
    				return null;
    			}
    			return temp[1];
    		}
    		public static String transferTo(String message){
    			message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
    			return message;
    		}
    	}
    
    }
    
    /**
     * 1. 單線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. connect連接服務,并發起請求
     */
    package com.hhxy.netty.protocol;
    
    import java.nio.charset.Charset;
    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client4Protocol {
    	
    	// 處理請求和處理服務端響應的線程組
    	private EventLoopGroup group = null;
    	// 服務啟動相關配置信息
    	private Bootstrap bootstrap = null;
    	
    	public Client4Protocol(){
    		init();
    	}
    	
    	private void init(){
    		group = new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		// 綁定線程組
    		bootstrap.group(group);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioSocketChannel.class);
    	}
    	
    	public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
    		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
    				ch.pipeline().addLast(handlers);
    			}
    		});
    		ChannelFuture future = this.bootstrap.connect(host, port).sync();
    		return future;
    	}
    	
    	public void release(){
    		this.group.shutdownGracefully();
    	}
    	
    	public static void main(String[] args) {
    		Client4Protocol client = null;
    		ChannelFuture future = null;
    		try{
    			client = new Client4Protocol();
    			future = client.doRequest("localhost", 9999, new Client4ProtocolHandler());
    			
    			Scanner s = null;
    			while(true){
    				s = new Scanner(System.in);
    				System.out.print("enter message send to server > ");
    				String line = s.nextLine();
    				line = Client4ProtocolHandler.ProtocolParser.transferTo(line);
    				System.out.println("client send protocol content : " + line);
    				future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    				TimeUnit.SECONDS.sleep(1);
    			}
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if(null != client){
    				client.release();
    			}
    		}
    	}
    	
    }
    
    package com.hhxy.netty.protocol;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class Client4ProtocolHandler extends ChannelHandlerAdapter {
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try{
    			String message = msg.toString();
    			System.out.println("client receive protocol content : " + message);
    			message = ProtocolParser.parse(message);
    			if(null == message){
    				System.out.println("error response from server");
    				return ;
    			}
    			System.out.println("from server : " + message);
    		}finally{
    			// 用于釋放緩存。避免內存溢出
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    	static class ProtocolParser{
    		public static String parse(String message){
    			String[] temp = message.split("HEADBODY");
    			temp[0] = temp[0].substring(4);
    			temp[1] = temp[1].substring(0, (temp[1].length()-4));
    			int length = Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1));
    			if(length != temp[1].length()){
    				return null;
    			}
    			return temp[1];
    		}
    		public static String transferTo(String message){
    			message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
    			return message;
    		}
    	}
    
    }
    

    3:序列化對象

    JBoss Marshalling序列化:Java是面向對象的開發語言。傳遞的數據如果是Java對象,應該是最方便且可靠。

    /**
     * 1. 雙線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. 綁定服務監聽端口并啟動服務
     */
    package com.hhxy.netty.serialized;
    
    import com.hhxy.utils.SerializableFactory4Marshalling;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class Server4Serializable {
    	// 監聽線程組,監聽客戶端請求
    	private EventLoopGroup acceptorGroup = null;
    	// 處理客戶端相關操作線程組,負責處理與客戶端的數據通訊
    	private EventLoopGroup clientGroup = null;
    	// 服務啟動相關配置信息
    	private ServerBootstrap bootstrap = null;
    	public Server4Serializable(){
    		init();
    	}
    	private void init(){
    		acceptorGroup = new NioEventLoopGroup();
    		clientGroup = new NioEventLoopGroup();
    		bootstrap = new ServerBootstrap();
    		// 綁定線程組
    		bootstrap.group(acceptorGroup, clientGroup);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioServerSocketChannel.class);
    		// 設定緩沖區大小
    		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    		// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接有效)
    		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    			.option(ChannelOption.SO_RCVBUF, 16*1024)
    			.option(ChannelOption.SO_KEEPALIVE, true);
    	}
    	public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
    		
    		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
    				ch.pipeline().addLast(acceptorHandlers);
    			}
    		});
    		ChannelFuture future = bootstrap.bind(port).sync();
    		return future;
    	}
    	public void release(){
    		this.acceptorGroup.shutdownGracefully();
    		this.clientGroup.shutdownGracefully();
    	}
    	
    	public static void main(String[] args){
    		ChannelFuture future = null;
    		Server4Serializable server = null;
    		try{
    			server = new Server4Serializable();
    			future = server.doAccept(9999,new Server4SerializableHandler());
    			System.out.println("server started.");
    			
    			future.channel().closeFuture().sync();
    		}catch(InterruptedException e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
    			if(null != server){
    				server.release();
    			}
    		}
    	}
    	
    }
    
    /**
     * @Sharable注解 - 
     *  代表當前Handler是一個可以分享的處理器。也就意味著,服務器注冊此Handler后,可以分享給多個客戶端同時使用。
     *  如果不使用注解描述類型,則每次客戶端請求時,必須為客戶端重新創建一個新的Handler對象。
     *  
     */
    package com.hhxy.netty.serialized;
    
    import io.netty.channel.ChannelHandler.Sharable;
    
    import com.hhxy.utils.GzipUtils;
    import com.hhxy.utils.RequestMessage;
    import com.hhxy.utils.ResponseMessage;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    @Sharable
    public class Server4SerializableHandler extends ChannelHandlerAdapter {
    	
    	// 業務處理邏輯
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		System.out.println("from client : ClassName - " + msg.getClass().getName()
    				+ " ; message : " + msg.toString());
    		if(msg instanceof RequestMessage){
    			RequestMessage request = (RequestMessage)msg;
    			byte[] attachment = GzipUtils.unzip(request.getAttachment());
    			System.out.println(new String(attachment));
    		}
    		ResponseMessage response = new ResponseMessage(0L, "test response");
    		ctx.writeAndFlush(response);
    	}
    
    	// 異常處理邏輯
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("server exceptionCaught method run...");
    		cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    
    /**
     * 1. 單線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. connect連接服務,并發起請求
     */
    package com.hhxy.netty.serialized;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    import com.hhxy.utils.GzipUtils;
    import com.hhxy.utils.RequestMessage;
    import com.hhxy.utils.SerializableFactory4Marshalling;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class Client4Serializable {
    	
    	// 處理請求和處理服務端響應的線程組
    	private EventLoopGroup group = null;
    	// 服務啟動相關配置信息
    	private Bootstrap bootstrap = null;
    	
    	public Client4Serializable(){
    		init();
    	}
    	
    	private void init(){
    		group = new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		// 綁定線程組
    		bootstrap.group(group);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioSocketChannel.class);
    	}
    	
    	public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
    		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
    				ch.pipeline().addLast(handlers);
    			}
    		});
    		ChannelFuture future = this.bootstrap.connect(host, port).sync();
    		return future;
    	}
    	
    	public void release(){
    		this.group.shutdownGracefully();
    	}
    	
    	public static void main(String[] args) {
    		Client4Serializable client = null;
    		ChannelFuture future = null;
    		try{
    			client = new Client4Serializable();
    			future = client.doRequest("localhost", 9999, new Client4SerializableHandler());
    			String attachment = "test attachment";
    			byte[] attBuf = attachment.getBytes();
    			attBuf = GzipUtils.zip(attBuf);
    			RequestMessage msg = new RequestMessage(new Random().nextLong(), 
    					"test", attBuf);
    			future.channel().writeAndFlush(msg);
    			TimeUnit.SECONDS.sleep(1);
    			future.addListener(ChannelFutureListener.CLOSE);
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if(null != client){
    				client.release();
    			}
    		}
    	}
    	
    }
    
    package com.hhxy.netty.serialized;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class Client4SerializableHandler extends ChannelHandlerAdapter {
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		System.out.println("from server : ClassName - " + msg.getClass().getName()
    				+ " ; message : " + msg.toString());
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    

    4:定時斷線重連

           客戶端斷線重連機制。

           客戶端數量多,且需要傳遞的數據量級較大。可以周期性的發送數據的時候,使用。要求對數據的即時性不高的時候,才可使用。

           優點: 可以使用數據緩存。不是每條數據進行一次數據交互。可以定時回收資源,對資源利用率高。相對來說,即時性可以通過其他方式保證。如: 120秒自動斷線。數據變化1000次請求服務器一次。300秒中自動發送不足1000次的變化數據。

    /**
     * 1. 雙線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. 綁定服務監聽端口并啟動服務
     */
    package com.hhxy.netty.timer;
    
    import com.hhxy.utils.SerializableFactory4Marshalling;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    public class Server4Timer {
    	// 監聽線程組,監聽客戶端請求
    	private EventLoopGroup acceptorGroup = null;
    	// 處理客戶端相關操作線程組,負責處理與客戶端的數據通訊
    	private EventLoopGroup clientGroup = null;
    	// 服務啟動相關配置信息
    	private ServerBootstrap bootstrap = null;
    	public Server4Timer(){
    		init();
    	}
    	private void init(){
    		acceptorGroup = new NioEventLoopGroup();
    		clientGroup = new NioEventLoopGroup();
    		bootstrap = new ServerBootstrap();
    		// 綁定線程組
    		bootstrap.group(acceptorGroup, clientGroup);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioServerSocketChannel.class);
    		// 設定緩沖區大小
    		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    		// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接有效)
    		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    			.option(ChannelOption.SO_RCVBUF, 16*1024)
    			.option(ChannelOption.SO_KEEPALIVE, true);
    		// 增加日志Handler,日志級別為info
    		// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    	}
    	public ChannelFuture doAccept(int port) throws InterruptedException{
    		
    		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
    				// 定義一個定時斷線處理器,當多長時間內,沒有任何的可讀取數據,自動斷開連接。
    				// 構造參數,就是間隔時長。 默認的單位是秒。
    				// 自定義間隔時長單位。 new ReadTimeoutHandler(long times, TimeUnit unit);
    				ch.pipeline().addLast(new ReadTimeoutHandler(3));
    				ch.pipeline().addLast(new Server4TimerHandler());
    			}
    		});
    		ChannelFuture future = bootstrap.bind(port).sync();
    		return future;
    	}
    	public void release(){
    		this.acceptorGroup.shutdownGracefully();
    		this.clientGroup.shutdownGracefully();
    	}
    	
    	public static void main(String[] args){
    		ChannelFuture future = null;
    		Server4Timer server = null;
    		try{
    			server = new Server4Timer();
    			future = server.doAccept(9999);
    			System.out.println("server started.");
    			
    			future.channel().closeFuture().sync();
    		}catch(InterruptedException e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
    			if(null != server){
    				server.release();
    			}
    		}
    	}
    	
    }
    
    /**
     * @Sharable注解 - 
     *  代表當前Handler是一個可以分享的處理器。也就意味著,服務器注冊此Handler后,可以分享給多個客戶端同時使用。
     *  如果不使用注解描述類型,則每次客戶端請求時,必須為客戶端重新創建一個新的Handler對象。
     *  
     */
    package com.hhxy.netty.timer;
    
    import io.netty.channel.ChannelHandler.Sharable;
    
    import com.hhxy.utils.GzipUtils;
    import com.hhxy.utils.RequestMessage;
    import com.hhxy.utils.ResponseMessage;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    @Sharable
    public class Server4TimerHandler extends ChannelHandlerAdapter {
    	
    	// 業務處理邏輯
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		System.out.println("from client : ClassName - " + msg.getClass().getName()
    				+ " ; message : " + msg.toString());
    		ResponseMessage response = new ResponseMessage(0L, "test response");
    		ctx.writeAndFlush(response);
    	}
    
    	// 異常處理邏輯
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("server exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    
    /**
     * 1. 單線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. connect連接服務,并發起請求
     */
    package com.hhxy.netty.timer;
    
    import java.hhxy.Random;
    import java.hhxy.concurrent.TimeUnit;
    
    import com.hhxy.utils.RequestMessage;
    import com.hhxy.utils.SerializableFactory4Marshalling;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.timeout.WriteTimeoutHandler;
    
    public class Client4Timer {
    	
    	// 處理請求和處理服務端響應的線程組
    	private EventLoopGroup group = null;
    	// 服務啟動相關配置信息
    	private Bootstrap bootstrap = null;
    	private ChannelFuture future = null;
    	
    	public Client4Timer(){
    		init();
    	}
    	
    	private void init(){
    		group = new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		// 綁定線程組
    		bootstrap.group(group);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioSocketChannel.class);
    		// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    	}
    	
    	public void setHandlers() throws InterruptedException{
    		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
    				// 寫操作自定斷線。 在指定時間內,沒有寫操作,自動斷線。
    				ch.pipeline().addLast(new WriteTimeoutHandler(3));
    				ch.pipeline().addLast(new Client4TimerHandler());
    			}
    		});
    	}
    	
    	public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
    		if(future == null){
    			future = this.bootstrap.connect(host, port).sync();
    		}
    		if(!future.channel().isActive()){
    			future = this.bootstrap.connect(host, port).sync();
    		}
    		return future;
    	}
    	
    	public void release(){
    		this.group.shutdownGracefully();
    	}
    	
    	public static void main(String[] args) {
    		Client4Timer client = null;
    		ChannelFuture future = null;
    		try{
    			client = new Client4Timer();
    			client.setHandlers();
    			
    			future = client.getChannelFuture("localhost", 9999);
    			for(int i = 0; i < 3; i++){
    				RequestMessage msg = new RequestMessage(new Random().nextLong(), 
    						"test"+i, new byte[0]);
    				future.channel().writeAndFlush(msg);
    				TimeUnit.SECONDS.sleep(2);
    			}
    			TimeUnit.SECONDS.sleep(5);
    			
    			future = client.getChannelFuture("localhost", 9999);
    			RequestMessage msg = new RequestMessage(new Random().nextLong(), 
    					"test", new byte[0]);
    			future.channel().writeAndFlush(msg);
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if(null != client){
    				client.release();
    			}
    		}
    	}
    	
    }
    
    package com.hhxy.netty.timer;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class Client4TimerHandler extends ChannelHandlerAdapter {
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		System.out.println("from server : ClassName - " + msg.getClass().getName()
    				+ " ; message : " + msg.toString());
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		cause.printStackTrace();
    		ctx.close();
    	}
    
    	/**
    	 * 當連接建立成功后,出發的代碼邏輯。
    	 * 在一次連接中只運行唯一一次。
    	 * 通常用于實現連接確認和資源初始化的。
    	 */
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		System.out.println("client channel active");
    	}
    
    }
    

    5:心跳監測

    使用定時發送消息的方式,實現硬件檢測,達到心態檢測的目的。

    心跳監測是用于檢測電腦硬件和軟件信息的一種技術。如:CPU使用率,磁盤使用率,內存使用率,進程情況,線程情況等。

    5.1:sigar

            需要下載一個zip壓縮包。內部包含若干sigar需要的操作系統文件。sigar插件是通過JVM訪問操作系統,讀取計算機硬件的一個插件庫。讀取計算機硬件過程中,必須由操作系統提供硬件信息。硬件信息是通過操作系統提供的。zip壓縮包中是sigar編寫的操作系統文件,如:windows中的動態鏈接庫文件。

           解壓需要的操作系統文件,將操作系統文件賦值到${Java_home}/bin目錄中。

    /**
     * 1. 雙線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. 綁定服務監聽端口并啟動服務
     */
    package com.hhxy.netty.heatbeat;
    
    import com.hhxy.utils.SerializableFactory4Marshalling;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class Server4Heatbeat {
    	// 監聽線程組,監聽客戶端請求
    	private EventLoopGroup acceptorGroup = null;
    	// 處理客戶端相關操作線程組,負責處理與客戶端的數據通訊
    	private EventLoopGroup clientGroup = null;
    	// 服務啟動相關配置信息
    	private ServerBootstrap bootstrap = null;
    	public Server4Heatbeat(){
    		init();
    	}
    	private void init(){
    		acceptorGroup = new NioEventLoopGroup();
    		clientGroup = new NioEventLoopGroup();
    		bootstrap = new ServerBootstrap();
    		// 綁定線程組
    		bootstrap.group(acceptorGroup, clientGroup);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioServerSocketChannel.class);
    		// 設定緩沖區大小
    		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    		// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接有效)
    		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    			.option(ChannelOption.SO_RCVBUF, 16*1024)
    			.option(ChannelOption.SO_KEEPALIVE, true);
    	}
    	public ChannelFuture doAccept(int port) throws InterruptedException{
    		
    		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
    				ch.pipeline().addLast(new Server4HeatbeatHandler());
    			}
    		});
    		ChannelFuture future = bootstrap.bind(port).sync();
    		return future;
    	}
    	public void release(){
    		this.acceptorGroup.shutdownGracefully();
    		this.clientGroup.shutdownGracefully();
    	}
    	
    	public static void main(String[] args){
    		ChannelFuture future = null;
    		Server4Heatbeat server = null;
    		try{
    			server = new Server4Heatbeat();
    			future = server.doAccept(9999);
    			System.out.println("server started.");
    			
    			future.channel().closeFuture().sync();
    		}catch(InterruptedException e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
    			if(null != server){
    				server.release();
    			}
    		}
    	}
    	
    }
    
    /**
     * @Sharable注解 - 
     *  代表當前Handler是一個可以分享的處理器。也就意味著,服務器注冊此Handler后,可以分享給多個客戶端同時使用。
     *  如果不使用注解描述類型,則每次客戶端請求時,必須為客戶端重新創建一個新的Handler對象。
     *  
     */
    package com.hhxy.netty.heatbeat;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import com.hhxy.utils.HeatbeatMessage;
    
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    @Sharable
    public class Server4HeatbeatHandler extends ChannelHandlerAdapter {
    	
    	private static List<String> credentials = new ArrayList<>();
    	private static final String HEATBEAT_SUCCESS = "SERVER_RETURN_HEATBEAT_SUCCESS";
    	public Server4HeatbeatHandler(){
    		// 初始化客戶端列表信息。一般通過配置文件讀取或數據庫讀取。
    		credentials.add("192.168.199.222_WIN-QIUB2JF5TDP");
    	}
    	
    	// 業務處理邏輯
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		if(msg instanceof String){
    			this.checkCredential(ctx, msg.toString());
    		} else if (msg instanceof HeatbeatMessage){
    			this.readHeatbeatMessage(ctx, msg);
    		} else {
    			ctx.writeAndFlush("wrong message").addListener(ChannelFutureListener.CLOSE);
    		}
    	}
    	
    	private void readHeatbeatMessage(ChannelHandlerContext ctx, Object msg){
    		HeatbeatMessage message = (HeatbeatMessage) msg;
    		System.out.println(message);
    		System.out.println("=======================================");
    		ctx.writeAndFlush("receive heatbeat message");
    	}
    
    	/**
    	 * 身份檢查。檢查客戶端身份是否有效。
    	 * 客戶端身份信息應該是通過數據庫或數據文件定制的。
    	 * 身份通過 - 返回確認消息。
    	 * 身份無效 - 斷開連接
    	 * @param ctx
    	 * @param credential
    	 */
    	private void checkCredential(ChannelHandlerContext ctx, String credential){
    		System.out.println(credential);
    		System.out.println(credentials);
    		if(credentials.contains(credential)){
    			ctx.writeAndFlush(HEATBEAT_SUCCESS);
    		}else{
    			ctx.writeAndFlush("no credential contains").addListener(ChannelFutureListener.CLOSE);
    		}
    	}
    	
    	// 異常處理邏輯
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("server exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    
    /**
     * 1. 單線程組
     * 2. Bootstrap配置啟動信息
     * 3. 注冊業務處理Handler
     * 4. connect連接服務,并發起請求
     */
    package com.hhxy.netty.heatbeat;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    import com.hhxy.utils.GzipUtils;
    import com.hhxy.utils.RequestMessage;
    import com.hhxy.utils.SerializableFactory4Marshalling;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class Client4Heatbeat {
    	
    	// 處理請求和處理服務端響應的線程組
    	private EventLoopGroup group = null;
    	// 服務啟動相關配置信息
    	private Bootstrap bootstrap = null;
    	
    	public Client4Heatbeat(){
    		init();
    	}
    	
    	private void init(){
    		group = new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		// 綁定線程組
    		bootstrap.group(group);
    		// 設定通訊模式為NIO
    		bootstrap.channel(NioSocketChannel.class);
    	}
    	
    	public ChannelFuture doRequest(String host, int port) throws InterruptedException{
    		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
    				ch.pipeline().addLast(new Client4HeatbeatHandler());
    			}
    		});
    		ChannelFuture future = this.bootstrap.connect(host, port).sync();
    		return future;
    	}
    	
    	public void release(){
    		this.group.shutdownGracefully();
    	}
    	
    	public static void main(String[] args) {
    		Client4Heatbeat client = null;
    		ChannelFuture future = null;
    		try{
    			client = new Client4Heatbeat();
    			future = client.doRequest("localhost", 9999);
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if(null != client){
    				client.release();
    			}
    		}
    	}
    	
    }
    
    package com.hhxy.netty.heatbeat;
    
    import java.net.InetAddress;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    
    import org.hyperic.sigar.CpuPerc;
    import org.hyperic.sigar.FileSystem;
    import org.hyperic.sigar.Mem;
    import org.hyperic.sigar.Sigar;
    
    import com.hhxy.utils.HeatbeatMessage;
    
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class Client4HeatbeatHandler extends ChannelHandlerAdapter {
    
    	private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    	private ScheduledFuture heatbeat;
    	private InetAddress remoteAddr;
    	private static final String HEATBEAT_SUCCESS = "SERVER_RETURN_HEATBEAT_SUCCESS";
    	
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		// 獲取本地INET信息
    		this.remoteAddr = InetAddress.getLocalHost();
    		// 獲取本地計算機名
    		String computerName = System.getenv().get("COMPUTERNAME");
    		String credentials = this.remoteAddr.getHostAddress() + "_" + computerName;
    		System.out.println(credentials);
    		// 發送到服務器,作為信息比對證書
    		ctx.writeAndFlush(credentials);
    	}
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try{
    			if(msg instanceof String){
    				if(HEATBEAT_SUCCESS.equals(msg)){
    					this.heatbeat = this.executorService.scheduleWithFixedDelay(new HeatbeatTask(ctx), 0L, 2L, TimeUnit.SECONDS);
    					System.out.println("client receive - " + msg);
    				}else{
    					System.out.println("client receive - " + msg);
    				}
    			}
    		}finally{
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		// cause.printStackTrace();
    		// 回收資源
    		if(this.heatbeat != null){
    			this.heatbeat.cancel(true);
    			this.heatbeat = null;
    		}
    		ctx.close();
    	}
    	
    	class HeatbeatTask implements Runnable{
    		private ChannelHandlerContext ctx;
    		public HeatbeatTask(){
    			
    		}
    		public HeatbeatTask(ChannelHandlerContext ctx){
    			this.ctx = ctx;
    		}
    		public void run(){
    			try {
    				HeatbeatMessage msg = new HeatbeatMessage();
    				msg.setIp(remoteAddr.getHostAddress());
    				Sigar sigar = new Sigar();
    				// CPU信息
    				CpuPerc cpuPerc = sigar.getCpuPerc();
    				Map<String, Object> cpuMsgMap = new HashMap<>();
    				cpuMsgMap.put("Combined", cpuPerc.getCombined());
    				cpuMsgMap.put("User", cpuPerc.getUser());
    				cpuMsgMap.put("Sys", cpuPerc.getSys());
    				cpuMsgMap.put("Wait", cpuPerc.getWait());
    				cpuMsgMap.put("Idle", cpuPerc.getIdle());
    				
    				// 內存信息
    				Map<String, Object> memMsgMap = new HashMap<>();
    				Mem mem = sigar.getMem();
    				memMsgMap.put("Total", mem.getTotal());
    				memMsgMap.put("Used", mem.getUsed());
    				memMsgMap.put("Free", mem.getFree());
    				
    				// 文件系統
    				Map<String, Object> fileSysMsgMap = new HashMap<>();
    				FileSystem[] list = sigar.getFileSystemList();
    				fileSysMsgMap.put("FileSysCount", list.length);
    				List<String> msgList = null;
    				for(FileSystem fs : list){
    					msgList = new ArrayList<>();
    					msgList.add(fs.getDevName() + "總大小:    " + sigar.getFileSystemUsage(fs.getDirName()).getTotal() + "KB");
    					msgList.add(fs.getDevName() + "剩余大小:    " + sigar.getFileSystemUsage(fs.getDirName()).getFree() + "KB");
    					fileSysMsgMap.put(fs.getDevName(), msgList);
    				}
    				
    				msg.setCpuMsgMap(cpuMsgMap);
    				msg.setMemMsgMap(memMsgMap);
    				msg.setFileSysMsgMap(fileSysMsgMap);
    				
    				ctx.writeAndFlush(msg);
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    }
    

    6:HTTP協議處理

    使用Netty服務開發。實現HTTP協議處理邏輯。

    package com.hhxy.netty.http;  
      
    import io.netty.bootstrap.ServerBootstrap;  
    import io.netty.channel.EventLoopGroup;  
    import io.netty.channel.nio.NioEventLoopGroup;  
    import io.netty.channel.socket.nio.NioServerSocketChannel;  
      
    /** 
     * http協議文件傳輸 
     * @author Qixuan.Chen 
     * 創建時間:2015年5月4日 
     */  
    public class HttpStaticFileServer {  
      
          
        private final int port;//端口  
      
        public HttpStaticFileServer(int port) {  
            this.port = port;  
        }  
      
        public void run() throws Exception {  
            EventLoopGroup bossGroup = new NioEventLoopGroup();//線程一 //這個是用于serversocketchannel的event  
            EventLoopGroup workerGroup = new NioEventLoopGroup();//線程二//這個是用于處理accept到的channel  
            try {  
                ServerBootstrap b = new ServerBootstrap();  
                b.group(bossGroup, workerGroup)  
                 .channel(NioServerSocketChannel.class)  
                 .childHandler(new HttpStaticFileServerInitializer());  
      
                b.bind(port).sync().channel().closeFuture().sync();  
            } finally {  
                bossGroup.shutdownGracefully();  
                workerGroup.shutdownGracefully();  
            }  
        }  
      
        public static void main(String[] args) throws Exception {  
            int port = 8089;  
            if (args.length > 0) {  
                port = Integer.parseInt(args[0]);  
            } else {  
                port = 8089;  
            }  
            new HttpStaticFileServer(port).run();//啟動服務  
        }  
    }  
    package com.hhxy.netty.http;
    
    import static io.netty.handler.codec.http.HttpHeaderNames.CACHE_CONTROL;
    import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
    import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
    import static io.netty.handler.codec.http.HttpHeaderNames.DATE;
    import static io.netty.handler.codec.http.HttpHeaderNames.EXPIRES;
    import static io.netty.handler.codec.http.HttpHeaderNames.IF_MODIFIED_SINCE;
    import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED;
    import static io.netty.handler.codec.http.HttpHeaderNames.LOCATION;
    import static io.netty.handler.codec.http.HttpMethod.GET;
    import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
    import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
    import static io.netty.handler.codec.http.HttpResponseStatus.FOUND;
    import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
    import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED;
    import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.RandomAccessFile;
    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.GregorianCalendar;
    import java.util.Locale;
    import java.util.TimeZone;
    import java.util.regex.Pattern;
    
    import javax.activation.MimetypesFileTypeMap;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelProgressiveFuture;
    import io.netty.channel.ChannelProgressiveFutureListener;
    import io.netty.channel.DefaultFileRegion;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.DefaultHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpHeaderUtil;
    import io.netty.handler.codec.http.HttpHeaderValues;
    import io.netty.handler.codec.http.HttpResponse;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.LastHttpContent;
    import io.netty.handler.stream.ChunkedFile;
    import io.netty.util.CharsetUtil;  
      
    /** 
     * A simple handler that serves incoming HTTP requests to send their respective 
     * HTTP responses.  It also implements {@code 'If-Modified-Since'} header to 
     * take advantage of browser cache, as described in 
     * <a href="http://tools.ietf.org/html/rfc2616#section-14.25">RFC 2616</a>. 
     * 
     * <h3>How Browser Caching Works</h3> 
     * 
     * Web browser caching works with HTTP headers as illustrated by the following 
     * sample: 
     * <ol> 
     * <li>Request #1 returns the content of {@code /file1.txt}.</li> 
     * <li>Contents of {@code /file1.txt} is cached by the browser.</li> 
     * <li>Request #2 for {@code /file1.txt} does return the contents of the 
     *     file again. Rather, a 304 Not Modified is returned. This tells the 
     *     browser to use the contents stored in its cache.</li> 
     * <li>The server knows the file has not been modified because the 
     *     {@code If-Modified-Since} date is the same as the file's last 
     *     modified date.</li> 
     * </ol> 
     * 
     * <pre> 
     * Request #1 Headers 
     * =================== 
     * GET /file1.txt HTTP/1.1 
     * 
     * Response #1 Headers 
     * =================== 
     * HTTP/1.1 200 OK 
     * Date:               Tue, 01 Mar 2011 22:44:26 GMT 
     * Last-Modified:      Wed, 30 Jun 2010 21:36:48 GMT 
     * Expires:            Tue, 01 Mar 2012 22:44:26 GMT 
     * Cache-Control:      private, max-age=31536000 
     * 
     * Request #2 Headers 
     * =================== 
     * GET /file1.txt HTTP/1.1 
     * If-Modified-Since:  Wed, 30 Jun 2010 21:36:48 GMT 
     * 
     * Response #2 Headers 
     * =================== 
     * HTTP/1.1 304 Not Modified 
     * Date:               Tue, 01 Mar 2011 22:44:28 GMT 
     * 
     * </pre> 
     */  
    public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {  
      
        public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";  
        public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";  
        public static final int HTTP_CACHE_SECONDS = 60;  
      
        private final boolean useSendFile;  
      
        public HttpStaticFileServerHandler(boolean useSendFile) {  
            this.useSendFile = useSendFile;  
        }  
      
        /**
         * 類似channelRead方法。
         */
        @Override  
        public void messageReceived(  
                ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {  
            if (!request.decoderResult().isSuccess()) {  
                sendError(ctx, BAD_REQUEST);  
                return;  
            }  
      
            if (request.method() != GET) {  
                sendError(ctx, METHOD_NOT_ALLOWED);  
                return;  
            }  
      
            final String uri = request.uri();  
            System.out.println("-----uri----"+uri);  
            final String path = sanitizeUri(uri);  
            System.out.println("-----path----"+path);  
            if (path == null) {  
                sendError(ctx, FORBIDDEN);  
                return;  
            }  
      
            File file = new File(path);  
            if (file.isHidden() || !file.exists()) {  
                sendError(ctx, NOT_FOUND);  
                return;  
            }  
      
            if (file.isDirectory()) {  
                if (uri.endsWith("/")) {  
                    sendListing(ctx, file);  
                } else {  
                    sendRedirect(ctx, uri + '/');  
                }  
                return;  
            }  
      
            if (!file.isFile()) {  
                sendError(ctx, FORBIDDEN);  
                return;  
            }  
      
            // Cache Validation  
            String ifModifiedSince = (String) request.headers().get(IF_MODIFIED_SINCE);  
            if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {  
                SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);  
                Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);  
      
                // Only compare up to the second because the datetime format we send to the client  
                // does not have milliseconds  
                long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;  
                long fileLastModifiedSeconds = file.lastModified() / 1000;  
                if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {  
                    sendNotModified(ctx);  
                    return;  
                }  
            }  
      
            RandomAccessFile raf;  
            try {  
                raf = new RandomAccessFile(file, "r");  
            } catch (FileNotFoundException fnfe) {  
                sendError(ctx, NOT_FOUND);  
                return;  
            }  
            long fileLength = raf.length();  
      
            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);  
            //setContentLength(response, fileLength);  
            HttpHeaderUtil.setContentLength(response, fileLength);
            setContentTypeHeader(response, file);  
            setDateAndCacheHeaders(response, file);  
            if (HttpHeaderUtil.isKeepAlive(request)) {  
                response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);  
            }  
      
            // Write the initial line and the header.  
            ctx.write(response);  
      
            // Write the content.  
            ChannelFuture sendFileFuture;  
            if (useSendFile) {  
                sendFileFuture =  
                        ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());  
            } else {  
                sendFileFuture =  
                        ctx.write(new ChunkedFile(raf, 0, fileLength, 8192), ctx.newProgressivePromise());  
            }  
      
            sendFileFuture.addListener(new ChannelProgressiveFutureListener() {  
                @Override  
                public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {  
                    if (total < 0) { // total unknown  
                        System.err.println("Transfer progress: " + progress);  
                    } else {  
                        System.err.println("Transfer progress: " + progress + " / " + total);  
                    }  
                }  
      
                @Override  
                public void operationComplete(ChannelProgressiveFuture future) throws Exception {  
                    System.err.println("Transfer complete.");  
                }  
            });  
      
            // Write the end marker  
            ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);  
      
            // Decide whether to close the connection or not.  
            if (!HttpHeaderUtil.isKeepAlive(request)) {  
                // Close the connection when the whole content is written out.  
                lastContentFuture.addListener(ChannelFutureListener.CLOSE);  
            }  
        }  
      
        @Override  
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
            cause.printStackTrace();  
            if (ctx.channel().isActive()) {  
                sendError(ctx, INTERNAL_SERVER_ERROR);  
            }  
        }  
      
        private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");  
      
        /** 
         * 路徑解碼 
         * @param uri 
         * @return 
         */  
        private static String sanitizeUri(String uri) {  
            // Decode the path.  
            try {  
                uri = URLDecoder.decode(uri, "UTF-8");  
            } catch (UnsupportedEncodingException e) {  
                try {  
                    uri = URLDecoder.decode(uri, "ISO-8859-1");  
                } catch (UnsupportedEncodingException e1) {  
                    throw new Error();  
                }  
            }  
      
            if (!uri.startsWith("/")) {  
                return null;  
            }  
      
            // Convert file separators.  
            uri = uri.replace('/', File.separatorChar);  
      
            // Simplistic dumb security check.  
            // You will have to do something serious in the production environment.  
            if (uri.contains(File.separator + '.') ||  
                uri.contains('.' + File.separator) ||  
                uri.startsWith(".") || uri.endsWith(".") ||  
                INSECURE_URI.matcher(uri).matches()) {  
                return null;  
            }  
      
            // Convert to absolute path.  
            return System.getProperty("user.dir") + File.separator + uri;  
        }  
      
        private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");  
      
        private static void sendListing(ChannelHandlerContext ctx, File dir) {  
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);  
            response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");  
      
            StringBuilder buf = new StringBuilder();  
            String dirPath = dir.getPath();  
      
            buf.append("<!DOCTYPE html>\r\n");  
            buf.append("<html><head><title>");  
            buf.append("Listing of: ");  
            buf.append(dirPath);  
            buf.append("</title></head><body>\r\n");  
      
            buf.append("<h3>Listing of: ");  
            buf.append(dirPath);  
            buf.append("</h3>\r\n");  
      
            buf.append("<ul>");  
            buf.append("<li><a href=\"../\">..</a></li>\r\n");  
      
            for (File f: dir.listFiles()) {  
                if (f.isHidden() || !f.canRead()) {  
                    continue;  
                }  
      
                String name = f.getName();  
                if (!ALLOWED_FILE_NAME.matcher(name).matches()) {  
                    continue;  
                }  
      
                buf.append("<li><a href=\"");  
                buf.append(name);  
                buf.append("\">");  
                buf.append(name);  
                buf.append("</a></li>\r\n");  
            }  
      
            buf.append("</ul></body></html>\r\n");  
            ByteBuf buffer = Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8);  
            response.content().writeBytes(buffer);  
            buffer.release();  
      
            // Close the connection as soon as the error message is sent.  
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);  
        }  
      
        private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {  
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);  
            response.headers().set(LOCATION, newUri);  
      
            // Close the connection as soon as the error message is sent.  
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);  
        }  
      
        private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {  
            FullHttpResponse response = new DefaultFullHttpResponse(  
                    HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));  
            response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");  
      
            // Close the connection as soon as the error message is sent.  
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);  
        }  
      
        /** 
         * When file timestamp is the same as what the browser is sending up, send a "304 Not Modified" 
         * 
         * @param ctx 
         *            Context 
         */  
        private static void sendNotModified(ChannelHandlerContext ctx) {  
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);  
            setDateHeader(response);  
      
            // Close the connection as soon as the error message is sent.  
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);  
        }  
      
        /** 
         * Sets the Date header for the HTTP response 
         * 
         * @param response 
         *            HTTP response 
         */  
        private static void setDateHeader(FullHttpResponse response) {  
            SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);  
            dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));  
      
            Calendar time = new GregorianCalendar();  
            response.headers().set(DATE, dateFormatter.format(time.getTime()));  
        }  
      
        /** 
         * Sets the Date and Cache headers for the HTTP Response 
         * 
         * @param response 
         *            HTTP response 
         * @param fileToCache 
         *            file to extract content type 
         */  
        private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {  
            SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);  
            dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));  
      
            // Date header  
            Calendar time = new GregorianCalendar();  
            response.headers().set(DATE, dateFormatter.format(time.getTime()));  
      
            // Add cache headers  
            time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);  
            response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));  
            response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);  
            response.headers().set(  
                    LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));  
        }  
      
        /** 
         * Sets the content type header for the HTTP Response 
         * 
         * @param response 
         *            HTTP response 
         * @param file 
         *            file to extract content type 
         */  
        private static void setContentTypeHeader(HttpResponse response, File file) {  
            MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();  
            response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));  
        }  
      
    }  
    package com.hhxy.netty.http;
    
    import io.netty.channel.ChannelInitializer;  
    import io.netty.channel.ChannelPipeline;  
    import io.netty.channel.socket.SocketChannel;  
    import io.netty.handler.codec.http.HttpObjectAggregator;  
    import io.netty.handler.codec.http.HttpRequestDecoder;  
    import io.netty.handler.codec.http.HttpResponseEncoder;  
    import io.netty.handler.stream.ChunkedWriteHandler;  
      
    public class HttpStaticFileServerInitializer extends ChannelInitializer<SocketChannel> {  
        @Override  
        public void initChannel(SocketChannel ch) throws Exception {  
            // Create a default pipeline implementation.  
            ChannelPipeline pipeline = ch.pipeline();  
      
            // Uncomment the following line if you want HTTPS  
            //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();  
            //engine.setUseClientMode(false);  
            //pipeline.addLast("ssl", new SslHandler(engine));  
           /** 
            *   (1)ReadTimeoutHandler,用于控制讀取數據的時候的超時,10表示如果10秒鐘都沒有數據讀取了,那么就引發超時,然后關閉當前的channel 
     
                (2)WriteTimeoutHandler,用于控制數據輸出的時候的超時,構造參數1表示如果持續1秒鐘都沒有數據寫了,那么就超時。 
                 
                (3)HttpRequestrianDecoder,這個handler用于從讀取的數據中將http報文信息解析出來,無非就是什么requestline,header,body什么的。。。 
                 
                (4)然后HttpObjectAggregator則是用于將上賣解析出來的http報文的數據組裝成為封裝好的httprequest對象。。 
                 
                (5)HttpresponseEncoder,用于將用戶返回的httpresponse編碼成為http報文格式的數據 
                 
                (6)HttpHandler,自定義的handler,用于處理接收到的http請求。 
            */  
              
            pipeline.addLast("decoder", new HttpRequestDecoder());// http-request解碼器,http服務器端對request解碼  
            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//對傳輸文件大少進行限制  
            pipeline.addLast("encoder", new HttpResponseEncoder());//http-response解碼器,http服務器端對response編碼  
            // 向客戶端發送數據的一個Handler
            pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());  
      
            pipeline.addLast("handler", new HttpStaticFileServerHandler(true)); // Specify false if SSL.(如果是ssl,就指定為false)  
        }  
    }  

    7:其他輔助代碼

    package com.hhxy.utils;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;
    
    public class GzipUtils {
    	
    	public static void main(String[] args) throws Exception {
    		FileInputStream fis = new FileInputStream("D:\\3\\1.jpg");
    		byte[] temp = new byte[fis.available()];
    		int length = fis.read(temp);
    		System.out.println("長度 : " + length);
    		
    		byte[] zipArray = GzipUtils.zip(temp);
    		System.out.println("壓縮后的長度 : " + zipArray.length);
    		
    		byte[] unzipArray = GzipUtils.unzip(zipArray);
    		System.out.println("解壓縮后的長度 : " + unzipArray.length);
    		
    		FileOutputStream fos = new FileOutputStream("D:\\3\\101.jpg");
    		fos.write(unzipArray);
    		fos.flush();
    		
    		fos.close();
    		fis.close();
    	}
    	
    	/**
    	 * 解壓縮
    	 * @param source 源數據。需要解壓的數據。
    	 * @return 解壓后的數據。 恢復的數據。
    	 * @throws Exception
    	 */
    	public static byte[] unzip(byte[] source) throws Exception{
    		ByteArrayOutputStream out = new ByteArrayOutputStream();
    		ByteArrayInputStream in = new ByteArrayInputStream(source);
    		// JDK提供的。 專門用于壓縮使用的流對象。可以處理字節數組數據。
    		GZIPInputStream zipIn = new GZIPInputStream(in);
    		byte[] temp = new byte[256];
    		int length = 0;
    		while((length = zipIn.read(temp, 0, temp.length)) != -1){
    			out.write(temp, 0, length);
    		}
    		// 將字節數組輸出流中的數據,轉換為一個字節數組。
    		byte[] target = out.toByteArray();
    		
    		zipIn.close();
    		out.close();
    		
    		return target;
    	}
    	
    	/**
    	 * 壓縮
    	 * @param source 源數據,需要壓縮的數據
    	 * @return 壓縮后的數據。
    	 * @throws Exception
    	 */
    	public static byte[] zip(byte[] source) throws Exception{
    		ByteArrayOutputStream out = new ByteArrayOutputStream();
    		// 輸出流,JDK提供的,提供解壓縮功能。
    		GZIPOutputStream zipOut = new GZIPOutputStream(out);
    		// 將壓縮信息寫入到內存。 寫入的過程會實現解壓。
    		zipOut.write(source);
    		// 結束。
    		zipOut.finish();
    		byte[] target = out.toByteArray();
    		
    		zipOut.close();
    		
    		return target;
    	}
    }
    
    package com.hhxy.utils;
    
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Map;
    import java.util.Properties;
    
    import org.hyperic.sigar.CpuInfo;
    import org.hyperic.sigar.CpuPerc;
    import org.hyperic.sigar.FileSystem;
    import org.hyperic.sigar.FileSystemUsage;
    import org.hyperic.sigar.Mem;
    import org.hyperic.sigar.NetFlags;
    import org.hyperic.sigar.NetInterfaceConfig;
    import org.hyperic.sigar.NetInterfaceStat;
    import org.hyperic.sigar.OperatingSystem;
    import org.hyperic.sigar.Sigar;
    import org.hyperic.sigar.SigarException;
    import org.hyperic.sigar.Swap;
    import org.hyperic.sigar.Who;
    
    
    public class OSUtils {
    
    
        public static void main(String[] args) {
            try {
                // System信息,從jvm獲取
                property();
                System.out.println("----------------------------------");
                // cpu信息
                cpu();
                System.out.println("----------------------------------");
                // 內存信息
                memory();
                System.out.println("----------------------------------");
                // 操作系統信息
                os();
                System.out.println("----------------------------------");
                // 用戶信息
                who();
                System.out.println("----------------------------------");
                // 文件系統信息
                file();
                System.out.println("----------------------------------");
                // 網絡信息
                net();
                System.out.println("----------------------------------");
                // 以太網信息
                ethernet();
                System.out.println("----------------------------------");
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }
    
        private static void property() throws UnknownHostException {
            Runtime r = Runtime.getRuntime();
            Properties props = System.getProperties();
            InetAddress addr;
            addr = InetAddress.getLocalHost();
            String ip = addr.getHostAddress();
            Map<String, String> map = System.getenv();
            String userName = map.get("USERNAME");// 獲取用戶名
            String computerName = map.get("COMPUTERNAME");// 獲取計算機名
            String userDomain = map.get("USERDOMAIN");// 獲取計算機域名
            System.out.println("用戶名:    " + userName);
            System.out.println("計算機名:    " + computerName);
            System.out.println("計算機域名:    " + userDomain);
            System.out.println("本地ip地址:    " + ip);
            System.out.println("本地主機名:    " + addr.getHostName());
            System.out.println("JVM可以使用的總內存:    " + r.totalMemory());
            System.out.println("JVM可以使用的剩余內存:    " + r.freeMemory());
            System.out.println("JVM可以使用的處理器個數:    " + r.availableProcessors());
            System.out.println("Java的運行環境版本:    " + props.getProperty("java.version"));
            System.out.println("Java的運行環境供應商:    " + props.getProperty("java.vendor"));
            System.out.println("Java供應商的URL:    " + props.getProperty("java.vendor.url"));
            System.out.println("Java的安裝路徑:    " + props.getProperty("java.home"));
            System.out.println("Java的虛擬機規范版本:    " + props.getProperty("java.vm.specification.version"));
            System.out.println("Java的虛擬機規范供應商:    " + props.getProperty("java.vm.specification.vendor"));
            System.out.println("Java的虛擬機規范名稱:    " + props.getProperty("java.vm.specification.name"));
            System.out.println("Java的虛擬機實現版本:    " + props.getProperty("java.vm.version"));
            System.out.println("Java的虛擬機實現供應商:    " + props.getProperty("java.vm.vendor"));
            System.out.println("Java的虛擬機實現名稱:    " + props.getProperty("java.vm.name"));
            System.out.println("Java運行時環境規范版本:    " + props.getProperty("java.specification.version"));
            System.out.println("Java運行時環境規范供應商:    " + props.getProperty("java.specification.vender"));
            System.out.println("Java運行時環境規范名稱:    " + props.getProperty("java.specification.name"));
            System.out.println("Java的類格式版本號:    " + props.getProperty("java.class.version"));
            System.out.println("Java的類路徑:    " + props.getProperty("java.class.path"));
            System.out.println("加載庫時搜索的路徑列表:    " + props.getProperty("java.library.path"));
            System.out.println("默認的臨時文件路徑:    " + props.getProperty("java.io.tmpdir"));
            System.out.println("一個或多個擴展目錄的路徑:    " + props.getProperty("java.ext.dirs"));
            System.out.println("操作系統的名稱:    " + props.getProperty("os.name"));
            System.out.println("操作系統的構架:    " + props.getProperty("os.arch"));
            System.out.println("操作系統的版本:    " + props.getProperty("os.version"));
            System.out.println("文件分隔符:    " + props.getProperty("file.separator"));
            System.out.println("路徑分隔符:    " + props.getProperty("path.separator"));
            System.out.println("行分隔符:    " + props.getProperty("line.separator"));
            System.out.println("用戶的賬戶名稱:    " + props.getProperty("user.name"));
            System.out.println("用戶的主目錄:    " + props.getProperty("user.home"));
            System.out.println("用戶的當前工作目錄:    " + props.getProperty("user.dir"));
        }
    
        private static void memory() throws SigarException {
            Sigar sigar = new Sigar();
            Mem mem = sigar.getMem();
            // 內存總量
            System.out.println("內存總量:    " + mem.getTotal() / 1024L + "K av");
            // 當前內存使用量
            System.out.println("當前內存使用量:    " + mem.getUsed() / 1024L + "K used");
            // 當前內存剩余量
            System.out.println("當前內存剩余量:    " + mem.getFree() / 1024L + "K free");
            Swap swap = sigar.getSwap();
            // 交換區總量
            System.out.println("交換區總量:    " + swap.getTotal() / 1024L + "K av");
            // 當前交換區使用量
            System.out.println("當前交換區使用量:    " + swap.getUsed() / 1024L + "K used");
            // 當前交換區剩余量
            System.out.println("當前交換區剩余量:    " + swap.getFree() / 1024L + "K free");
        }
    
        private static void cpu() throws SigarException {
            Sigar sigar = new Sigar();
            CpuInfo infos[] = sigar.getCpuInfoList();
            CpuPerc cpuList[] = null;
            cpuList = sigar.getCpuPercList();
            for (int i = 0; i < infos.length; i++) {// 不管是單塊CPU還是多CPU都適用
                CpuInfo info = infos[i];
                System.out.println("第" + (i + 1) + "塊CPU信息");
                System.out.println("CPU的總量MHz:    " + info.getMhz());// CPU的總量MHz
                System.out.println("CPU生產商:    " + info.getVendor());// 獲得CPU的賣主,如:Intel
                System.out.println("CPU類別:    " + info.getModel());// 獲得CPU的類別,如:Celeron
                System.out.println("CPU緩存數量:    " + info.getCacheSize());// 緩沖存儲器數量
                printCpuPerc(cpuList[i]);
            }
        }
    
        private static void printCpuPerc(CpuPerc cpu) {
            System.out.println("CPU用戶使用率:    " + CpuPerc.format(cpu.getUser()));// 用戶使用率
            System.out.println("CPU系統使用率:    " + CpuPerc.format(cpu.getSys()));// 系統使用率
            System.out.println("CPU當前等待率:    " + CpuPerc.format(cpu.getWait()));// 當前等待率
            System.out.println("CPU當前錯誤率:    " + CpuPerc.format(cpu.getNice()));//
            System.out.println("CPU當前空閑率:    " + CpuPerc.format(cpu.getIdle()));// 當前空閑率
            System.out.println("CPU總的使用率:    " + CpuPerc.format(cpu.getCombined()));// 總的使用率
        }
    
        private static void os() {
            OperatingSystem OS = OperatingSystem.getInstance();
            // 操作系統內核類型如: 386、486、586等x86
            System.out.println("操作系統:    " + OS.getArch());
            System.out.println("操作系統CpuEndian():    " + OS.getCpuEndian());//
            System.out.println("操作系統DataModel():    " + OS.getDataModel());//
            // 系統描述
            System.out.println("操作系統的描述:    " + OS.getDescription());
            // 操作系統類型
            // System.out.println("OS.getName():    " + OS.getName());
            // System.out.println("OS.getPatchLevel():    " + OS.getPatchLevel());//
            // 操作系統的賣主
            System.out.println("操作系統的賣主:    " + OS.getVendor());
            // 賣主名稱
            System.out.println("操作系統的賣主名:    " + OS.getVendorCodeName());
            // 操作系統名稱
            System.out.println("操作系統名稱:    " + OS.getVendorName());
            // 操作系統賣主類型
            System.out.println("操作系統賣主類型:    " + OS.getVendorVersion());
            // 操作系統的版本號
            System.out.println("操作系統的版本號:    " + OS.getVersion());
        }
    
        private static void who() throws SigarException {
            Sigar sigar = new Sigar();
            Who who[] = sigar.getWhoList();
            if (who != null && who.length > 0) {
                for (int i = 0; i < who.length; i++) {
                    // System.out.println("當前系統進程表中的用戶名" + String.valueOf(i));
                    Who _who = who[i];
                    System.out.println("用戶控制臺:    " + _who.getDevice());
                    System.out.println("用戶host:    " + _who.getHost());
                    // System.out.println("getTime():    " + _who.getTime());
                    // 當前系統進程表中的用戶名
                    System.out.println("當前系統進程表中的用戶名:    " + _who.getUser());
                }
            }
        }
    
        private static void file() throws Exception {
            Sigar sigar = new Sigar();
            FileSystem fslist[] = sigar.getFileSystemList();
            try {
                for (int i = 0; i < fslist.length; i++) {
                    System.out.println("分區的盤符名稱" + i);
                    FileSystem fs = fslist[i];
                    // 分區的盤符名稱
                    System.out.println("盤符名稱:    " + fs.getDevName());
                    // 分區的盤符名稱
                    System.out.println("盤符路徑:    " + fs.getDirName());
                    System.out.println("盤符標志:    " + fs.getFlags());//
                    // 文件系統類型,比如 FAT32、NTFS
                    System.out.println("盤符類型:    " + fs.getSysTypeName());
                    // 文件系統類型名,比如本地硬盤、光驅、網絡文件系統等
                    System.out.println("盤符類型名:    " + fs.getTypeName());
                    // 文件系統類型
                    System.out.println("盤符文件系統類型:    " + fs.getType());
                    FileSystemUsage usage = null;
                    usage = sigar.getFileSystemUsage(fs.getDirName());
                    switch (fs.getType()) {
                        case 0: // TYPE_UNKNOWN :未知
                            break;
                        case 1: // TYPE_NONE
                            break;
                        case 2: // TYPE_LOCAL_DISK : 本地硬盤
                            // 文件系統總大小
                            System.out.println(fs.getDevName() + "總大小:    " + usage.getTotal() + "KB");
                            // 文件系統剩余大小
                            System.out.println(fs.getDevName() + "剩余大小:    " + usage.getFree() + "KB");
                            // 文件系統可用大小
                            System.out.println(fs.getDevName() + "可用大小:    " + usage.getAvail() + "KB");
                            // 文件系統已經使用量
                            System.out.println(fs.getDevName() + "已經使用量:    " + usage.getUsed() + "KB");
                            double usePercent = usage.getUsePercent() * 100D;
                            // 文件系統資源的利用率
                            System.out.println(fs.getDevName() + "資源的利用率:    " + usePercent + "%");
                            break;
                        case 3:// TYPE_NETWORK :網絡
                            break;
                        case 4:// TYPE_RAM_DISK :閃存
                            break;
                        case 5:// TYPE_CDROM :光驅
                            break;
                        case 6:// TYPE_SWAP :頁面交換
                            break;
                    }
                    System.out.println(fs.getDevName() + "讀出:    " + usage.getDiskReads());
                    System.out.println(fs.getDevName() + "寫入:    " + usage.getDiskWrites());
                }
            } catch (Exception e) {
                // TODO: handle exception
                e.printStackTrace();
            }
    
            return;
        }
    
        private static void net() throws Exception {
            Sigar sigar = new Sigar();
            String ifNames[] = sigar.getNetInterfaceList();
            for (int i = 0; i < ifNames.length; i++) {
                String name = ifNames[i];
                NetInterfaceConfig ifconfig = sigar.getNetInterfaceConfig(name);
                System.out.println("網絡設備名:    " + name);// 網絡設備名
                System.out.println("IP地址:    " + ifconfig.getAddress());// IP地址
                System.out.println("子網掩碼:    " + ifconfig.getNetmask());// 子網掩碼
                if ((ifconfig.getFlags() & 1L) <= 0L) {
                    System.out.println("!IFF_UP...skipping getNetInterfaceStat");
                    continue;
                }
                NetInterfaceStat ifstat = sigar.getNetInterfaceStat(name);
                System.out.println(name + "接收的總包裹數:" + ifstat.getRxPackets());// 接收的總包裹數
                System.out.println(name + "發送的總包裹數:" + ifstat.getTxPackets());// 發送的總包裹數
                System.out.println(name + "接收到的總字節數:" + ifstat.getRxBytes());// 接收到的總字節數
                System.out.println(name + "發送的總字節數:" + ifstat.getTxBytes());// 發送的總字節數
                System.out.println(name + "接收到的錯誤包數:" + ifstat.getRxErrors());// 接收到的錯誤包數
                System.out.println(name + "發送數據包時的錯誤數:" + ifstat.getTxErrors());// 發送數據包時的錯誤數
                System.out.println(name + "接收時丟棄的包數:" + ifstat.getRxDropped());// 接收時丟棄的包數
                System.out.println(name + "發送時丟棄的包數:" + ifstat.getTxDropped());// 發送時丟棄的包數
            }
        }
    
        private static void ethernet() throws SigarException {
            Sigar sigar = null;
            sigar = new Sigar();
            String[] ifaces = sigar.getNetInterfaceList();
            for (int i = 0; i < ifaces.length; i++) {
                NetInterfaceConfig cfg = sigar.getNetInterfaceConfig(ifaces[i]);
                if (NetFlags.LOOPBACK_ADDRESS.equals(cfg.getAddress()) || (cfg.getFlags() & NetFlags.IFF_LOOPBACK) != 0
                        || NetFlags.NULL_HWADDR.equals(cfg.getHwaddr())) {
                    continue;
                }
                System.out.println(cfg.getName() + "IP地址:" + cfg.getAddress());// IP地址
                System.out.println(cfg.getName() + "網關廣播地址:" + cfg.getBroadcast());// 網關廣播地址
                System.out.println(cfg.getName() + "網卡MAC地址:" + cfg.getHwaddr());// 網卡MAC地址
                System.out.println(cfg.getName() + "子網掩碼:" + cfg.getNetmask());// 子網掩碼
                System.out.println(cfg.getName() + "網卡描述信息:" + cfg.getDescription());// 網卡描述信息
                System.out.println(cfg.getName() + "網卡類型" + cfg.getType());//
            }
        }
    }
    package com.hhxy.utils;
    
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    
    public class SerializableFactory4Marshalling {
    
    	 /**
         * 創建Jboss Marshalling解碼器MarshallingDecoder
         * @return MarshallingDecoder
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            //首先通過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識創建的是java序列化工廠對象。
        	//jboss-marshalling-serial 包提供
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            //創建了MarshallingConfiguration對象,配置了版本號為5 
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            // 序列化版本。只要使用JDK5以上版本,version只能定義為5。
            configuration.setVersion(5);
            //根據marshallerFactory和configuration創建provider
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            //構建Netty的MarshallingDecoder對象,倆個參數分別為provider和單個消息序列化后的最大長度
            MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
            return decoder;
        }
    
        /**
         * 創建Jboss Marshalling編碼器MarshallingEncoder
         * @return MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            //構建Netty的MarshallingEncoder對象,MarshallingEncoder用于實現序列化接口的POJO對象序列化為二進制數組
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }
    	
    }
    
    package com.hhxy.utils;
    
    import java.io.Serializable;
    
    public class RequestMessage implements Serializable {
    	private static final long serialVersionUID = 7084843947860990140L;
    	private Long id;
    	private String message;
    	private byte[] attachment;
    	@Override
    	public String toString() {
    		return "RequestMessage [id=" + id + ", message=" + message + "]";
    	}
    	public RequestMessage() {
    		super();
    	}
    	public RequestMessage(Long id, String message, byte[] attachment) {
    		super();
    		this.id = id;
    		this.message = message;
    		this.attachment = attachment;
    	}
    	public Long getId() {
    		return id;
    	}
    	public void setId(Long id) {
    		this.id = id;
    	}
    	public String getMessage() {
    		return message;
    	}
    	public void setMessage(String message) {
    		this.message = message;
    	}
    	public byte[] getAttachment() {
    		return attachment;
    	}
    	public void setAttachment(byte[] attachment) {
    		this.attachment = attachment;
    	}
    }
    
    package com.hhxy.utils;
    
    import java.io.Serializable;
    
    public class ResponseMessage implements Serializable {
    	private static final long serialVersionUID = -8134313953478922076L;
    	private Long id;
    	private String message;
    	@Override
    	public String toString() {
    		return "ResponseMessage [id=" + id + ", message=" + message + "]";
    	}
    	public ResponseMessage() {
    		super();
    	}
    	public ResponseMessage(Long id, String message) {
    		super();
    		this.id = id;
    		this.message = message;
    	}
    	public Long getId() {
    		return id;
    	}
    	public void setId(Long id) {
    		this.id = id;
    	}
    	public String getMessage() {
    		return message;
    	}
    	public void setMessage(String message) {
    		this.message = message;
    	}
    }
    
    package com.hhxy.utils;
    
    import java.io.Serializable;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Map;
    
    public class HeatbeatMessage implements Serializable {
    
    	private static final long serialVersionUID = 2827219147304706826L;
    	private String ip;
    	private Map<String, Object> cpuMsgMap;
    	private Map<String, Object> memMsgMap;
    	private Map<String, Object> fileSysMsgMap;
    	@Override
    	public String toString() {
    		return "HeatbeatMessage [\nip=" + ip 
    				+ ", \ncpuMsgMap=" + cpuMsgMap 
    				+ ", \nmemMsgMap=" + memMsgMap
    				+ ", \nfileSysMsgMap=" + fileSysMsgMap + "]";
    	}
    	
    	public String getIp() {
    		return ip;
    	}
    	public void setIp(String ip) {
    		this.ip = ip;
    	}
    	public Map<String, Object> getCpuMsgMap() {
    		return cpuMsgMap;
    	}
    	public void setCpuMsgMap(Map<String, Object> cpuMsgMap) {
    		this.cpuMsgMap = cpuMsgMap;
    	}
    	public Map<String, Object> getMemMsgMap() {
    		return memMsgMap;
    	}
    	public void setMemMsgMap(Map<String, Object> memMsgMap) {
    		this.memMsgMap = memMsgMap;
    	}
    	public Map<String, Object> getFileSysMsgMap() {
    		return fileSysMsgMap;
    	}
    	public void setFileSysMsgMap(Map<String, Object> fileSysMsgMap) {
    		this.fileSysMsgMap = fileSysMsgMap;
    	}
    	
    }
    

    五:流數據的傳輸處理

           在基于流的傳輸里比如TCP/IP,接收到的數據會先被存儲到一個socket接收緩沖里。不幸的是,基于流的傳輸并不是一個數據包隊列,而是一個字節隊列。即使你發送了2個獨立的數據包,操作系統也不會作為2個消息處理而僅僅是作為一連串的字節而言。因此這是不能保證你遠程寫入的數據就會準確地讀取。所以一個接收方不管他是客戶端還是服務端,都應該把接收到的數據整理成一個或者多個更有意思并且能夠讓程序的業務邏輯更好理解的數據。

           在處理流數據粘包拆包時,可以使用下述處理方式:

           使用定長數據處理,如:每個完整請求數據長度為8字節等。(FixedLengthFrameDecoder)

           使用特殊分隔符的方式處理,如:每個完整請求數據末尾使用’\0’作為數據結束標記。(DelimiterBasedFrameDecoder)

           使用自定義協議方式處理,如:http協議格式等。

           使用POJO來替代傳遞的流數據,如:每個完整的請求數據都是一個RequestMessage對象,在Java語言中,使用POJO更符合語種特性,推薦使用。

    版權聲明:本文為qq_36297434原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/qq_36297434/article/details/103225460

    智能推薦

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    Linux C系統編程-線程互斥鎖(四)

    互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...

    統計學習方法 - 樸素貝葉斯

    引入問題:一機器在良好狀態生產合格產品幾率是 90%,在故障狀態生產合格產品幾率是 30%,機器良好的概率是 75%。若一日第一件產品是合格品,那么此日機器良好的概率是多少。 貝葉斯模型 生成模型與判別模型 判別模型,即要判斷這個東西到底是哪一類,也就是要求y,那就用給定的x去預測。 生成模型,是要生成一個模型,那就是誰根據什么生成了模型,誰就是類別y,根據的內容就是x 以上述例子,判斷一個生產出...

    styled-components —— React 中的 CSS 最佳實踐

    https://zhuanlan.zhihu.com/p/29344146 Styled-components 是目前 React 樣式方案中最受關注的一種,它既具備了 css-in-js 的模塊化與參數化優點,又完全使用CSS的書寫習慣,不會引起額外的學習成本。本文是 styled-components 作者之一 Max Stoiber 所寫,首先總結了前端組件化樣式中的最佳實踐原則,然后在此基...

    猜你喜歡

    基于TCP/IP的網絡聊天室用Java來實現

    基于TCP/IP的網絡聊天室實現 開發工具:eclipse 開發環境:jdk1.8 發送端 接收端 工具類 運行截圖...

    19.vue中封裝echarts組件

    19.vue中封裝echarts組件 1.效果圖 2.echarts組件 3.使用組件 按照組件格式整理好數據格式 傳入組件 home.vue 4.接口返回數據格式...

    劍指Offer39-調整數組順序使奇數位于偶數前面

    一開始想著用冒泡排序的方法來做,但是bug還是很多,后來看了評論區答案,發現直接空間換時間是最簡單的,而且和快排的寫法是類似的。...

    【一只蒟蒻的刷題歷程】【藍橋杯】歷屆試題 九宮重排 (八數碼問題:BFS+集合set)

    資源限制 時間限制:1.0s 內存限制:256.0MB 問題描述 如下面第一個圖的九宮格中,放著 1~8 的數字卡片,還有一個格子空著。與空格子相鄰的格子中的卡片可以移動到空格中。經過若干次移動,可以形成第二個圖所示的局面。 我們把第一個圖的局面記為:12345678. 把第二個圖的局面記為:123.46758 顯然是按從上到下,從左到右的順序記錄數字,空格記為句點。 本題目的任務是已知九宮的初態...

    dataV組件容器寬高發生變化后,組件不會自適應解決方法

    項目中需要大屏幕數據展示,于是使用了dataV組件,但是使用是發現拖動瀏覽器邊框,dataV組件顯示異常,如圖: 于是查了官網,官網的解釋如下:   于是按照官網的意思編寫代碼: 于是可以自適應了...

    精品国产乱码久久久久久蜜桃不卡