精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

從實戰開始,帶你深入了解Netty各個組件和ByteBuf

網絡 通信技術
本篇文章想來從實戰開始,帶我深入了解Netty各個組件是做什么?ByteBuf執行原理又是怎樣的?

 [[358902]]

上文對IO模型和Reactor模型進行講解,是不是感覺有點懵懵的。哈哈哈,反正我并沒有對其有深入見解。我是這樣安慰自己的,知識在不斷的反復學習和思考中有新的感悟。不氣餒,繼續新的征程。本篇文章想來從實戰開始,帶我深入了解Netty各個組件是做什么?ByteBuf執行原理又是怎樣的?

01一 第一個Netty實例

用Netty實現通信。說白了就是客戶端向服務端發消息,服務端接收消息并給客戶端響應。所以我來看看服務端和客戶端是如何實現的?

11.1 服務端

1. 依賴

  1. <?xml version="1.0" encoding="UTF-8"?> 
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" 
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
  5.     <modelVersion>4.0.0</modelVersion> 
  6.  
  7.     <groupId>com.haopt.iot</groupId> 
  8.     <artifactId>first-netty</artifactId> 
  9.     <packaging>jar</packaging> 
  10.     <version>1.0-SNAPSHOT</version> 
  11.  
  12.     <dependencies> 
  13.         <dependency> 
  14.             <groupId>io.netty</groupId> 
  15.             <artifactId>netty-all</artifactId> 
  16.             <version>4.1.50.Final</version> 
  17.         </dependency> 
  18.  
  19.         <dependency> 
  20.             <groupId>junit</groupId> 
  21.             <artifactId>junit</artifactId> 
  22.             <version>4.12</version> 
  23.         </dependency> 
  24.     </dependencies> 
  25.  
  26.     <build> 
  27.         <plugins> 
  28.              
  29.             <plugin> 
  30.                 <groupId>org.apache.maven.plugins</groupId> 
  31.                 <artifactId>maven-compiler-plugin</artifactId> 
  32.                 <version>3.2</version> 
  33.                 <configuration> 
  34.                     <source>1.8</source> 
  35.                     <target>1.8</target> 
  36.                     <encoding>UTF-8</encoding> 
  37.                 </configuration> 
  38.             </plugin> 
  39.         </plugins> 
  40.     </build> 
  41. </project> 

2. 服務端-MyRPCServer

  1. package com.haopt.netty.server; 
  2.  
  3. import io.netty.bootstrap.ServerBootstrap; 
  4. import io.netty.buffer.UnpooledByteBufAllocator; 
  5. import io.netty.channel.ChannelFuture; 
  6. import io.netty.channel.ChannelOption; 
  7. import io.netty.channel.EventLoopGroup; 
  8. import io.netty.channel.nio.NioEventLoopGroup; 
  9. import io.netty.channel.socket.nio.NioServerSocketChannel; 
  10.  
  11. public class MyRPCServer { 
  12.     public void start(int port) throws Exception { 
  13.         // 主線程,不處理任何業務邏輯,只是接收客戶的連接請求 
  14.         EventLoopGroup boss = new NioEventLoopGroup(1); 
  15.         // 工作線程,線程數默認是:cpu核數*2 
  16.         EventLoopGroup worker = new NioEventLoopGroup(); 
  17.         try { 
  18.             // 服務器啟動類 
  19.             ServerBootstrap serverBootstrap = new ServerBootstrap(); 
  20.             serverBootstrap.group(boss, worker) //設置線程組 
  21.                     .channel(NioServerSocketChannel.class)  //配置server通道 
  22.                     .childHandler(new MyChannelInitializer()); //worker線程的處理器 
  23.             //ByteBuf 的分配要設置為非池化,否則不能切換到堆緩沖區模式 
  24.             serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); 
  25.             ChannelFuture future = serverBootstrap.bind(port).sync(); 
  26.             System.out.println("服務器啟動完成,端口為:" + port); 
  27.             //等待服務端監聽端口關閉 
  28.             future.channel().closeFuture().sync(); 
  29.         } finally { 
  30.             //優雅關閉 
  31.             boss.shutdownGracefully(); 
  32.             worker.shutdownGracefully(); 
  33.         } 
  34.     } 
  35.  

3. 服務端-ChannelHandler

  1. package com.haopt.netty.server.handler; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.channel.ChannelInboundHandlerAdapter; 
  6. import io.netty.util.CharsetUtil; 
  7.  
  8. public class MyChannelHandler extends ChannelInboundHandlerAdapter { 
  9.     /** 
  10.     * 獲取客戶端發來的數據 
  11.     * @param ctx 
  12.     * @param msg 
  13.     * @throws Exception 
  14.     */ 
  15.     @Override 
  16.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  17.         ByteBuf byteBuf = (ByteBuf) msg; 
  18.         String msgStr = byteBuf.toString(CharsetUtil.UTF_8); 
  19.         System.out.println("客戶端發來數據:" + msgStr); 
  20.         //向客戶端發送數據 
  21.         ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); 
  22.     } 
  23.      
  24.     /** 
  25.     * 異常處理 
  26.     * @param ctx 
  27.     * @param cause 
  28.     * @throws Exception 
  29.     */ 
  30.     @Override 
  31.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  32.         cause.printStackTrace(); 
  33.         ctx.close(); 
  34.     } 

