深入理解Netty編解碼、粘包拆包、心跳機制
前言
Netty系列文章:
- BIO 、NIO 、AIO 總結
- Unix網絡編程中的五種IO模型
- 深入理解IO多路復用實現機制
- Netty核心功能與線程模型
前面我們講了 BIO、NIO、AIO 等一些基礎知識和Netty核心功能與線程模型,本篇重點來理解Netty的編解碼、粘包拆包、心跳機制等實現原理進行講解。
Netty編解碼
Netty 涉及到編解碼的組件有 Channel 、 ChannelHandler 、 ChannelPipe 等,我們先大概了解下這幾個組件的作用。
ChannelHandler
ChannelHandler 充當來處理入站和出站數據的應用程序邏輯容器。例如,實現 ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter),你就可以接收入站事件和數據,這些數據隨后會被你的應用程序的業務邏輯處理。當你要給連接的客戶端發送響應時,也可以從 ChannelInboundHandler 刷數據。你的業務邏輯通常下在一個或者多個 ChannelInboundHandler 中。
ChannelOutboundHandler 原理一樣,只不過它是用來處理出站數據的。
ChannelPipeline
ChannelPipeline 提供了 ChannelHandler 鏈的容器。以客戶端應用程序為例,如果有事件的運動方向是從客戶端到服務端,那么我們稱這些事件為出站的,即客戶端發送給服務端的數據會通過 pipeline 中的一系列 ChannelOutboundHandler (ChannelOutboundHandler 調用是從 tail 到 head 方向逐個調用每個 handler 的邏輯),并被這些 Hadnler 處理,反之稱為入站的,入站只調用 pipeline 里的 ChannelInboundHandler 邏輯(ChannelInboundHandler 調用是從 head 到 tail 方向 逐個調用每個 handler 的邏輯。)
編解碼器
當你通過Netty發送或者接受一個消息的時候,就將會發生一次數據轉換。入站消息會被解碼:從字節轉換為另一種格式(比如java對象);如果是出站消息,它會被編碼成字節。
Netty提供了一系列實用的編碼解碼器,它們都實現了ChannelInboundHadnler或者ChannelOutboundHandler接口。在這些類中, channelRead方法已經被重寫了。
以入站為例,對于每個從入站Channel讀取的消息,這個方法會被調用。隨后,它將調用由已知解碼器所提供的decode()方法進行解碼,并將已經解碼的字節轉發給ChannelPipeline中的下一個ChannelInboundHandler。
Netty提供了很多編解碼器,比如編解碼字符串的StringEncoder和StringDecoder,編解碼對象的ObjectEncoder和ObjectDecoder 等。
當然也可以通過集成ByteToMessageDecoder自定義編解碼器。
示例代碼
完整代碼在 Github :
https://github.com/Niuh-Study/niuh-netty.git
對應的包 com.niuh.netty.codec
Netty粘包拆包
TCP 粘包拆包是指發送方發送的若干包數據到接收方接收時粘成一包或某個數據包被拆開接收。如下圖所示,client 發送了兩個數據包 D1 和 D2,但是 server 端可能會收到如下幾種情況的數據。
程序演示
首先準備客戶端負責發送消息,連續發送5次消息,代碼如下:
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 1; i <= 5; i++) {
- ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + " ", Charset.forName("utf-8"));
- ctx.writeAndFlush(byteBuf);
- }
- }
然后服務端作為接收方,接收并且打印結果:
- // count 變量,用于計數
- private int count;
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- System.out.println("服務器讀取線程 " + Thread.currentThread().getName());
- ByteBuf buf = (ByteBuf) msg;
- byte[] bytes = new byte[buf.readableBytes()];
- // 把ByteBuf的數據讀到bytes數組中
- buf.readBytes(bytes);
- String message = new String(bytes, Charset.forName("utf-8"));
- System.out.println("服務器接收到數據:" + message);
- // 打印接收的次數
- System.out.println("接收到的數據量是:" + (++this.count));
- }
啟動服務端,再啟動兩個客戶端發送消息,服務端的控制臺可以看到這樣:
粘包的問題其實是隨機的,所以每次結果都不太一樣。
完整代碼在 Github :
https://github.com/Niuh-Study/niuh-netty.git
對應的包 com.niuh.splitpacket0
為什么出現粘包現象?
TCP 是面向連接的,面向流的,提供高可靠性服務。收發兩端(客戶端和服務器端)都要有成對的 socket,因此,發送端為了將多個發送給接收端的包,更有效的發送給對方,使用了優化方法(Nagle算法),將多次間隔較少且數據量小的數據,合并成一個大的數據塊,然后進行封包,這樣做雖然提供了效率,但是接收端就難以分辨出完整的數據包了,因為面向流的通信是無消息保護邊界的。
如何理解TCP是面向字節流的
- 應用程序和 TCP 的交互是一次一個數據塊(大小不等),但 TCP 把應用程序交下來的數據僅僅看成是一連串的無結構的字節流。TCP 并不知道所傳送的字節流的含義;
- 因此 TCP 不保證接收方應用程序所收到的數據塊和發送方應用程序所發出的數據塊具有對應大小的關系(例如,發送方應用程序交給發送方的 TCP 共 10 個數據塊,但接收方的 TCP 可能只用了 4 個就把收到的字節流交付上層的應用程序);
- 同時,TCP 不關心應用進程一次把多長的報文發送到 TCP 的緩存中,而是根據對方給出的窗口值和當前網絡阻塞的程度來決定一個報文段應包含多少個字節(UDP 發送的報文長度是應用進程給出的)。如果應用進程傳送到 TCP 緩存的數據塊太長,TCP 就可以把它劃分短一點再傳送。如果應用程序一次只發來一個字節,TCP 也可以等待積累有足夠多的字節后再構成報文段發送出去。
TCP發送報文一般是 3 個時機
- 緩沖區數據達到,最大報文長度 MSS;
- 由發送端的應用進程指明要求發送報文段,即 TCP 支持的推送(push)操作;
- 當發送方的一個計時器期限到了,即使長度不超過 MSS,也發送。
解決方案
一般解決粘包拆包問題有 4 種辦法
1.在數據的末尾添加特殊的符號標識數據包的邊界。通常會加\n、\r、\t或者其他的符號
學習 HTTP、FTP 等,使用回車換行符號;
2.在數據的頭部聲明數據的長度,按長度獲取數據
將消息分為 head 和 body,head 中包含 body 長度的字段,一般 head 的第一個字段使用 int 值來表示 body 長度;
3.規定報文的長度,不足則補空位。讀取時按規定好的長度來讀取。比如 100 字節,如果不夠就補空格;
4.使用更復雜的應用層協議。
使用LineBasedFrameDecoder
LineBasedFrameDecoder 是Netty內置的一個解碼器,對應的編碼器是 LineEncoder。
原理是上面講的第一種思路,在數據末尾加上特殊符號以標識邊界。默認是使用換行符\n。
用法很簡單,發送方加上編碼器:
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //添加編碼器,使用默認的符號\n,字符集是UTF-8
- ch.pipeline().addLast(new LineEncoder(LineSeparator.DEFAULT, CharsetUtil.UTF_8));
- ch.pipeline().addLast(new TcpClientHandler());
- }
接收方加上解碼器:
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //解碼器需要設置數據的最大長度,我這里設置成1024
- ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
- //給pipeline管道設置業務處理器
- ch.pipeline().addLast(new TcpServerHandler());
- }
然后在發送方,發送消息時在末尾加上標識符:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 1; i <= 5; i++) {
- //在末尾加上默認的標識符\n
- ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + StringUtil.LINE_FEED, Charset.forName("utf-8"));
- ctx.writeAndFlush(byteBuf);
- }
- }
于是我們再次啟動服務端和客戶端,在服務端的控制臺可以看到:
在數據的末尾添加特殊的符號標識數據包的邊界,粘包、拆包的問題就得到解決了。
注意:數據末尾一定是分隔符,分隔符后面不要再加上數據,否則會當做下一條數據的開始部分。下面是錯誤演示:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 1; i <= 5; i++) {
- //在末尾加上默認的標識符\n
- ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + StringUtil.LINE_FEED + "[我是分隔符后面的字符串]", Charset.forName("utf-8"));
- ctx.writeAndFlush(byteBuf);
- }
- }
服務端的控制臺就會看到這樣的打印信息:
使用自定義長度幀解碼器
使用這個解碼器解決粘包問題的原理是上面講的第二種,在數據的頭部聲明數據的長度,按長度獲取數據。這個解碼器構造器需要定義5個參數,相對較為復雜一點,先看參數的解釋:
- maxFrameLength 發送數據包的最大長度
- lengthFieldOffset 長度域的偏移量。長度域位于整個數據包字節數組中的開始下標。
- lengthFieldLength 長度域的字節數長度。長度域的字節數長度。
- lengthAdjustment 長度域的偏移量矯正。如果長度域的值,除了包含有效數據域的長度外,還包含了其他域(如長度域自身)長度,那么,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長。
- initialBytesToStrip 丟棄的起始字節數。丟棄處于此索引值前面的字節。
前面三個參數比較簡單,可以用下面這張圖進行演示:
矯正偏移量是什么意思呢?
是假設你的長度域設置的值除了包括有效數據的長度還有其他域的長度包含在里面,那么就要設置這個值進行矯正,否則解碼器拿不到有效數據。
丟棄的起始字節數。這個比較簡單,就是在這個索引值前面的數據都丟棄,只要后面的數據。一般都是丟棄長度域的數據。當然如果你希望得到全部數據,那就設置為0。
下面就在消息接收端使用自定義長度幀解碼器,解決粘包的問題:
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //數據包最大長度是1024
- //長度域的起始索引是0
- //長度域的數據長度是4
- //矯正值為0,因為長度域只有 有效數據的長度的值
- //丟棄數據起始值是4,因為長度域長度為4,我要把長度域丟棄,才能得到有效數據
- ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
- ch.pipeline().addLast(new TcpClientHandler());
- }
接著編寫發送端代碼,根據解碼器的設置,進行發送:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 1; i <= 5; i++) {
- String str = "msg No" + i;
- ByteBuf byteBuf = Unpooled.buffer(1024);
- byte[] bytes = str.getBytes(Charset.forName("utf-8"));
- //設置長度域的值,為有效數據的長度
- byteBuf.writeInt(bytes.length);
- //設置有效數據
- byteBuf.writeBytes(bytes);
- ctx.writeAndFlush(byteBuf);
- }
- }
然后啟動服務端,客戶端,我們可以看到控制臺打印結果:
可以看到,利用自定義長度幀解碼器解決了粘包問題。
使用Google Protobuf編解碼器
Netty官網上是明顯寫著支持Google Protobuf的,如下圖所示:
Google Protobuf是什么
官網的原話: Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.
翻譯一下:Protocol buffers是Google公司的與語言無關、平臺無關、可擴展的序列化數據的機制,類似XML,但是更小、更快、更簡單。您只需定義一次數據的結構化方式,然后就可以使用特殊生成的源代碼,輕松地將結構化數據寫入和讀取到各種數據流中,并支持多種語言。
在rpc或tcp通信等很多場景都可以使用。通俗來講,如果客戶端和服務端使用的是不同的語言,那么在服務端定義一個數據結構,通過protobuf轉化為字節流,再傳送到客戶端解碼,就可以得到對應的數據結構。這就是protobuf神奇的地方。并且,它的通信效率極高,“一條消息數據,用protobuf序列化后的大小是json的10分之一,xml格式的20分之一,是二進制序列化的10分之一”。
Google Protobuf 官網 :
https://developers.google.cn/protocol-buffers/
為什么使用Google Protobuf
在一些場景下,數據需要在不同的平臺,不同的程序中進行傳輸和使用,例如某個消息是用C++程序產生的,而另一個程序是用java寫的,當前者產生一個消息數據時,需要在不同的語言編寫的不同的程序中進行操作,如何將消息發送并在各個程序中使用呢?這就需要設計一種消息格式,常用的就有json和xml,protobuf出現的則較晚。
Google Protobuf優點
- protobuf 的主要優點是簡單,快;
- protobuf將數據序列化為二進制之后,占用的空間相當小,基本僅保留了數據部分,而xml和json會附帶消息結構在數據中;
- protobuf使用起來很方便,只需要反序列化就可以了,而不需要xml和json那樣層層解析。
Google Protobuf安裝
因為我這里是Mac系統,Mac下面除了用dmg、pkg來安裝軟件外,比較方便的還有用brew命令進行安裝 , 它能幫助安裝其他所需要的依賴,從而減少不必要的麻煩。
安裝最新版本的protoc
1.從github上下載 protobuf3
https://github.com/protocolbuffers/protobuf/releases/tag/v3.13.0
Mac系統選擇第一個,如下圖所示:
2.下載成功后,切換到root用戶
- sudo -i
3.解壓壓縮包,并進入你自己解壓的目錄
- tar xyf protobuf-all-3.13.0.tar.gz
- cd protobuf-3.13.0
4.設置編譯目錄
- ./configure --prefix=/usr/local/protobuf
5.安裝
- make
- make install
6.配置環境變量
第一步:找到.bash_profile文件并編輯
- cd ~
- open .bash_profile
第二步:然后在打開的bash_profile文件末尾添加如下配置:
- export PROTOBUF=/usr/local/protobuf
- export PATH=$PROTOBUF/bin:$PATH
第三步:source一下使文件生效
- source .bash_profile
7.測試安裝結果
- protoc --version
使用Google Protobuf
以下步驟參考Google Protobuf的github項目的指南。
https://github.com/protocolbuffers/protobuf/tree/master/java
第一步:添加maven依賴
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>3.11.0</version>
- </dependency>
第二步:編寫proto文件Message.proto
如何編寫.proto文件的相關文檔說明,可以去官網查看 下面寫一個例子,請看示范:
- syntax = "proto3"; //版本
- option java_outer_classname = "MessagePojo";//生成的外部類名,同時也是文件名
- message Message {
- int32 id = 1;//Message類的一個屬性,屬性名稱是id,序號為1
- string content = 2;//Message類的一個屬性,屬性名稱是content,序號為2
- }
第三步:使用編譯器,通過.proto文件生成代碼
在執行上面的安裝步驟后,進入到 bin 目錄下,可以看到一個可執行文件 protoc
- cd /usr/local/protobuf/bin/
然后復制前面寫好的Message.proto文件到此目錄下,如圖所示:
輸入命令:
- protoc --java_out=. Message.proto
然后就可以看到生成的MessagePojo.java文件。最后把文件復制到IDEA項目中。
第四步:在發送端添加編碼器,在接收端添加解碼器
客戶端添加編碼器,對消息進行編碼。
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //在發送端添加Protobuf編碼器
- ch.pipeline().addLast(new ProtobufEncoder());
- ch.pipeline().addLast(new TcpClientHandler());
- }
服務端添加解碼器,對消息進行解碼。
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //添加Protobuf解碼器,構造器需要指定解碼具體的對象實例
- ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance()));
- //給pipeline管道設置處理器
- ch.pipeline().addLast(new TcpServerHandler());
- }
第五步:發送消息
客戶端發送消息,代碼如下:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- //使用的是構建者模式進行創建對象
- MessagePojo.Message message = MessagePojo
- .Message
- .newBuilder()
- .setId(1)
- .setContent("一角錢,起飛~")
- .build();
- ctx.writeAndFlush(message);
- }
服務端接收到數據,并且打印:
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, MessagePojo.Message messagePojo) throws Exception {
- System.out.println("id:" + messagePojo.getId());
- System.out.println("content:" + messagePojo.getContent());
- }
測試結果正確:
分析Protocol的粘包、拆包
實際上直接使用Protocol編解碼器還是存在粘包問題的。
證明一下,發送端循環一百次發送100條"一角錢,起飛"的消息,請看發送端代碼演示:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 1; i <= 100; i++) {
- MessagePojo.Message message = MessagePojo
- .Message
- .newBuilder()
- .setId(i)
- .setContent(i + "號一角錢,起飛~")
- .build();
- ctx.writeAndFlush(message);
- }
- }
這時,啟動服務端,客戶端后,可能只有打印幾條消息或者在控制臺看到如下錯誤:
com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
意思是:分析protocol消息時,輸入意外地在字段中間結束。這可能意味著輸入被截斷,或者嵌入的消息誤報了自己的長度。
其實就是粘包問題,多條數據合并成一條數據了,導致解析出現異常。
解決Protocol的粘包、拆包問題
只需要在發送端加上編碼器 ProtobufVarint32LengthFieldPrepender
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
- ch.pipeline().addLast(new ProtobufEncoder());
- ch.pipeline().addLast(new TcpClientHandler());
- }
接收方加上解碼器 ProtobufVarint32FrameDecoder
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
- ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance()));
- //給pipeline管道設置處理器
- ch.pipeline().addLast(new TcpServerHandler());
- }
然后再啟動服務端和客戶端,我們可以看到正常了~
ProtobufVarint32LengthFieldPrepender 編碼器的工作如下:
- * BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes)
- * +---------------+ +--------+---------------+
- * | Protobuf Data |-------------->| Length | Protobuf Data |
- * | (300 bytes) | | 0xAC02 | (300 bytes) |
- * +---------------+ +--------+---------------+
- @Sharable
- public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {
- @Override
- protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
- int bodyLen = msg.readableBytes();
- int headerLen = computeRawVarint32Size(bodyLen);
- //寫入請求頭,消息長度
- out.ensureWritable(headerLen + bodyLen);
- writeRawVarint32(out, bodyLen);
- //寫入數據
- out.writeBytes(msg, msg.readerIndex(), bodyLen);
- }
- }
ProtobufVarint32FrameDecoder 解碼器的工作如下:
- * BEFORE DECODE (302 bytes) AFTER DECODE (300 bytes)
- * +--------+---------------+ +---------------+
- * | Length | Protobuf Data |----->| Protobuf Data |
- * | 0xAC02 | (300 bytes) | | (300 bytes) |
- * +--------+---------------+ +---------------+
- ublic class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- //標記讀取的下標位置
- in.markReaderIndex();
- //獲取讀取的下標位置
- int preIndex = in.readerIndex();
- //解碼,獲取消息的長度,并且移動讀取的下標位置
- int length = readRawVarint32(in);
- //比較解碼前和解碼后的下標位置,如果相等。表示字節數不夠讀取,跳到下一輪
- if (preIndex == in.readerIndex()) {
- return;
- }
- //如果消息的長度小于0,拋出異常
- if (length < 0) {
- throw new CorruptedFrameException("negative length: " + length);
- }
- //如果不夠讀取一個完整的數據,reset還原下標位置。
- if (in.readableBytes() < length) {
- in.resetReaderIndex();
- } else {
- //否則,把數據寫入到out,接收端就拿到了完整的數據了
- out.add(in.readRetainedSlice(length));
- }
- }
總結:
- 發送端通過編碼器在發送的時候在消息體前面加上一個描述數據長度的數據塊。
- 接收方通過解碼器先獲取描述數據長度的數據塊,知道完整數據的長度,然后根據數據長度獲取一條完整的數據。
Netty心跳檢測機制
何為心跳
所謂心跳, 即在 TCP 長連接中, 客戶端和服務器之間定期發送的一種特殊的數據包, 通知對方自己還在線, 以確保 TCP 連接的有效性.
注:心跳包還有另一個作用,經常被忽略,即:一個連接如果長時間不用,防火墻或者路由器就會斷開該連接。
在 Netty 中, 實現心跳機制的關鍵是 IdleStateHandler, 看下它的構造器:
- public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
- this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
- }
三個參數的含義如下:
- readerIdleTimeSeconds: 讀超時。即當在指定的時間間隔內沒有從 Channel 讀取到數據時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件。
- writerIdleTimeSeconds: 寫超時。 即當在指定的時間間隔內沒有數據寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件。
- allIdleTimeSeconds: 讀/寫超時。 即當在指定的時間間隔內沒有讀或寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件。
注:這三個參數默認的時間單位是秒。若需要指定其他時間單位,可以使用另一個構造方法:
- public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
- this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
- }
要實現Netty服務端心跳檢測機制需要在服務器端的ChannelInitializer中加入如下的代碼:
- pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
Netty心跳源碼分析
初步地看下IdleStateHandler源碼,先看下IdleStateHandler中的channelRead方法:
紅框代碼其實表示該方法只是進行了透傳,不做任何業務邏輯處理,讓channelPipe中的下一個handler處理channelRead方法;
我們再看看channelActive方法:
這里有個initialize的方法,這是IdleStateHandler的精髓,接著探究:
這邊會觸發一個Task,ReaderIdleTimeoutTask,這個task里的run方法源碼是這樣的:
第一個紅框代碼是用當前時間減去最后一次channelRead方法調用的時間,假如這個結果是6s,說明最后一次調用channelRead已經是6s 之前的事情了,你設置的是5s,那么nextDelay則為-1,說明超時了,那么第二個紅框代碼則會觸發下一個handler的 userEventTriggered方法:
如果沒有超時則不觸發userEventTriggered方法。
Netty心跳檢測代碼示例
服務端
- package com.niuh.netty.heartbeat;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import io.netty.handler.timeout.IdleStateHandler;
- import java.util.concurrent.TimeUnit;
- public class HeartBeatServer {
- public static void main(String[] args) throws Exception {
- EventLoopGroup boss = new NioEventLoopGroup();
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(boss, worker)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast("decoder", new StringDecoder());
- pipeline.addLast("encoder", new StringEncoder());
- //IdleStateHandler的readerIdleTime參數指定超過3秒還沒收到客戶端的連接,
- //會觸發IdleStateEvent事件并且交給下一個handler處理,下一個handler必須
- //實現userEventTriggered方法處理對應事件
- pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
- pipeline.addLast(new HeartBeatServerHandler());
- }
- });
- System.out.println("netty server start。。");
- ChannelFuture future = bootstrap.bind(9000).sync();
- future.channel().closeFuture().sync();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- worker.shutdownGracefully();
- boss.shutdownGracefully();
- }
- }
- }
服務端回調處理類
- package com.niuh.netty.heartbeat;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.timeout.IdleStateEvent;
- public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
- int readIdleTimes = 0;
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
- System.out.println(" ====== > [server] message received : " + s);
- if ("Heartbeat Packet".equals(s)) {
- ctx.channel().writeAndFlush("ok");
- } else {
- System.out.println(" 其他信息處理 ... ");
- }
- }
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- IdleStateEvent event = (IdleStateEvent) evt;
- String eventType = null;
- switch (event.state()) {
- case READER_IDLE:
- eventType = "讀空閑";
- readIdleTimes++; // 讀空閑的計數加1
- break;
- case WRITER_IDLE:
- eventType = "寫空閑";
- // 不處理
- break;
- case ALL_IDLE:
- eventType = "讀寫空閑";
- // 不處理
- break;
- }
- System.out.println(ctx.channel().remoteAddress() + "超時事件:" + eventType);
- if (readIdleTimes > 3) {
- System.out.println(" [server]讀空閑超過3次,關閉連接,釋放更多資源");
- ctx.channel().writeAndFlush("idle close");
- ctx.channel().close();
- }
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
- }
- }
客戶端
- package com.niuh.netty.heartbeat;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import java.util.Random;
- public class HeartBeatClient {
- public static void main(String[] args) throws Exception {
- EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast("decoder", new StringDecoder());
- pipeline.addLast("encoder", new StringEncoder());
- pipeline.addLast(new HeartBeatClientHandler());
- }
- });
- System.out.println("netty client start。。");
- Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
- String text = "Heartbeat Packet";
- Random random = new Random();
- while (channel.isActive()) {
- int num = random.nextInt(10);
- Thread.sleep(2 * 1000);
- channel.writeAndFlush(text);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- eventLoopGroup.shutdownGracefully();
- }
- }
- static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- System.out.println(" client received :" + msg);
- if (msg != null && msg.equals("idle close")) {
- System.out.println(" 服務端關閉連接,客戶端也關閉");
- ctx.channel().closeFuture();
- }
- }
- }
- }
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-netty.git















































