Netty拾遺(三)——Reactor設計模式
前言
上一篇博客總結了NIO的三個關鍵組件——NIO的三個組件,但是其中并沒有提到SelectionKey,在reactor設計模式中,SelectionKey起到了關鍵性的作用,這篇博客會說明一下SelectionKey,同時在第一遍Netty系列博客中我們簡單介紹過三種IO模式,并通過簡單實例代碼進行了說明:BIO,NIO與AIO簡介。這篇博客在前兩篇博客的基礎上繼續總結reactor設計模式。依舊會參考《Netty、Redis、Zookeeper高并發實戰》和《Netty權威指南》兩本書。
說一下SelectionKey
簡單來說可以理解為SelectionKey為被Selector(選擇器)選中的注冊事件。在上一篇博客中,一個IO事件發生后,如果之前在選擇器中注冊過,就會被選擇器選中。而這里的SelectionKey就可以簡單理解為被選中了的IO事件。在實際代碼中,可以通過SelectionKey獲得通道的IO事件類型,還可以獲得發生IO事件所在的通道,此外還可以獲得選擇鍵對應的選擇器實例。
主要存在四個事件類型,如下所示
事件名稱 | 枚舉值 |
---|---|
可讀 | SelectionKey.OP_READ |
可寫 | SelectionKey.OP_WRITE |
連接 | SelectionKey.CONNECT |
接受 | SelectionKey.OP_ACCEPT |
SelectionKey其中有兩個重要的方法:attach與attachment方法
void attach(Object o)
這個方法可以將任何的對象作為附件綁定的到SelectionKey實例上,前面說過SelectionKey表示的是具體的IO事件,這里的attach就是給對應的IO事件綁定處理器。
void attachment()
這個方法是取出SelectionKey對應的事件處理器。
reactor模式簡介
reactor模式其實說到底就是一個設計模式,該設計模式其實是在文章 《Scalable IO in Java》中的描述,網上大部分文章關于reactor模式的示例圖,均是來自于這篇文章,這篇博客依舊會采用該示意圖(因為我實在想不出更好的圖來描述reactor模式)。總體來說Reactor模式由Reactor處理線程與Handler處理器兩大角色來組成。
Reactor處理線程負責響應各種IO事件,并將事件分發到Handlers處理器處理。
Handlers處理器的職責,就是非阻塞的執行業務處理邏輯。
單線程的reactor模式
《Netty、Redis、Zookeeper高并發實戰》一書中的Reactor模式的實例有些混亂,這里簡單梳理一下。
這里補充一點,在前一篇博客中提到NIO的兩個channel——ServerSocketChannel和SocketChannel,兩者關注的事件類型也不同。
ServerSocketChannel只需要關注OP_ACCEPT
連接建立事件,并且ServerSocketChannel通過accept獲取SocketChannel與客戶端連接的通道。各個通道關注的事件列表具體如下。
通道類型 | OP_READ | OP_WRITE | OP_CONNECT | OP_ACCEPT |
---|---|---|---|---|
服務端ServerSocketChannel | Y | |||
服務端SocketChannel | Y | Y | ||
客戶端SocketChannel | Y | Y | Y |
由于服務端會存在ServerSocketChannel和SocketChannel ,因此需要對兩個通道的不同事件使用不同的事件處理器。
單線程的reactor模式,其實就是只有一個reactor線程,只有一個線程用于響應IO事件并分發事件。這里直接進入代碼實例
1、簡單定義一個Handler處理的接口
《Netty、Redis、Zookeeper高并發實戰》一書中直接實現的是runnable接口,導致業務和線程的處理混合在一起
/**
* autor:liman
* createtime:2020/9/9
* comment:Handler處理器接口
*/
public interface IHandler {
public void handlerSelectKey();
}
2、連接事件的處理Handler
/**
* autor:liman
* createtime:2020/9/9
* comment: Acceptor的處理類 處理連接的建立事件
*/
@Slf4j
public class AcceptorHandler implements IHandler{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public AcceptorHandler(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
//這里完成事件的分發
@Override
public void handlerSelectKey() {
try{
SocketChannel socketChannel = serverSocketChannel.accept();
if(socketChannel!=null){
new KeyEventHandler(selector,socketChannel);
}
}catch (Exception e){
log.error("事件分發異常,異常信息為:{}",e);
}
}
}
3、讀寫事件的處理Handler
/**
* autor:liman
* createtime:2020/9/10
* comment: 可讀,可寫的事件處理Handler
*/
@Slf4j
public class KeyEventHandler implements IHandler {
private Selector selector;
private SocketChannel socketChannel;
private SelectionKey selectionKey;
/**
* 這里需要綁定非連接事件的處理器
* @param selector
* @param socketChannel
*/
public KeyEventHandler(Selector selector, SocketChannel socketChannel) {
try {
this.selector = selector;
this.socketChannel = socketChannel;
this.socketChannel.configureBlocking(false);
selectionKey = this.socketChannel.register(this.selector, 0);//這里沒有注冊任何事件到selector上,只是獲取指定的選擇鍵
selectionKey.attach(this);
//通過selectionKey注冊可讀事件
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void handlerSelectKey() {
try {
String currentTime = "";
if(selectionKey.isValid()) {
if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
log.info("the time server receive order:{}", body);
}else if(readBytes<0){//如果讀取不到數據,則表示SocketChannel關閉,直接返回
socketChannel.close();
return;
}
//讀取事件處理完成之后,需要再一次注冊可寫事件
selectionKey.interestOps(SelectionKey.OP_WRITE);
} else if (selectionKey.isWritable()) {
currentTime = LocalDateTime.now().toString();
log.info("send message 2 client ,current time is : {}",currentTime);
doWrite(socketChannel, currentTime);
//可讀事件處理完成之后,需要再一次注冊可讀事件
selectionKey.interestOps(SelectionKey.OP_READ);
}
selectionKey.attach(this);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* 通過緩沖區,往channel中寫入數據
* @param socketChannel
* @param response
* @throws IOException
*/
private void doWrite(SocketChannel socketChannel, String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
}
}
}
4、分發事件的線程(這個我理解為就是Reactor線程)
/**
* autor:liman
* createtime:2020/9/10
* comment:分發事件處理任務的線程
*/
@Slf4j
public class DispatchThread implements Runnable {
private Selector selector;
public DispatchThread(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select(1000);
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
//Reactor負責dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
selected.clear();
}
} catch (Exception ex) {
log.error("事件分配失敗,異常信息為:{}", ex);
}
}
//不同的IO事件處理類分發
private void dispatch(SelectionKey selectionKey){
IHandler handler = (IHandler) selectionKey.attachment();
//調用之前attach綁定到選擇鍵的handler處理器對象
if (handler != null) {
handler.handlerSelectKey();
}
}
}
5、主要啟動類
/**
* autor:liman
* createtime:2020/8/12
* comment:單線程reactor 模式的客戶端
*/
@Slf4j
public class TimeReactorServer {
private static final String host = "127.0.0.1";
private static final int port = 9898;
public static void main(String[] args) {
//啟動的時候,將服務端的ServerSocketChannel注冊到Selector上,并綁定連接事件的處理方式
try {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
InetSocketAddress inetAddress =
new InetSocketAddress(host,
port);
serverSocketChannel.bind(inetAddress);
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//綁定連接事件的處理Handler
selectionKey.attach(new AcceptorHandler(selector,serverSocketChannel));
//啟動Reactor線程
new Thread(new DispatchThread(selector)).start();
} catch (Exception e) {
log.error("連接建立出現異常,異常信息為:{}",e);
}
}
}
客戶端的實現不需要改變,上一篇博客的TimeClient就可以用于測試單線程的reactor代碼。
這個時候就可以拿出那個經典的圖片來說道說道了
我們可以看到,DispatchThread其實就是reactor線程,負責各種IO事件處理的分發,AcceptorHandler我個人理解就是acceptor,只是負責連接事件建立的處理。
所謂單線程的Reactor模式與多線程Reactor模式的區別最直觀的表現就在于DispatchThread的個數和工作的selector選擇器的個數都不止一個。
整體而言我們的實現相比圖中有些出入,因為我們的分發操作,并不是在acceptor中處理,但是這樣可以較好的理解reactor的幾個要素。
多線程的reactor模式
前面提到過,相比單線程的reactor模式而言,最大的區別在于不止一個選擇器,不止一個線程進行selector注冊事件的掃描與分配。同時處理讀寫請求的線程相比單線程模式也不止一個
1、統一一個處理類接口,為了區別與單線程的,這里重新給定一個名稱
/**
* autor:liman
* createtime:2020/9/9
* comment:統一一個處理器接口
*/
public interface IMultiHandler{
public void handlerSelectKey();
}
2、普通IO事件的處理Handler
/**
* autor:liman
* createtime:2020/9/10
* comment:普通IO讀寫事件的處理器,這里繼承了Thread類,實現了IMultiHandler接口
*/
@Slf4j
public class MultiKeyEventHandler extends Thread implements IMultiHandler {
private Selector selector;
private SocketChannel socketChannel;
private SelectionKey selectionKey;
/**
* 這里需要綁定非連接事件的處理器
*
* @param selector
* @param socketChannel
*/
public MultiKeyEventHandler(Selector selector, SocketChannel socketChannel) {
try {
this.selector = selector;
this.socketChannel = socketChannel;
socketChannel.configureBlocking(false);
//這里沒有注冊任何事件到selector上,只是獲取指定的選擇鍵
selectionKey = socketChannel.register(selector, 0);
selectionKey.attach(this);
//通過selectionKey注冊可讀事件
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void handlerSelectKey() {
try {
String currentTime = "";
if (selectionKey.isValid()) {
if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
log.info("the time server receive order:{}", body);
} else if (readBytes < 0) {//如果讀取不到數據,則表示SocketChannel關閉,直接返回
socketChannel.close();
return;
}
selectionKey.interestOps(SelectionKey.OP_WRITE);
} else if (selectionKey.isWritable()) {
currentTime = LocalDateTime.now().toString();
log.info("send message 2 client ,current time is : {}", currentTime);
doWrite(socketChannel, currentTime);
selectionKey.interestOps(SelectionKey.OP_READ);
}
selector.wakeup();
}
} catch (Exception ex) {
selectionKey.cancel();
return;
}
}
/**
* 通過緩沖區,往channel中寫入數據
*
* @param socketChannel
* @param response
* @throws IOException
*/
private void doWrite(SocketChannel socketChannel, String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
}
}
@Override
public void run() {
handlerSelectKey();
}
}
3、連接建立事件的處理器
/**
* autor:liman
* createtime:2020/9/9
* comment: 多線程的Acceptor的處理器
*/
@Slf4j
public class MultiAcceptorHandler implements IMultiHandler {
private Selector[] selectors;
private ServerSocketChannel serverSocketChannel;
private int selectorCount;
private AtomicInteger selectorIndex = new AtomicInteger(0);
private AtomicInteger totalConnectCount = new AtomicInteger(0);
static ExecutorService pool = Executors.newFixedThreadPool(4);
public MultiAcceptorHandler(Selector[] selectors, ServerSocketChannel serverSocketChannel,int selectorCount) {
this.selectors = selectors;
this.serverSocketChannel = serverSocketChannel;
this.selectorCount = selectorCount;
}
@Override
public void handlerSelectKey() {
//selector的索引處理,如果已經達到了上線,歸零。
//這里也可以用隨機的方式,隨機獲取一個選擇器都可以
if(selectorIndex.incrementAndGet() == selectorCount){
selectorIndex.set(0);
}
try{
SocketChannel socketChannel = serverSocketChannel.accept();
if(socketChannel!=null){
log.info("總共建立的連接事件個數:{}",totalConnectCount.incrementAndGet());
//這里將IO讀寫處理的Handler交給線程池去處理
MultiKeyEventHandler eventHandler = new MultiKeyEventHandler(this.selectors[selectorIndex.get()], socketChannel);
pool.execute(eventHandler);
}
}catch (Exception e){
log.error("事件分發異常,異常信息為:{}",e);
}
}
}
4、服務端主要入口代碼,與單線程相比,主要在于啟動事件掃描線程
/**
* autor:liman
* createtime:2020/8/12
* comment:單線程reactor 模式的客戶端
*/
@Slf4j
public class TimeMulitReactorServer {
private static final String host = "127.0.0.1";
private static final int port = 10229;
private static final int selectorCount = 2;
public static void main(String[] args) {
//啟動的時候,將服務端的ServerSocketChannel注冊到Selector上,并綁定連接事件的處理方式
try {
Selector[] selectors = new Selector[selectorCount];
for(int i = 0;i<selectors.length;i++){
selectors[i]=Selector.open();
}
//開啟Reactor線程,可以先開啟子線程,反正這個時候也沒有注冊事件需要分配,讓其空載就行
//后面的連接事件綁定到指定的處理器上,相應的線程會自動輪詢處理
for(int i = 0;i<selectors.length;i++) {
new Thread(new MultiDispatchThread(selectors[i]),"dispatchThread_"+i).start();
}
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
InetSocketAddress inetAddress =
new InetSocketAddress(host,
port);
serverSocketChannel.bind(inetAddress);
//這里隨便注冊一個selector都可以
int index = new Random().nextInt(selectorCount);
SelectionKey selectionKey = serverSocketChannel.register(selectors[index], SelectionKey.OP_ACCEPT);
//綁定連接事件的處理Handler
selectionKey.attach( new MultiAcceptorHandler(selectors, serverSocketChannel, selectorCount));
} catch (Exception e) {
log.error("連接建立出現異常,異常信息為:{}",e);
}
}
}
其他客戶端的改造
為了更好的測試,客戶端需要啟動多個線程,最好引入線程池,具體改造如下
/**
* autor:liman
* createtime:2020/8/12
* comment:客戶端代碼,其中TimeClientHandler不用變,只是開啟線程的代碼引入線程池
*/
@Slf4j
public class TimeMultiReactorClient {
static ExecutorService pool = Executors.newFixedThreadPool(4);
public static void main(String[] args) throws InterruptedException {
int port = 10229;
for(int i=0;i<100;i++) {
Thread thread = new Thread(new TimeClientHandler("127.0.0.1", port), "TimeClientThread-" + i);
pool.execute(thread);
}
}
}
代碼均能正常運行,這里還是按照俗套寫法,附上一張多線程reactor的圖片,應該都能輕松找到對應的組件
總結
在調試過程中,客戶端代碼沒有改造成線程池之前,直接開啟了100個線程,結果調試過程發現服務端只接收到了五十多個線程就阻塞,以為是服務端的問題,后來發現客戶端只發送了五十多個請求就阻塞,后來調整了線程棧的JVM參數,才發現問題,后來直接使用線程池,客戶端才能正常發送100個請求。
由于關于多線程線程池等問題,我不是很熟悉,因此耗費較久,整體多線程Reactor模式,幾乎在結合了《Netty、Redis、Zookeeper高并發實戰》和《Netty權威指南》兩本書之后,綜合整出了這個實例,《Netty、Redis、Zookeeper高并發實戰》給出過一個完整的實例,但個人覺得其實例部分普通處理的Handler與線程的run方法雜糅在一塊,理解起來較為晦澀,結合《Netty權威指南》的查詢時間的實例,綜合弄出了上述實例,只需要明確一點,連接事件建立的處理與普通IO的處理會交給不同的處理器進行處理。這兩種事件的處理都可以注冊在Selector上,在單線程模式下兩種事件的處理是注冊在同一個selector上,而多線程模式下是隨機的。
智能推薦
python拾遺(二)
21、python list 列表中可以用的swap方法 mylist[i], mylist[j] = mylist[j], mylist[i] 這樣就可以swap兩個元素了 還有 a ^= b b ^= a a ^= b 22、python取整及保留小數 23、Python拷貝(深拷貝deepcopy與淺拷貝copy) Python中的對象之間賦值時是按引用傳遞的,如果需要拷貝對象,需要使用標準...
Python拾遺(一)
python扔在一邊很久了,最近剛好有幾天閑,買了本《Head First Python》復習一下。 跟python結緣是因為教C語言的王青老師推薦我們邊學C邊學python,記得說是C結合腳本語言會發揮出比較大的威力,還推薦了一門Coursera上的公開課,名字好像叫how to program,當時只是聽說,因為當時各種狀態,并沒有學。 后來用一個暑假學《learn python the ha...
C語言拾遺
short, long short 和 long 用于修飾整型,和 int 一起使用,但使用時 int 關鍵字可以省略。 short, int, long 在不同的機器架構下占用的長度不同,但一般遵循以下限制:short長度小于等于int,int長度小于等于long,一般short長度至少需要16位,long長度至少需要32位。 extern 引用全局變量(外部變量)時需要先使用 extern 關...
【Practical】Dropout拾遺
文章目錄 早期:Vanilla-Dropout. 現代:Inverted-Dropout. 模型集成角度理解. Dropout & L2-norm. Dropout & GD. 論文資源. 早期:Vanilla-Dropout. 【Dropout思想提出者Hinton在2014年論文中敘述的版本】 訓練過程中獨立地以概率 p p p 保留下每一個神經元,注意這里的 p p p 代表...
猜你喜歡
freemarker + ItextRender 根據模板生成PDF文件
1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...
電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!
Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...
requests實現全自動PPT模板
http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...
Linux C系統編程-線程互斥鎖(四)
互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...