4. 測試用例

  1. package com.haopt.netty.myrpc; 
  2. import com.haopt.netty.server.MyRPCServer; 
  3. import org.junit.Test; 
  4. public class TestServer { 
  5.     @Test 
  6.     public void testServer() throws Exception{ 
  7.         MyRPCServer myRPCServer = new MyRPCServer(); 
  8.         myRPCServer.start(5566); 
  9.     } 

21.2 客戶端

1. 客戶端-client

  1. package com.haopt.netty.client; 
  2. import com.haopt.netty.client.handler.MyClientHandler; 
  3. import io.netty.bootstrap.Bootstrap; 
  4. import io.netty.channel.ChannelFuture; 
  5. import io.netty.channel.EventLoopGroup; 
  6. import io.netty.channel.nio.NioEventLoopGroup; 
  7. import io.netty.channel.socket.nio.NioSocketChannel; 
  8. public class MyRPCClient { 
  9.     public void start(String host, int port) throws Exception { 
  10.         //定義⼯作線程組 
  11.         EventLoopGroup worker = new NioEventLoopGroup(); 
  12.         try { 
  13.             //注意:client使⽤的是Bootstrap 
  14.             Bootstrap bootstrap = new Bootstrap(); 
  15.             bootstrap.group(worker) 
  16.             .channel(NioSocketChannel.class) //注意:client使⽤的是NioSocketChannel 
  17.             .handler(new MyClientHandler()); 
  18.             //連接到遠程服務 
  19.             ChannelFuture future = bootstrap.connect(host, port).sync(); 
  20.             future.channel().closeFuture().sync(); 
  21.         } finally { 
  22.              worker.shutdownGracefully(); 
  23.         } 
  24.     } 

2. 客戶端-(ClientHandler)

  1. package com.haopt.netty.client.handler; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.channel.SimpleChannelInboundHandler; 
  6. import io.netty.util.CharsetUtil; 
  7. public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  8.     @Override 
  9.     protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
  10.         System.out.println("接收到服務端的消息:" + 
  11.         msg.toString(CharsetUtil.UTF_8)); 
  12.     } 
  13.     @Override 
  14.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  15.         // 向服務端發送數據 
  16.         String msg = "hello"
  17.         ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)); 
  18.     } 
  19.     @Override 
  20.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  21.         cause.printStackTrace(); 
  22.         ctx.close(); 
  23.     } 

相信代碼執行起來沒有任何問題(如果有任何問題反應交流)。但是對上面代碼為何這樣實現有很多疑。嘿嘿,我也是奧。接下來我們對這些代碼中用到的組件進行介紹,希望能消除之前疑慮。如果還是不能,可以把疑問寫于留言處,嘿嘿,我也不一定會有個好的解答,但是大佬總會有的。

02二 Netty核心組件

我們都知道Netty是基于事件驅動。但是事件發生后,Netty的各個組件都做了什么?來看看下面內容!

3 2.1 Channel

1. 初識Channel

  1. a 可以理解為socket連接,客戶端和服務端連接的時候會創建一個channel。 
  2. 負責基本的IO操作,例如:bind()、connect()、read()、write()。 
  3. b Netty的Channel接口所提供的API,大大減少了Socket類復雜性 

2. 常見Channel(不同的協議和阻塞類型的連接會有不同的Channel類型與之對應)

  1. a NioSocketChannel,NIO的客戶端 TCP Socket 連接。 
  2.  
  3. b NioServerSocketChannel,NIO的服務器端 TCP Socket 連接。 
  4.  
  5. c NioDatagramChannel, UDP 連接。 
  6.  
  7. d NioSctpChannel,客戶端 Sctp 連接。 
  8.  
  9. e NioSctpServerChannel,Sctp 服務器端連接,這些通道涵蓋了UDP和TCP⽹絡IO以及⽂件IO。 

4 2.2 EventLoopGroup、EventLoop

1. 概述

  1. 有了Channel連接服務,連接之間消息流動。服務器發出消息稱為出站,服務器接受消息稱為入站。 
  2. 那么消息出站和入站就產生了事件例如:連接已激活;數據讀取;用戶事件;異常事件;打開連接; 
  3. 關閉連接等等。有了事件,有了事件就需要機制來監控和協調事件,這個機制就是EventLoop。 

2. 初識EventLoopGroup、EventLoop


對上圖解釋

  1. a 一個EventLoopGroup包含一個或者多個EventLoop 
  2. b 一個EventLoop在生命周期內之和一個Thread綁定 
  3. c EventLoop上所有的IO事件在它專有的Thread上被處理。 
  4. d Channel在它生命周期只注冊于一個Event Loop 
  5. e 一個Event Loop可能被分配給一個或者多個Channel 

