Netty學習筆記(三):Netty簡介、線程模型、Netty應用實例、Netty核心組件介紹
第 4 章 Netty 詳解
一、Netty簡介
1、NIO 存在的問題
- NIO 的類庫和 API 繁雜,使用麻煩:需要熟練掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 需要具備其他的額外技能:要熟悉 Java 多線程編程,因為 NIO 編程涉及到 Reactor 模式,必須對多線程 和網絡編程非常熟悉,才能編寫出高質量的 NIO 程序。
- 開發工作量和難度都非常大:例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常流
的處理等等。 - JDKNIO 的 Bug:例如臭名昭著的 EpollBug,它會導致 Selector 空輪詢,最終導致 CPU100%。直到 JDK1.7
版本該問題仍舊存在,沒有被根本解決。
就因為上述原因,所以JBoss基于NIO開發了Netty。
2、Netty介紹
- Netty 是由 JBOSS 提供的一個 Java 開源框架。Netty 提供異步的、基于事件驅動的網絡應用程序框架,用以快速開發高性能、高可靠性的網絡 IO 程序
- Netty 可以幫助你快速、簡單的開發出一個網絡應用,相當于簡化和流程化了 NIO 的開發過程
- Netty 是目前最流行的 NIO 框架,Netty 在互聯網領域、大數據分布式計算領域、游戲行業、通信行業等獲得了廣泛的應用,知名的 Elasticsearch 、Dubbo 框架內部都采用了 Netty。
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients
3、Netty的優點
Netty 對 JDK 自帶的 NIO 的 API 進行了封裝,解決了上述NIO所帶的問題。
- 設計優雅:適用于各種傳輸類型的統一 API 阻塞和非阻塞 Socket;基于靈活且可擴展的事件模型,可以清晰地分離關注點;高度可定制的線程模型 - 單線程,一個或多個線程池.
- 使用方便:詳細記錄的 Javadoc,用戶指南和示例;沒有其他依賴項,JDK 5(Netty 3.x)或 6(Netty 4.x)就足夠了。
- 高性能、吞吐量更高:延遲更低;減少資源消耗;最小化不必要的內存復制。
- 安全:完整的 SSL/TLS 和 StartTLS 支持。
- 社區活躍、不斷更新:社區活躍,版本迭代周期短,發現的 Bug 可以被及時修復,同時,更多的新功能會被加入
4、Netty版本說明
- netty版本分為 netty3.x 和 netty4.x、netty5.x
- 因為Netty5出現重大bug,已經被官網廢棄了,目前推薦使用的是Netty4.x的穩定版本
- netty 下載地址: https://bintray.com/netty/downloads/netty/
二、線程模型介紹
1、線程模型基本介紹
不同的線程模式,對程序的性能有很大影響,為了搞清 Netty 線程模式,我們來系統的講解下 各個線程模式
目前存在的線程模型有:
- 傳統阻塞 I/O 服務模型 。
- Reactor 模式 。根據 Reactor 的數量和處理資源池線程的數量不同,有 3 種典型的實現
- 單 Reactor 單線程;
- 單 Reactor 多線程;
- 主從 Reactor 多線程
Netty 主要基于主從 Reactor 多線程模型做了一定的改進,其中主從 Reactor 多線程模型有多 個 Reactor
2、傳統阻塞 I/O 服務模型
傳統阻塞 I/O 服務模型工作原理如下圖所示:
其中:黃色的框表示對象, 藍色的框表示線程,白色的框表示方法(API)
傳統阻塞 I/O 服務模型特點:
- 采用阻塞 IO 模式獲取輸入的數據
- 每個連接都需要獨立的線程完成數據的輸入,業務處理, 數據返回
傳統阻塞 I/O 服務模型存在的問題:
- 當并發數很大,就會創建大量的線程,占用很大系統資源
- 連接創建后,如果當前線程暫時沒有數據可讀,該線程會阻塞在 read 操作,造成線程資源浪費3、
3、 Reactor 模式簡介
Reactor 模式有多種翻譯:1. 反應器模式 2. 分發者模式(Dispatcher)3. 通知者模式(notifier)
針對上面所述的傳統阻塞 I/O 服務模型的 2 個缺點,Reactor提出了下面的解決方案:
- 基于 I/O 復用模型:多個連接共用一個阻塞對象,應用程序只需要在一個阻塞對象等待,無需阻塞等待所有連
接。當某個連接有新的數據可以處理時,操作系統通知應用程序,線程從阻塞狀態返回,開始進行業務處理 - 基于線程池復用線程資源:不必再為每個連接創建線程,將連接完成后的業務處理任務分配給線程進行處理
I/O復用結合線程池,就是 Reactor 模式基本設計思想,如圖:
說明:
- 通過一個或多個輸入同時傳遞給服務處理器(ServiceHandler)的模式(基于事件驅動)
- 服務器端程序處理傳入的多個請求,并將它們同步分派到相應的處理線程, 因此 Reactor 模式也叫 Dispatcher
模式 - Reactor 模式使用IO復用監聽事件, 收到事件后,分發給某個線程(進程), 這點就是網絡服務器高并發處理關鍵
Reactor 模式中 核心組成:
- Reactor:Reactor (就是服務處理器(ServiceHandler))在一個單獨的線程中運行,負責監聽和分發事件,分發給適當的處理程序來對 IO 事件做出反應。 它就像公司的電話接線員,它接聽來自客戶的電話并將線路轉移到適當的聯系人;
- Handlers:Handlers(就是事件處理器(EventHandler)),處理程序執行 I/O 事件要完成的實際事件,類似于客戶想要與之交談的公司中的實際官員。Reactor 通過調度適當的處理程序來響應 I/O 事件,處理程序執行非阻塞操作。
根據 Reactor 的數量和處理資源池線程的數量不同,有 3 種典型的實現,下面將分別進行介紹:
- 單 Reactor 單線程
- 單 Reactor 多線程
- 主從 Reactor 多線程
Reactor 模式具有如下的優點:
- 響應快,不必為單個同步時間所阻塞,雖然 Reactor 本身依然是同步的
- 可以最大程度的避免復雜的多線程及同步問題,并且避免了多線程/進程的切換開銷
- 擴展性好,可以方便的通過增加 Reactor 實例個數來充分利用 CPU 資源
- 復用性好,Reactor 模型本身與具體事件處理邏輯無關,具有很高的復用性
4、單 Reactor 單線程模式
原理圖:
原理說明:
- Select 是前面 I/O 復用模型介紹的標準網絡編程 API,可以實現應用程序通過一個阻塞對象監聽多路連接請求
- Reactor 對象通過 select 監控客戶端請求事件, 收到事件后,通過 dispatch 進行分發
- 如果建立連接請求, 則右 Acceptor 通過 accept 處理連接請求, 然后創建一個 Handler 對象處理完成連接后的各種事件
- 如果不是連接請求,則由 reactor 分發調用連接對應的 handler 來處理
- Handler 會完成 Read→業務處理→Send 的完整業務流程
前面的 NIO 群聊系統就是單 Reactor 單線程模式
方案優缺點分析:
- 優點:模型簡單,沒有多線程、進程通信、競爭的問題,全部都在一個線程中完成
- 缺點:性能問題,只有一個線程,無法完全發揮多核 CPU 的性能。Handler 在處理某個連接上的業務時,整
個進程無法處理其他連接事件,很容易導致性能瓶頸 - 缺點:可靠性問題,線程意外終止,或者進入死循環,會導致整個系統通信模塊不可用,不能接收和處理外部
消息,造成節點故障
使用場景:客戶端的數量有限,業務處理非常快速,比如 Redis 在業務處理的時間復雜度 O(1) 的情況
5、單 Reactor 多線程模式
原理圖:
原理說明:
- Reactor 對象通過 select 監控客戶端請求 事件, 收到事件后,通過 dispatch 進行分發
- 如果建立連接請求, 則右 Acceptor 通過 accept 處理連接請求, 然后創建一個 Handler 對象處理完成連接后的各種事件
- 如果不是連接請求,則由 reactor 分發調用連接對應的 handler 來處理
- handler 只負責響應事件,不做具體的業務處理, 通過 read 讀取數據后,會分發給后面的 worker 線程池的某個線程處理業務
- worker 線程池會分配獨立線程完成真正的業務,并將結果返回給 handler
- handler 收到響應后,通過 send 將結果返回給 client
方案優缺點分析:
- 優點:可以充分的利用多核 cpu 的處理能力
- 缺點:多線程數據共享和訪問比較復雜
- 缺點: reactor 處理所有的事件的監聽和響應,在單線程運行, 在高并發場 景容易出現性能瓶頸.
6、主從 Reactor 多線程
針對單 Reactor 多線程模型中,Reactor 在單線程中運行,高并發場景下容易成為性能瓶頸,可以讓 Reactor 在多線程中運行
工作原理圖 :
原理說明:
-
Reactor 主線程 MainReactor 對象通過 select 監聽連接事件, 收到事件后,通過 Acceptor 處理連接事件
-
當 Acceptor 處理連接事件后,MainReactor 將連接分配給 SubReactor(有多個)
-
SubReactor 將連接加入到連接隊列進行監聽,并創建 handler 進行各種事件處理
-
當有新事件發生時, subreactor 就會調用對應的 handler 處理
-
handler 先read 讀取數據,然后分發給后面的 worker 線程池處理
-
worker 線程池分配獨立的 worker 線程進行業務處理,并返回結果
-
handler 收到響應的結果后,再通過 send 將結果返回給 client
-
Reactor 主線程可以對應多個 Reactor 子線程, 即 MainRecator 可以關聯多個 SubReactor
原理圖的另一種表示,結構和上面是類似的:
方案優缺點說明:
- 優點:父線程與子線程的數據交互簡單職責明確,父線程只需要接收新連接,子線程完成后續的業務處理。
- 優點:父線程與子線程的數據交互簡單,Reactor 主線程只需要把新連接傳給子線程,子線程無需返回數據。
- 缺點:編程復雜度較高
這種模型在許多項目中廣泛使用,包括 Nginx 主從 Reactor 多進程模型,Memcached 主從多線程, Netty 主從多線程模型的支持
三、Netty 模型
1、模型原理
Netty 主要基于主從 Reactors 多線程模型(如圖)做了一定的改進,其中主從 Reactor 多線程模型有多個 Reactor
從整體上看,Netty 模型如圖所示:
說明:
- BossGroup 線程維護 Selector, 只關注 Accecpt
- 當接收到 Accept 事件,獲取到對應的 SocketChannel, 封裝成 NIOScoketChannel 并注冊到 Worker 線程(事件循環), 并進行維護
- 當 Worker 線程監聽到 selector 中通道發生自己感興趣的事件后,就進行處理(就由 handler), 注意 handler 已經加入到通道
更加詳細的的模型如下圖所示:
Netty模型原理說明:
-
Netty 抽象出兩組線程池: BossGroup 專門負責接收客戶端的連接、WorkerGroup 專門負責網絡的讀寫
-
BossGroup 和 WorkerGroup 類型都是 NioEventLoopGroup 。NioEventLoopGroup 相當于一個事件循環組, 這個組中含有多個事件循環 ,每一個事件循環是 NioEventLoop
-
NioEventLoop 表示一個不斷循環的執行處理任務的線程, 每個 NioEventLoop 都有一個 selector, 用于監聽綁 定在其上的 socket 的網絡通訊
-
每個 BossNioEventLoop 循環執行的步驟有 3 步
- 輪詢 accept 事件
- 處理 accept 事件 , 與 client 建立連接 , 生成 NioScocketChannel , 并將其注冊到某個 worker NIOEventLoop 上 的 selector
- 處理任務隊列的其他任務 , 即 runAllTasks
-
每個 WorkerNIOEventLoop 循環執行的步驟
- 輪詢 read,write 事件
- 在對應 NioScocketChannel上進行 處理 i/o 事件, 即 read,write 事件
- 處理任務隊列的其他任務 , 即 runAllTasks
-
每個WorkerNIOEventLoop 處理業務時,會使用pipeline(管道),pipeline 中包含了 channel, 即通過pipeline 可以獲取到對應通道, 管道中維護了很多的 處理器
2、應用實例1-服務器客戶端通信
Netty 服務器在 6666 端口監聽,客戶端能發送消息給服務器 “hello, 服務器”。服務器可以回復消息給客戶端 “hello, 客戶端”
注意運行前需要使用Maven導入Netty的包或者依賴
服務器端代碼:
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//創建 bossGroup 和 workerGroup
//bossGroup 只處理連接請求,workerGroup 處理客戶端業務
//兩個都是無限循環
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//創建一個服務器端啟動對象,并設置參數
ServerBootstrap serverBootstrap = new ServerBootstrap();
//使用鏈式編程來設置啟動器
serverBootstrap.group(bossGroup, workerGroup) //設置兩個線程組
.channel(NioServerSocketChannel.class)//設置使用NioServerSocketChannel作為服務器的通道實現
.option(ChannelOption.SO_BACKLOG, 128) //設置線程隊列可以得到的連接個數
.childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動連接狀態
.childHandler(new ChannelInitializer<SocketChannel>() { //創建一個管道測試對象
//給pipeline設置處理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//在管道中加入自定義的處理器(NettyServerHandler)
ch.pipeline().addLast(new NettyServerHandler());
}
}); //給workerGroup 的 EventLoop 對應的管道設置處理器
System.out.println("服務器 is ready");
//綁定端口并同步,生成一個 ChannelFuture對象
//啟動服務器
ChannelFuture cf = serverBootstrap.bind(6666).sync();
//對關閉通道進行監聽
cf.channel().closeFuture().sync();
} finally {
//關閉線程組
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NettyServerHandler:
/**
* 該類用于實際處理數據,需要繼承Netty規定好的HandlerAdapter
*
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 用于讀取(接收)數據
* @param ctx 上下文對象,其中包含了管道pipeline,通道channel,地址等信息
* @param msg 客戶端發送的信息
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx = " + ctx);
//將msg轉為一個ByteBuf對象
//ByteBuf是 Netty提供的,與NIO的ByteBuffer不同
ByteBuf buf = (ByteBuf) msg;
System.out.println("客戶端發送的信息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客戶端地址為:" + ctx.channel().remoteAddress());
}
/**
* 定義數據讀取完畢后的操作
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush作用的是將數據寫到緩沖區并發送
//需要對發送的數據進行編碼
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端", CharsetUtil.UTF_8));
}
/**
* 處理異常,一般是關閉通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客戶端代碼:
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//創建事件組
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
//創建客戶端啟動對象
//注意客戶端是Bootstrap,不是ServerBootstrap
Bootstrap bootstrap = new Bootstrap();
//設置相關的參數
bootstrap.group(eventLoopGroup) //設置線程組
.channel(NioSocketChannel.class) //設置客戶端通道的實現類(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客戶端 is ok");
//啟動客戶端連接服務器端,異步
ChannelFuture sync = bootstrap.connect(new InetSocketAddress("127.0.0.1", 6666)).sync();
//對通道關閉進行監聽
sync.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
NettyClientHandler:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//當通道就緒就會觸發
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client ctx : " + ctx);
//向服務器端發送消息
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服務器", CharsetUtil.UTF_8));
}
//接收服務端發送的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服務器回復:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服務器地址:" + ctx.channel().remoteAddress());
}
//處理異常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
運行結果:
服務端輸出:
服務器 is ready
server ctx = ChannelHandlerContext(NettyServerHandler#0, [id: 0x67909ee8, L:/127.0.0.1:6666 - R:/127.0.0.1:3991])
客戶端發送的信息是:hello, 服務器
客戶端地址為:/127.0.0.1:3991
客戶端輸出:
客戶端 is ok
client ctx : ChannelHandlerContext(NettyClientHandler#0, [id: 0xabaaab32, L:/127.0.0.1:3991 - R:/127.0.0.1:6666])
服務器回復:hello,客戶端
服務器地址:/127.0.0.1:6666
注意:
對于服務器端的 bossGroup 和 workerGroup 分別對應模型中的兩個NioEventLoopGroup。其中每個NioEventLoopGroup下面NioEventLoop的數目(線程數目)默認為CPU的核數 * 2
。如果需要設定,可以直接在構造函數傳入,如代碼第一行
//EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
CPU的核數可以通過下面的代碼查看,我的是4核,而創建的NioEventLoop就是8個
System.out.println(NettyRuntime.availableProcessors());
可以在debug中看到從看到,如下圖。同時也可以看出多個NioEventLoop是使用EventExecutor管理的。
任務隊列中的 Task 有 3 種典型使用場景
- 用戶程序自定義的普通任務
- 用戶自定義定時任務
- 非當前 Reactor 線程調用 Channel 的各種方法
示例:
/**
* 該類用于實際處理數據,需要繼承Netty規定好的HandlerAdapter
*
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 用于讀取(接收)數據
* @param ctx 上下文對象,其中包含了管道pipeline,通道channel,地址等信息
* @param msg 客戶端發送的信息
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//比如這里我們有一個非常耗時長的業務-> 異步執行
// -> 提交該 channel 對應的 NIOEventLoop 的 taskQueue 中,
ctx.channel().eventLoop().execute(()->{
try {
Thread.sleep(5000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客戶端-1", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//這個任務和上面的任務放到同一個eventLoop的taskQueue
//所以這兩個任務是串行執行的
ctx.channel().eventLoop().execute(()->{
try {
Thread.sleep(10000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客戶端-2", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//解決方案 2: 用戶自定義定時任務 -》 該任務是提交到 scheduledTaskQueue 中
ctx.channel().eventLoop().schedule(()->{
try {
Thread.sleep(10000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客戶端-2", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 5, TimeUnit.SECONDS);
}
}
3、應用實例2-實現HTTP 服務
Netty 服務器在 8888 端口監聽,瀏覽器發出請求 http://localhost:8888/
服務器可以回復消息給客戶端 “Hello,客戶端”, 并對特定請求資源進行過濾.
實例代碼:
NettyHttpServer(Http服務器)
public class NettyHttpServer {
public static void main(String[] args) throws InterruptedException {
//創建 bossGroup 和 workerGroup
//bossGroup 只處理連接請求,workerGroup 處理客戶端業務
//兩個都是無限循環
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//創建一個服務器端啟動對象,并設置參數
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new NettyHttpServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
System.out.println("服務端 is ready");
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NettyHttpServerInitializer(Channel初始化器),可以向ChannelPipeline加入handler
public class NettyHttpServerInitializer extends ChannelInitializer<SocketChannel> {
//向管道加入處理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//得到管道
ChannelPipeline pipeline = ch.pipeline();
//加入netty提供的httpServerCodec =》(code + decode)
//HttpServerCodec 是netty提供的http編碼-解碼器
pipeline.addLast(new HttpServerCodec());
//添加自定義handler
pipeline.addLast(new NettyHttpServerHandler());
}
}
NettyHttpServerHandler(用于處理Http請求)
/**
* 說明:
* SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter 的子類
* HttpObject :客戶端和服務器端通信的數據被封裝成 HttpObject
*/
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
//讀取客戶端發來的數據
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
//判斷是否是HttpRequest
if (msg instanceof HttpRequest) {
//對圖標資源獲取進行過濾
HttpRequest request = (HttpRequest) msg;
URI uri = new URI(request.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println("請求了 favicon.ico ,不做響應");
return;
}
System.out.println("pipeline = " + ctx.pipeline().hashCode() + " handlder = " + this.hashCode());
System.out.println("msg 類型:" + msg.getClass());
System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
//回復信息給瀏覽器(Http協議)
ByteBuf content = Unpooled.copiedBuffer("hello,客戶端", CharsetUtil.UTF_8);
//構造http響應,即httpResponse
DefaultFullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
//返回httpResponse
ctx.writeAndFlush(response);
}
}
}
運行服務器之后,在瀏覽器內輸入地址,可以看到下面的結果:
服務器端控制臺輸出:
服務端 is ready
pipeline = 1328644476 handlder = 5644135
msg 類型:class io.netty.handler.codec.http.DefaultHttpRequest
客戶端地址:/0:0:0:0:0:0:0:1:8247
請求了 favicon.ico ,不做響應
其中最后第5行的輸出是瀏覽器獲取圖標的請求,被攔截。對應NettyHttpServerHandler類中第12~22行代碼所對應的功能
另外http是連接不是長連接,每次發送請求都會重新建立連接,而且在Netty中每次處理都會生成pipeline和handlder,下面再次刷新可以看出:
服務端 is ready
pipeline = 1328644476 handlder = 5644135
msg 類型:class io.netty.handler.codec.http.DefaultHttpRequest
客戶端地址:/0:0:0:0:0:0:0:1:8247
請求了 favicon.ico ,不做響應
pipeline = 99933460 handlder = 1089885134
msg 類型:class io.netty.handler.codec.http.DefaultHttpRequest
客戶端地址:/0:0:0:0:0:0:0:1:8255
請求了 favicon.ico ,不做響應
4、模型方案再說明
Netty 抽象出兩組線程池,BossGroup 專門負責接收客戶端連接,WorkerGroup 專門負責網絡讀寫操作。
NioEventLoop 表示一個不斷循環執行處理任務的線程,每個 NioEventLoop 都有一個 selector,用于監聽綁定 在其上的 socket 網絡通道。
NioEventLoop 內部采用串行化設計(taskQueue),從消息的讀取->解碼->處理->編碼->發送,始終由 IO 線程 NioEventLoop負責
其中:
- NioEventLoopGroup 下包含多個 NioEventLoop
- 每個 NioEventLoop 中包含有一個 Selector,一個 taskQueue
- 每個 NioEventLoop 的 Selector 上可以注冊監聽多個 NioChannel
- 每個 NioChannel 只會綁定在唯一的 NioEventLoop 上
- 每個 NioChannel 都綁定有一個自己的 ChannelPipeline
四、異步模型
1、基本介紹
-
異步的概念和同步相對。當一個異步過程調用發出后,調用者不能立刻得到結果。實際處理這個調用的組件在
完成后,通過狀態、通知和回調來通知調用者。 -
Netty 中的 I/O 操作是異步的,包括 Bind、Write、Connect 等操作會簡單的返回一個 ChannelFuture。
-
調用者并不能立刻獲得結果,而是通過 Future-Listener 機制,用戶可以方便的主動獲取或者通過通知機制獲得 IO 操作結果
-
Netty 的異步模型是建立在 future 和 callback 的之上的。callback 就是回調。重點說 Future,它的核心思想 是:假設一個方法 fun,計算過程可能非常耗時,等待 fun 返回顯然不合適。那么可以在調用 fun 的時候,立 馬返回一個 Future,后續可以通過 Future 去監控方法 fun 的處理過程(即 : Future-Listener 機制)
2、Future -Listener 機制
- Future 表示異步的執行結果, 可以通過它提供的方法來檢測執行是否完成,比如檢索計算等等.
- ChannelFuture 是一個接口 :
public interface ChannelFuture extends Future<Void>
我們可以添加監聽器,當監聽的事件發生時,就會通知到監聽器 - 當 Future 對象剛剛創建時,處于非完成狀態,調用者可以通過返回的 ChannelFuture 來獲取操作執行的狀態,注冊監聽函數來執行完成后的操作。
常見有如下操作:
- 通過 isDone 方法來判斷當前操作是否完成;
- 通過 isSuccess 方法來判斷已完成的當前操作是否成功;
- 通過 getCause 方法來獲取已完成的當前操作失敗的原因
- 通過 isCancelled 方法來判斷已完成的當前操作是否被取消;
- 通過 addListener 方法來注冊監聽器,如當操作已完成(isDone 方法返回完成),將會通知指定的監聽器;如果 Future 對象已完成,則通知指定的監聽器
3、應用示例
舉例說明:綁定端口是異步操作,當綁定操作處理完,將會調用相應的監聽器處理邏輯
//綁定端口并同步,生成一個 ChannelFuture對象
//啟動服務器
ChannelFuture cf = serverBootstrap.bind(6666).sync();
//為ChannelFuture注冊監聽器,監聽關心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()){
System.out.println("服務器綁定 6666 端口成功");
} else {
System.out.println("服務器綁定 6666 端口失敗");
}
}
});
五、Netty 核心模塊組件
1、 Bootstrap、ServerBootstrap
Bootstrap 意思是引導,一個 Netty 應用通常由一個 Bootstrap 開始,主要作用是配置整個 Netty 程序,串聯 各個組件,Netty 中 Bootstrap 類是客戶端程序的啟動引導類,ServerBootstrap 是服務端啟動引導類
常見的方法有 :
//該方法用于服務器端,用來設置兩個 EventLoop
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup);
//該方法用于客戶端,用來設置一個 EventLoop
public B group(EventLoopGroup group);
//該方法用來設置一個服務器端的通道實現
public B channel(Class<? extends C> channelClass);
//用來給 ServerChannel 添加配置
public <T> B option(ChannelOption<T> option, T value);
//用來給接收到的通道添加配置
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value);
//該方法用來設置業務處理類(自定義的 handler)
public ServerBootstrap childHandler(ChannelHandler childHandler);
//該方法用于服務器端,用來設置占用的端口號
public ChannelFuture bind(int inetPort);
//該方法用于客戶端,用來連接服務器端
public ChannelFuture connect(String inetHost, int inetPort)
2、Future、ChannelFuture
Netty 中所有的 IO 操作都是異步的,不能立刻得知消息是否被正確處理。但是可以過一會等它執行完成或 者直接注冊一個監聽
具體的實現就是通過 Future 和 ChannelFutures,他們可以注冊一個監聽,當操作執行成功或失敗時監聽會自動觸發注冊的監聽事件
常見的方法有 :
Channel channel();//返回當前正在進行 IO 操作的通道
ChannelFuture sync();//等待異步操作執行完畢
3、Channel
-
Channel 是Netty 網絡通信的組件,能夠用于執行網絡 I/O 操作。
-
通過 Channel 可獲得當前網絡連接的通道的狀態
-
通過 Channel 可獲得 網絡連接的配置參數 (例如接收緩沖區大小)
-
Channel 提供異步的網絡 I/O 操作(如建立連接,讀寫,綁定端口),異步調用意味著任何 I/O 調用都將立即返 回,并且不保證在調用結束時所請求的 I/O 操作已完成
-
調用立即返回一個 ChannelFuture 實例,通過注冊監聽器到 ChannelFuture 上,可以 I/O 操作成功、失敗或取消時回調通知調用方
-
支持關聯 I/O 操作與對應的處理程序
-
不同協議、不同的阻塞類型的連接都有不同的 Channel 類型與之對應,常用的 Channel 類型:
- NioSocketChannel,異步的客戶端 TCPSocket 連接。
- NioServerSocketChannel,異步的服務器端 TCPSocket 連接。
- NioDatagramChannel,異步的 UDP 連接。
- NioSctpChannel,異步的客戶端 Sctp 連接。
- NioSctpServerChannel,異步的 Sctp 服務器端連接,這些通道涵蓋了 UDP 和 TCP 網絡 IO 以及文件 IO
4、Selector
Netty 基于 Selector 對象實現 I/O 多路復用,通過 Selector 一個線程可以監聽多個連接的 Channel 事件。
當向一個 Selector 中注冊 Channel 后,Selector 內部的機制就可以自動不斷地查詢(Select) 這些注冊的 Channel 是否有已就緒的 I/O 事件(例如可讀,可寫,網絡連接完成等),這樣程序就可以很簡單地使用一個 線程高效地管理多個 Channel
5、ChannelHandler 及其實現類
ChannelHandler 是一個接口,處理 I/O 事件或攔截 I/O 操作,并將其轉發到其 ChannelPipeline(業務處理鏈)
中的下一個處理程序。
ChannelHandler 本身并沒有提供很多方法,因為這個接口有許多的方法需要實現,方便使用期間,可以繼承它
的子類
ChannelHandler 及其實現類一覽圖
- ChannelInboundHandler 用于處理入站 I/O 事件。
- ChannelOutboundHandler 用于處理出站 I/O 操作。
- ChannelInboundHandlerAdapter 用于處理入站 I/O 事件。
- ChannelOutboundHandlerAdapter 用于處理出站 I/O 操作。
- ChannelDuplexHandler 用于處理入站和出站事件。
經常需要自定義一個 Handler 類去繼承 ChannelInboundHandlerAdapter,然后通過重寫相應方法實現業務
邏輯,我們接下來看看一般都需要重寫哪些方法
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
public ChannelInboundHandlerAdapter() {
}
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
//通道就緒事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
//通道讀取數據事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
//通道讀取數據完畢事件
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
//通道發生異常事件
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
6、Pipeline 和 ChannelPipeline
ChannelPipeline 是一個 Handler 的集合,它負責處理和攔截 inbound 或者 outbound 的事件和操作,相當于 一個貫穿 Netty 的鏈。(也可以這樣理解:ChannelPipeline 是 保存 ChannelHandler 的 List,用于處理或攔截 Channel 的入站事件和出站操作)
ChannelPipeline 實現了一種高級形式的攔截過濾器模式,使用戶可以完全控制事件的處理方式,以及 Channel 中各個的 ChannelHandler 如何相互交互
在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應,它們的組成關系如下
- 一個 Channel 包含了一個 ChannelPipeline,而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext (就是ctx)組成的雙向鏈表,并且每個 ChannelHandlerContext 中又關聯著一個 ChannelHandler
- 入站事件和出站事件在一個雙向鏈表中,入站事件會從鏈表 head 往后傳遞到最后一個入站的 handler,出站事件會從鏈表 tail 往前傳遞到最前一個出站的 handler,兩種類型的 handler 互不干擾
常用方法:
ChannelPipeline addFirst(ChannelHandler... handlers)
:把一個業務處理類(handler)添加到鏈中的第一個位置ChannelPipeline addLast(ChannelHandler... handlers)
:把一個業務處理類(handler)添加到鏈中的最后一個位置
7、ChannelHandlerContext
保存 Channel 相關的所有上下文信息,同時關聯一個 ChannelHandler 對象
ChannelHandlerContext 中 包 含 一 個 具 體 的 事 件 處 理 器 ChannelHandler , 同 時ChannelHandlerContext 中也綁定了對應的 pipeline 和 Channel 的信息,方便對 ChannelHandler進行調用.
常用方法:
ChannelFuture close()
:關閉通道ChannelOutboundInvoker flush()
:刷新ChannelFuture writeAndFlush(Object msg)
: 將 數據寫到 ChannelPipeline 中當前ChannelHandler 的下一個 ChannelHandler 開始處理(出站)
8、ChannelOption
Netty 在創建 Channel 實例后,一般都需要設置 ChannelOption 參數。
ChannelOption 參數如下:
- ChannelOption.SO_BACKLOG:對應 TCP/IP 協議 listen 函數中的 backlog 參數,用來初始化服務器可連接隊列大小。服務端處理客戶端連接請求是順序處理的,所以同一時間只能處理一個客戶端連接。多個客戶端來的時候,服務端將不能處理的客戶端連接請求放在隊列中等待處理,backlog 參數指定了隊列的大小。
- ChannelOption.SO_KEEPALIVE:設置一直保持連接活動狀態
9、EventLoopGroup 和其實現類 NioEventLoopGroup
EventLoopGroup 是一組 EventLoop 的抽象,Netty 為了更好的利用多核 CPU 資源,一般會有多個 EventLoop 同時工作,每個 EventLoop 維護著一個 Selector 實例。
EventLoopGroup 提供 next 接口,可以從組里面按照一定規則獲取其中一個 EventLoop來處理任務。
在 Netty 服務器端編程中,我們一般都需要提供兩個 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
通常一個服務端口即一個 ServerSocketChannel對應一個Selector 和一個EventLoop線程。BossEventLoop 負責接收客戶端的連接并將 SocketChannel 交給 WorkerEventLoopGroup 來進行 IO 處理,如下圖所示
- BossEventLoopGroup 通常是一個單線程的 EventLoop,EventLoop 維護著一個注冊了ServerSocketChannel 的 Selector 實例。BossEventLoop 不斷輪詢, Selector 將連接事件分離出來
- 連接完成后,將接收到的 SocketChannel 交給 WorkerEventLoopGroup
- WorkerEventLoopGroup 會由 next 選擇其中一個 EventLoop來將這個 SocketChannel 注冊到其維護的 Selector 并對其后續的 IO 事件進行處理
常用方法:
public NioEventLoopGroup()
:構造方法public Future<?> shutdownGracefully()
:斷開連接,關閉線程
10、Unpooled 類
Unpooled 類是Netty 提供一個專門用來操作緩沖區(即Netty的數據容器,如ByteBuf)的工具類
常用方法:
////通過給定的數據和字符編碼返回一個 ByteBuf 對象(類似于 NIO 中的 ByteBuffer 但有區別)
public static ByteBuf copiedBuffer(CharSequence string, Charset charset);
//使用
ByteBuf content = Unpooled.copiedBuffer("hello,客戶端", CharsetUtil.UTF_8);
使用示例:
public class NettyByteBuf01 {
public static void main(String[] args) {
//創建一個ByteBuf
//說明
//1. 創建 對象,該對象包含一個數組arr , 是一個byte[10]
//2. 在netty 的buffer中,不需要使用flip 進行反轉
// 底層維護了 readerindex 和 writerIndex
//3. 通過 readerindex 和 writerIndex 和 capacity, 將buffer分成三個區域
// 0---readerindex 已經讀取的區域
// readerindex---writerIndex , 可讀的區域
// writerIndex -- capacity, 可寫的區域
ByteBuf buffer = Unpooled.buffer(10);
for(int i = 0; i < 10; i++) {
buffer.writeByte(i);
}
System.out.println("capacity=" + buffer.capacity());//10
//輸出
// for(int i = 0; i<buffer.capacity(); i++) {
// System.out.println(buffer.getByte(i));
// }
for(int i = 0; i < buffer.capacity(); i++) {
System.out.println(buffer.readByte());
}
System.out.println("執行完畢");
}
}
public class NettyByteBuf02 {
public static void main(String[] args) {
//創建ByteBuf
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));
//使用相關的方法
if(byteBuf.hasArray()) { // true
byte[] content = byteBuf.array();
//將 content 轉成字符串
System.out.println(new String(content, Charset.forName("utf-8")));
System.out.println("byteBuf=" + byteBuf);
System.out.println(byteBuf.arrayOffset()); // 0
System.out.println(byteBuf.readerIndex()); // 0
System.out.println(byteBuf.writerIndex()); // 12
System.out.println(byteBuf.capacity()); // 36
//System.out.println(byteBuf.readByte()); //
System.out.println(byteBuf.getByte(0)); // 104
int len = byteBuf.readableBytes(); //可讀的字節數 12
System.out.println("len=" + len);
//使用for取出各個字節
for(int i = 0; i < len; i++) {
System.out.println((char) byteBuf.getByte(i));
}
//按照某個范圍讀取
System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8")));
System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8")));
}
}
}
智能推薦
Netty學習筆記(三)--- 線程模型和EventLoopGroup
Netty之所以成為高性能NIO框架,其精心設計的高效線程模型功不可沒,Netty線程模型基于了一個著名的模式——Reactor模式。 Reactor模型 Reactor模型是一種經典的線程模型,也叫反應器模型,網上已經由很多對它的介紹,這里就不過多的介紹,只是簡單介紹三種模型的特點。 單線程模型 Reactor單線程模型僅使用一個線程來處理所有的事情,包括服務端和客戶端的...
Netty學習——Reactor線程模型
一 netty概述 1.1 為什么有了NIO還有Netty NIO的缺點 JDK 的 NIO 底層由 epoll 實現,該實現飽受詬病的Selector 空輪詢 bug 會導致 cpu 飆升 100% NIO的API繁雜,使用麻煩,必須熟練掌握Selector、Channel、Buffer等相關API。并且需要熟練Java多線程編程和網絡編程,才能寫出高質量的NIO代碼。 開發工作量和難度都非常大...
freemarker + ItextRender 根據模板生成PDF文件
1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...
電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!
Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...
猜你喜歡
requests實現全自動PPT模板
http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...
Linux C系統編程-線程互斥鎖(四)
互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...
統計學習方法 - 樸素貝葉斯
引入問題:一機器在良好狀態生產合格產品幾率是 90%,在故障狀態生產合格產品幾率是 30%,機器良好的概率是 75%。若一日第一件產品是合格品,那么此日機器良好的概率是多少。 貝葉斯模型 生成模型與判別模型 判別模型,即要判斷這個東西到底是哪一類,也就是要求y,那就用給定的x去預測。 生成模型,是要生成一個模型,那就是誰根據什么生成了模型,誰就是類別y,根據的內容就是x 以上述例子,判斷一個生產出...
styled-components —— React 中的 CSS 最佳實踐
https://zhuanlan.zhihu.com/p/29344146 Styled-components 是目前 React 樣式方案中最受關注的一種,它既具備了 css-in-js 的模塊化與參數化優點,又完全使用CSS的書寫習慣,不會引起額外的學習成本。本文是 styled-components 作者之一 Max Stoiber 所寫,首先總結了前端組件化樣式中的最佳實踐原則,然后在此基...