3. 代碼實現

  1. // 主線程,不處理任何業務邏輯,只是接收客戶的連接請求 
  2. EventLoopGroup boss = new NioEventLoopGroup(1); 
  3. // ⼯作線程,線程數默認是:cpu*2 
  4. EventLoopGroup worker = new NioEventLoopGroup(); 

5 2.3 ChannelHandler

1. 初識ChannelHandler

  1. 對于數據的出站和入棧的業務邏輯都是在ChannelHandler中。 

2. 對于出站和入站對應的ChannelHandler


  1. ChannelInboundHandler ⼊站事件處理器 
  2. ChannelOutBoundHandler 出站事件處理器 

3. 開發中常用的ChannelHandler(ChannelInboundHandlerAdapter、SimpleChannelInboundHandler)

a 源碼

b SimpleChannelInboundHandler的源碼(是ChannelInboundHandlerAdapter子類)


注意:

兩者的區別在于,前者不會釋放消息數據的引⽤,⽽后者會釋放消息數據的引⽤。

6 2.4 ChannelPipeline 

1. 初識ChannelPipeline

  1. 將ChannelHandler串起來。一個Channel包含一個ChannelPipeline,而ChannelPipeline維護者一個ChannelHandler列表。 
  2. ChannelHandler與Channel和ChannelPipeline之間的映射關系,由ChannelHandlerContext進⾏維護。 

 

如上圖解釋

  1. ChannelHandler按照加⼊的順序會組成⼀個雙向鏈表,⼊站事件從鏈表的head往后傳遞到最后⼀個ChannelHandler。 
  2. 出站事件從鏈表的tail向前傳遞,直到最后⼀個ChannelHandler,兩種類型的ChannelHandler相互不會影響。 

7 2.5 Bootstrap

1. 初識Bootstrap

  1. 是引導作用,配置整個netty程序,將各個組件串起來,最后綁定接口,啟動服務。 

2. Bootstrap兩種類型(Bootstrap、ServerBootstrap)

  1. 客戶端只需要一個EventLoopGroup,服務端需要兩個EventLoopGroup。 

 

上圖解釋

  1. 與ServerChannel相關聯的EventLoopGroup 將分配⼀個負責為傳⼊連接請求創建 Channel 的EventLoop。 
  2. ⼀旦連接被接受,第⼆個 EventLoopGroup 就會給它的 Channel 分配⼀個 EventLoop。 

8 2.6 Future

1. 初識

  1. 操作完成時通知應用程序的方式。這個對象可以看做異步操作執行結果占位符,它在將來某個時刻完成,并提供對其結果的訪問。 

2. ChannelFuture的由來

  1. JDK 預置了 interface java.util.concurrent.Future,但是其所提供的實現, 
  2. 只允許⼿動檢查對應的操作是否已經完成,或者⼀直阻塞直到它完成。這是⾮常 
  3. 繁瑣的,所以 Netty 提供了它⾃⼰的實現--ChannelFuture,⽤于在執⾏異步 
  4. 操作的時候使⽤。 

3. Netty為什么完全是異步?

  1. a ChannelFuture提供了⼏種額外的⽅法,這些⽅法使得我們能夠注冊⼀個或者多個  ChannelFutureListener實例。 
  2.  
  3. b 監聽器的回調⽅法operationComplete(),將會在對應的操作完成時被調⽤。 
  4.  然后監聽器可以判斷該操作是成功地完成了還是出錯了。 
  5.    
  6. c 每個 Netty 的出站 I/O 操作都將返回⼀個 ChannelFuture,也就是說, 
  7.  它們都不會阻塞。所以說,Netty完全是異步和事件驅動的。 

9 2.7 組件小結


上圖解釋

  1. 將組件串起來 

03三 緩存區-ByteBuf

ByteBuf是我們開發中代碼操作最多部分和出現問題最多的一部分。比如常見的TCP協議通信的粘包和拆包解決,和ByteBuf密切相關。后面文章會詳細分析,先不展開。我們這里先了解ByteBuf的常用API和執行內幕。

10 3.1 ByteBuf概述

1. 初識ByteBuf

  1. JavaNIO提供了緩存容器(ByteBuffer),但是使用復雜。因此netty引入緩存ButeBuf, 
  2. 一串字節數組構成。 

2. ByteBuf兩個索引(readerIndex,writerIndex)

  1. a readerIndex 將會根據讀取的字節數遞增 
  2. b writerIndex 也會根據寫⼊的字節數進⾏遞增 
  3.  
  4. 注意:如果readerIndex超過了writerIndex的時候,Netty會拋出IndexOutOf-BoundsException異常。 

 

11 3.2 ByteBuf基本使用

1. 讀取

  1. package com.haopt.netty.myrpc.test; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.util.CharsetUtil; 
  5. public class TestByteBuf01 { 
  6.     public static void main(String[] args) { 
  7.         //構造 
  8.         ByteBuf byteBuf = Unpooled.copiedBuffer("hello world"
  9.         CharsetUtil.UTF_8); 
  10.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  11.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  12.         System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes()); 
  13.         while (byteBuf.isReadable()){ //⽅法⼀:內部通過移動readerIndex進⾏讀取 
  14.          System.out.println((char)byteBuf.readByte()); 
  15.         } 
  16.         //⽅法⼆:通過下標直接讀取 
  17.         for (int i = 0; i < byteBuf.readableBytes(); i++) { 
  18.          System.out.println((char)byteBuf.getByte(i)); 
  19.         } 
  20.         //⽅法三:轉化為byte[]進⾏讀取 
  21.         byte[] bytes = byteBuf.array(); 
  22.         for (byte b : bytes) { 
  23.         System.out.println((char)b); 
  24.         } 
  25.     } 

2. 寫入

  1. package com.haopt.netty.myrpc.test; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.util.CharsetUtil; 
  5. public class TestByteBuf02 { 
  6.     public static void main(String[] args) { 
  7.         //構造空的字節緩沖區,初始⼤⼩為10,最⼤為20 
  8.         ByteBuf byteBuf = Unpooled.buffer(10,20); 
  9.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  10.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  11.         System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes()); 
  12.         for (int i = 0; i < 5; i++) { 
  13.          byteBuf.writeInt(i); //寫⼊int類型,⼀個int占4個字節 
  14.         } 
  15.         System.out.println("ok"); 
  16.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  17.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  18.         System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes()); 
  19.         while (byteBuf.isReadable()){ 
  20.          System.out.println(byteBuf.readInt()); 
  21.         } 
  22.     } 

3. 丟棄已讀字節


  1. package com.haopt.netty.myrpc.test; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.util.CharsetUtil; 
  5. public class TestByteBuf03 { 
  6.     public static void main(String[] args) { 
  7.         ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8); 
  8.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  9.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  10.         System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes()); 
  11.         while (byteBuf.isReadable()){ 
  12.          System.out.println((char)byteBuf.readByte()); 
  13.         } 
  14.         byteBuf.discardReadBytes(); //丟棄已讀的字節空間 
  15.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  16.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  17.         System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes()); 
  18.     } 

4. clear()


  1. package com.haopt.netty.myrpc.test; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.util.CharsetUtil; 
  5. public class TestByteBuf04 { 
  6.     public static void main(String[] args) { 
  7.         ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8); 
  8.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  9.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  10.         System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes()); 
  11.         byteBuf.clear(); //重置readerIndex 、 writerIndex 為0 
  12.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  13.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  14.         System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes()); 
  15.     } 

12 3.3 ByteBuf 使⽤模式

3.3.1 根據存放緩沖區,分為三類

1. 堆緩存區(HeapByteBuf)

  1. 內存的分配和回收速度⽐較快,可以被JVM⾃動回收,缺點是,如果進⾏socket的IO讀寫,需要額外做⼀次內存復制,將堆內存對應的緩沖區復制到內核Channel中,性能會有⼀定程度的下降。 
  2. 由于在堆上被 JVM 管理,在不被使⽤時可以快速釋放。可以通過 ByteBuf.array() 來獲取 byte[] 數 
  3. 據。 

2. 直接緩存區(DirectByteBuf)

  1. ⾮堆內存,它在對外進⾏內存分配,相⽐堆內存,它的分配和回收速度會慢⼀些,但是 
  2. 將它寫⼊或從Socket Channel中讀取時,由于減少了⼀次內存拷⻉,速度⽐堆內存塊。 

3. 復合緩存區

  1. 顧名思義就是將上述兩類緩沖區聚合在⼀起。Netty 提供了⼀個 CompsiteByteBuf, 
  2. 可以將堆緩沖區和直接緩沖區的數據放在⼀起,讓使⽤更加⽅便。 

3.3.2 緩存區選擇

Netty默認使⽤的是直接緩沖區(DirectByteBuf),如果需要使⽤堆緩沖區(HeapByteBuf)模式,則需要進⾏系統參數的設置。

  1. //netty中IO操作都是基于Unsafe完成的 
  2. System.setProperty("io.netty.noUnsafe""true");  
  3. //ByteBuf的分配要設置為⾮池化,否則不能切換到堆緩沖器模式 
  4. serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT); 

3.3.3 ByteBuf對象是否池化(Netty是默認池化的)

1. 池化化和非池化的實現

  1. PooledByteBufAllocator,實現了ByteBuf的對象的池化,提⾼性能減少并最⼤限度地減少內存碎⽚。 
  2. UnpooledByteBufAllocator,沒有實現對象的池化,每次會⽣成新的對象實例。 

2. 代碼實現(讓Netty中ByteBuf對象不池化)

  1. //通過ChannelHandlerContext獲取ByteBufAllocator實例 
  2. ctx.alloc(); 
  3. //通過channel也可以獲取 
  4. channel.alloc(); 
  5.  
  6. //Netty默認使⽤了PooledByteBufAllocator 
  7.  
  8. //可以在引導類中設置⾮池化模式 
  9. serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT); 
  10. //或通過系統參數設置 
  11. System.setProperty("io.netty.allocator.type""pooled"); 
  12. System.setProperty("io.netty.allocator.type""unpooled"); 

我在開發項目中,我一般不進行更改。因為我覺得池化效率更高。有其他高見,歡迎留言。

13 3.5 ByteBuf的釋放

ByteBuf如果采⽤的是堆緩沖區模式的話,可以由GC回收,但是如果采⽤的是直接緩沖區,就不受GC的 管理,就得⼿動釋放,否則會發⽣內存泄露。

3.5.1 ByteBuf的手動釋放(一般不推薦使用,了解)

1. 實現邏輯

  1. ⼿動釋放,就是在使⽤完成后,調⽤ReferenceCountUtil.release(byteBuf); 進⾏釋放。 
  2. 通過release⽅法減去 byteBuf的使⽤計數,Netty 會⾃動回收 byteBuf。 

2. 代碼

  1. /** 
  2. * 獲取客戶端發來的數據 
  3. * @param ctx 
  4. * @param msg 
  5. * @throws Exception 
  6. */ 
  7. @Override 
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  9.     ByteBuf byteBuf = (ByteBuf) msg; 
  10.     String msgStr = byteBuf.toString(CharsetUtil.UTF_8); 
  11.     System.out.println("客戶端發來數據:" + msgStr); 
  12.     //釋放資源 
  13.     ReferenceCountUtil.release(byteBuf); 

注意:

⼿動釋放可以達到⽬的,但是這種⽅式會⽐較繁瑣,如果⼀旦忘記釋放就可能會造成內存泄露。

3.5.1 ByteBuf的自動釋放

⾃動釋放有三種⽅式,分別是:⼊站的TailHandler、繼承SimpleChannelInboundHandler、 HeadHandler的出站釋放。

1. TailHandler

Netty的ChannelPipleline的流⽔線的末端是TailHandler,默認情況下如果每個⼊站處理器Handler都把消息往下傳,TailHandler會釋放掉ReferenceCounted類型的消息。

  1. /** 
  2. * 獲取客戶端發來的數據 
  3. * @param ctx 
  4. * @param msg 
  5. * @throws Exception 
  6. */ 
  7. @Override 
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  9.     ByteBuf byteBuf = (ByteBuf) msg; 
  10.     String msgStr = byteBuf.toString(CharsetUtil.UTF_8); 
  11.     System.out.println("客戶端發來數據:" + msgStr); 
  12.     //向客戶端發送數據 
  13.     ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); 
  14.     ctx.fireChannelRead(msg); //將ByteBuf向下傳遞 

源碼:

在DefaultChannelPipeline中的TailContext內部類會在最后執⾏

  1. @Override 
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) { 
  3.  onUnhandledInboundMessage(ctx, msg); 
  4. //最后會執⾏ 
  5. protected void onUnhandledInboundMessage(Object msg) { 
  6.   try { 
  7.       logger.debug( 
  8.       "Discarded inbound message {} that reached at the tail of the 
  9.       pipeline. " + "Please check your pipeline configuration.", msg); 
  10.   } finally { 
  11.     ReferenceCountUtil.release(msg); //釋放資源 
  12.   } 

2. SimpleChannelInboundHandler

當ChannelHandler繼承了SimpleChannelInboundHandler后,在SimpleChannelInboundHandler的channelRead()⽅法中,將會進⾏資源的釋放。

SimpleChannelInboundHandler的源碼

  1. //SimpleChannelInboundHandler中的channelRead() 
  2. @Override 
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  4.     boolean release = true
  5.     try { 
  6.       if (acceptInboundMessage(msg)) { 
  7.         @SuppressWarnings("unchecked"
  8.         I imsg = (I) msg; 
  9.         channelRead0(ctx, imsg); 
  10.       } else { 
  11.         release = false
  12.         ctx.fireChannelRead(msg); 
  13.       } 
  14.     } finally { 
  15.       if (autoRelease && release) { 
  16.        ReferenceCountUtil.release(msg); //在這⾥釋放 
  17.       } 
  18.     } 

我們handler代碼編寫:

  1. package com.haopt.myrpc.client.handler; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.channel.SimpleChannelInboundHandler; 
  6. import io.netty.util.CharsetUtil; 
  7. public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  8.     @Override 
  9.     protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
  10.       System.out.println("接收到服務端的消息:" + 
  11.       msg.toString(CharsetUtil.UTF_8)); 
  12.     } 
  13.     @Override 
  14.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  15.       // 向服務端發送數據 
  16.       String msg = "hello"
  17.       ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)); 
  18.     } 
  19.     @Override 
  20.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  21.       cause.printStackTrace(); 
  22.       ctx.close(); 
  23.     } 

3. 堆緩沖區(HeadHandler)

出站處理流程中,申請分配到的ByteBuf,通過HeadHandler完成⾃動釋放。

在出站流程開始的時候,通過調⽤ctx.writeAndFlush(msg),Bytebuf緩沖區開始進⼊出站處理的pipeline流⽔線。

在每⼀個出站Handler中的處理完成后,最后消息會來到出站的最后⼀棒HeadHandler,再經過⼀輪復雜的調⽤,在flush完成后終將被release掉。

  1. package com.haopt.myrpc.client.handler; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.channel.SimpleChannelInboundHandler; 
  6. import io.netty.util.CharsetUtil; 
  7. public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  8. @Override 
  9. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
  10. Exception { 
  11. System.out.println("接收到服務端的消息:" + 
  12. msg.toString(CharsetUtil.UTF_8)); 
  13. @Override 
  14. public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  15. // 向服務端發送數據 
  16. String msg = "hello"
  17. ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)); 
  18. @Override 
  19. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
  20. throws Exception { 
  21. cause.printStackTrace(); 
  22. ctx.close(); 

 

14 3.6 ByteBuf小結

a ⼊站流程中,如果對原消息不做處理,調ctx.fireChannelRead(msg) 把

原消息往下傳,由流⽔線最后⼀棒 TailHandler 完成⾃動釋放。

b 如果截斷了⼊站處理流⽔線,則繼承SimpleChannelInboundHandler ,完成⼊站ByteBuf ⾃動釋放。

c 出站處理過程中,申請分配到的 ByteBuf,通過 HeadHandler 完成⾃動釋放。

d ⼊站處理中,如果將原消息轉化為新的消息ctx.fireChannelRead(newMsg)往下傳,那必須把原消息release掉。

e ⼊站處理中,如果已經不再調⽤ ctx.fireChannelRead(msg) 傳遞任何消息,也沒有繼承SimpleChannelInboundHandler 完成⾃動釋放,那更要把原消息release掉。

 

責任編輯:姜華 來源: 花花和Java
相關推薦

2018-09-04 16:20:46

MySQ索引數據結構

2018-11-21 08:00:05

Dubbo分布式系統

2020-11-06 16:50:43

工具GitLab CICD

2010-06-23 20:31:54

2010-11-19 16:22:14

Oracle事務

2020-09-21 09:53:04

FlexCSS開發

2022-08-26 13:48:40

EPUBLinux

2010-07-13 09:36:25

2009-08-25 16:27:10

Mscomm控件

2021-01-27 11:10:49

JVM性能調優

2011-11-07 09:37:42

Hpyer-V虛擬化云計算

2020-07-20 06:35:55

BashLinux

2023-10-06 00:04:02

2010-11-15 11:40:44

Oracle表空間

2011-07-18 15:08:34

2022-06-03 10:09:32

威脅檢測軟件

2010-11-08 13:54:49

Sqlserver運行

2018-06-22 13:05:02

前端JavaScript引擎

2021-04-28 10:13:58

zookeeperZNode核心原理

2010-09-27 09:31:42

JVM內存結構
點贊
收藏

51CTO技術棧公眾號

99国产精品视频免费观看一公开 | 久久久国产影院| 亚洲精品自拍网| caopen在线视频| 成人精品国产一区二区4080| 欧美在线影院在线视频| 色撸撸在线视频| 一区二区三区四区视频免费观看| 欧美日韩国产在线看| 亚洲欧美影院| 少妇精品高潮欲妇又嫩中文字幕| 丝袜脚交一区二区| 久久久精品视频成人| 激情综合丁香五月| 亚洲欧美在线综合| 精品久久久一区二区| 伊人情人网综合| 五月婷婷六月丁香综合| 日韩精品五月天| 欧美激情在线观看| 国产白丝一区二区三区| 精品国产乱子伦一区二区| 欧美午夜视频网站| 欧美日韩成人免费视频| 老司机午夜在线视频| 97久久超碰国产精品| 91精品视频免费看| 成人免费毛片视频| 亚洲欧洲一区| 欧美成人精品一区二区三区| 国产一区二区三区精品在线| 精品伊人久久久| 欧美一级欧美三级| 亚洲另类第一页| 日本电影欧美片| 精品久久久久久亚洲精品| 免费观看中文字幕| av中文字幕一区二区三区| 97国产一区二区| 国产高清自拍一区| 国产美女免费看| 美日韩一区二区| 国产成人精品综合| 美日韩一二三区| 亚洲精品激情| 久久久久久久国产精品| 九九视频免费看| 亚洲最大黄网| 日韩视频在线观看免费| 日韩一区二区三区四区视频| 红桃视频在线观看一区二区| 日韩福利视频在线观看| 日本道中文字幕| a看欧美黄色女同性恋| 精品日韩欧美在线| www.美色吧.com| 大香伊人久久精品一区二区| 精品国免费一区二区三区| 原创真实夫妻啪啪av| 日本超碰一区二区| 日韩午夜电影在线观看| 亚洲丝袜在线观看| 91精品尤物| 亚洲国产精品久久久久久| 亚洲av熟女高潮一区二区| 国产精品中文字幕制服诱惑| 精品卡一卡二卡三卡四在线| 私密视频在线观看| 九九综合久久| 中文字幕精品在线视频| 久久久久久久久久97| 亚洲h色精品| 色综合天天狠天天透天天伊人 | 日韩国产一区| 日韩在线观看免费高清完整版| 亚洲 欧美 国产 另类| 婷婷久久一区| 久久人人爽人人爽人人片av高清| 可以免费看的av毛片| 先锋影音久久| 国产精品永久在线| www.狠狠干| a级高清视频欧美日韩| 欧美日韩中文国产一区发布| 91欧美在线视频| 亚洲一区视频在线观看视频| 欧美亚洲国产成人| 国产69精品久久| 日韩久久精品一区| 我和岳m愉情xxxⅹ视频| 99久久综合| 久久久噜噜噜久久中文字免| 特级毛片www| 精品一区二区日韩| 极品校花啪啪激情久久| wwwxxx在线观看| 亚洲综合久久久久| 毛葺葺老太做受视频| 国产免费区一区二区三视频免费 | 综合精品久久| 欧美在线视频在线播放完整版免费观看| 波多野结衣电车痴汉| 精品一区二区三区的国产在线播放| 岛国视频一区免费观看| 国产视频福利在线| 亚洲综合色噜噜狠狠| 免费大片在线观看| 97人人澡人人爽91综合色| 亚洲视频axxx| 日本熟妇毛耸耸xxxxxx| 美女尤物国产一区| 精品一区国产| 超碰个人在线| 欧美综合亚洲图片综合区| 师生出轨h灌满了1v1| 成人一级毛片| 欧美一级大片在线观看| 国产黄色片免费| 日本一区二区在线不卡| 欧洲黄色一级视频| 97久久亚洲| 久久夜色精品亚洲噜噜国产mv| 中文在线第一页| 成人综合激情网| 亚洲天堂av免费在线观看| 中文在线中文资源| 精品99999| 欧美日韩在线视频免费| 久久国产成人午夜av影院| 日韩av影视| 中文字幕在线直播| 日韩精品一区国产麻豆| 国产精品精品软件男同| 蜜臀久久久久久久| 日本在线播放不卡| 中文字幕人成乱码在线观看| 精品国一区二区三区| 欧美三级日本三级| 国产中文字幕一区| 一区视频二区视频| 色999久久久精品人人澡69| 一区二区三区黄色| 亚洲GV成人无码久久精品| 99久久99久久精品免费观看| 国产精品日韩三级| 中文字幕亚洲在线观看| 欧美乱大交xxxxx另类电影| 国产一区二区小视频| 国产精品久久久久婷婷二区次| 亚洲五月天综合| 精品国产精品国产偷麻豆| 日本欧美在线视频| 黄色av免费在线看| 在线日韩av片| 国产成人一区二区在线观看| 日韩高清在线电影| 涩涩日韩在线| 亚洲精品成人一区| 久久在线视频在线| 性色av蜜臀av| 亚洲国产另类精品专区| 黄色网址在线视频| 天堂成人免费av电影一区| 欧美专区一二三| 成人mm视频在线观看| 日韩在线观看高清| a在线观看免费| 亚洲在线免费播放| 人妻丰满熟妇av无码久久洗澡 | 亚洲欧美自拍另类日韩| 天天综合久久| 国产视频在线观看一区| 中文字幕成在线观看| 一本大道久久加勒比香蕉| 中文字幕欧美人妻精品一区蜜臀| 国产精品国产自产拍在线| 欧美国产日韩在线视频 | 欧美成人黑人猛交| 欧美gayvideo| 岛国一区二区三区高清视频| 国产伦子伦对白在线播放观看| 亚洲免费视频在线观看| 正在播放亚洲精品| 亚洲一区日韩精品中文字幕| 青青草福利视频| 久久激五月天综合精品| www.夜夜爱| 国产不卡一区| 91久久精品国产91久久性色tv| 欲香欲色天天天综合和网| 中文字幕亚洲欧美一区二区三区 | 日韩福利一区| 久久久精品国产亚洲| 婷婷伊人综合中文字幕| 欧美日韩亚洲综合在线| 日韩激情一区二区三区| 国产欧美1区2区3区| 老熟女高潮一区二区三区| 久久久久国产精品一区二区| 超碰10000| 精品国产aⅴ| 国产精品免费一区二区三区| 成人交换视频| 国内精品久久久| 欧美三级理伦电影| 亚洲精品久久久久久下一站 | www.com.cn成人| 欧美成年人网站| av网在线观看| 精品无人区太爽高潮在线播放 | 国产精品香蕉一区二区三区| 蜜臀久久99精品久久久酒店新书| 午夜国产欧美理论在线播放| 日韩电影大全在线观看| 久久精品国产亚洲5555| 国产综合色香蕉精品| 欧美日韩免费看片| 午夜精品久久久久久久白皮肤| √新版天堂资源在线资源| 日韩成人久久久| www日本在线| 欧美日韩亚洲高清一区二区| 美女又爽又黄免费视频| 亚洲v中文字幕| 欧美日韩在线国产| 亚洲日本在线天堂| 蜜桃av.com| 欧美国产在线观看| 丰满少妇高潮一区二区| av在线综合网| 国产精九九网站漫画| 国产在线不卡一区| 国产乱女淫av麻豆国产| 蜜臀av一级做a爰片久久| 日韩欧美精品在线观看视频| 日韩一级在线| 无码粉嫩虎白一线天在线观看| 欧美国产精品| av中文字幕av| 午夜国产欧美理论在线播放 | 欧美日韩成人一区二区| 波多野结衣小视频| 在线一区二区三区四区五区| 亚洲黄网在线观看| 高潮白浆女日韩av免费看| 国产精品第9页| 精品日本美女福利在线观看| 日韩av黄色片| 天天综合天天综合色| 中文字幕在线字幕中文| 午夜一区二区三区在线观看| 日韩乱码在线观看| 偷拍亚洲欧洲综合| 日本在线播放视频| 欧美性生活大片免费观看网址 | 在线播放日韩导航| 国产精品天天操| 日韩色在线观看| 成人午夜免费在线观看| 亚洲国产成人精品电影| 少妇精品高潮欲妇又嫩中文字幕| 亚洲国产91精品在线观看| 亚洲色偷精品一区二区三区| 亚洲精品一区av在线播放| 国产综合视频一区二区三区免费| 一区二区中文字幕| 久热国产在线| 久久久久亚洲精品国产| 亚洲欧美电影| 国产精品视频免费在线观看| 欧美视频第一| 成人免费在线看片| 日本午夜精品| 色吧亚洲视频| 伊人久久大香线蕉综合四虎小说| 日韩xxxx视频| 久久精品盗摄| 五月天视频在线观看| 成人午夜视频免费看| 97伦伦午夜电影理伦片| 亚洲欧美影音先锋| 午夜偷拍福利视频| 欧美中文字幕亚洲一区二区va在线 | 无码精品黑人一区二区三区| 在线视频日本亚洲性| 日韩特级毛片| 日本国产一区二区三区| 欧美天堂在线| 韩国一区二区三区美女美女秀| 国产成人精品免费视| 国产高清免费在线| 国产精品久久久久9999高清| 亚洲欧美自拍另类日韩| 成人avav在线| 国产3级在线观看| 精品久久久久久久久中文字幕| 一区二区三区午夜| 日韩黄色高清视频| 国产在线69| 青青a在线精品免费观看| 亚洲日韩中文字幕一区| 就去色蜜桃综合| 欧美.www| 亚洲色图 在线视频| av网站一区二区三区| 天天看天天摸天天操| 色综合激情五月| 亚洲美女性生活| 日韩在线观看网址| 午夜影院在线播放| 亚洲伊人久久综合| 清纯唯美日韩| 黄色片久久久久| 成人综合婷婷国产精品久久免费| 一区二区三区在线播放视频| 日韩欧美一区二区在线| 人人妻人人玩人人澡人人爽| 久久亚洲私人国产精品va| av在线一区不卡| 精品国产免费一区二区三区 | 国产精品女同一区二区三区| 亚洲 欧美 日韩 综合| 精品奇米国产一区二区三区| 看黄网站在线| 国产日韩精品在线播放| 加勒比久久综合| 国产精品50p| 成人av网站在线观看| www.av视频| 日韩一区二区视频| 乱人伦中文视频在线| 国产精品久久一区主播| 国产精品手机在线播放| ww国产内射精品后入国产| www.久久久久久久久| 国产污片在线观看| 精品精品国产高清a毛片牛牛| 91在线中文| 91精品天堂| 狠色狠色综合久久| 91porn在线| 亚洲午夜久久久久久久久电影网| 国产日韩精品suv| 美女av一区二区三区| 警花av一区二区三区 | 国产日韩欧美三区| jizz日本免费| 一本到不卡免费一区二区| 九色视频成人自拍| 国产精品扒开腿做爽爽爽男男| 亚洲精华一区二区三区| 黄www在线观看| 久久久影院官网| 久久久蜜桃一区二区| 亚洲视频欧美视频| 69堂免费精品视频在线播放| 日本不卡一区| 久久精品国产精品亚洲红杏| 99久久婷婷国产综合| 欧美一区二区三区免费观看视频| 在线视频观看国产| 豆国产97在线| 宅男噜噜噜66一区二区| www.久久国产| 欧美三级韩国三级日本三斤| 成人午夜在线影视| 超碰97在线播放| 亚洲在线网站| 欧美aaa级片| 日韩视频免费直播| 中文字幕乱码中文乱码51精品| 日日噜噜噜噜夜夜爽亚洲精品| 久久er精品视频| 久久成人在线观看| 日韩av在线网页| 欧美日韩在线精品一区二区三区激情综合 | 中文字幕免费不卡| 99精品在线视频观看| 欧美极品美女视频网站在线观看免费| 精品精品精品| 91制片厂毛片| 一级特黄大欧美久久久| 日韩资源在线| 国产在线高清精品| 亚洲三级网站| 亚洲精品电影院| 亚洲成人av片在线观看| 精品久久在线| 免费看黄在线看| 国产精品理论片| 午夜激情小视频| 国产日韩精品在线播放| 最新日韩欧美| 欧美88888| 亚洲精品大尺度| 国产区一区二| 国产精品视频一区二区三区四区五区| 国产精品久久免费看| 天天干免费视频| 91九色国产视频|