精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

RocketMQ的編解碼技術細節

開發 前端
RocketMQ序列化、反序列化有兩種方式:一種是將數據通過FastJSON將數據轉化成JSON字符串,然后轉化成byte[]數組進行編解碼。另一種方式是RocketMQ定義了一套自己的編解碼,將每個字段分別進行編解碼。編碼不論采用那種方式,都最終都編碼為byte[]。

 [[405364]]

本文轉載自微信公眾號「漫漫技術路」,作者劉蒞。轉載本文請聯系漫漫技術路公眾號。

從上一篇文章中,我們了解的RocketMQ不同組件之間,數據是如何通過網絡傳輸的,總結以下幾點

  • RocketMQ的網絡傳輸模塊,在remoting子模塊下,入口可以參考RemotingNettyServer和RemotingNettyClient兩個類。
  • RocketMQ是依靠Netty,與各個組件進行數據傳輸。
  • RocketMQ序列化、反序列化有兩種方式:一種是將數據通過FastJSON將數據轉化成JSON字符串,然后轉化成byte[]數組進行編解碼。另一種方式是RocketMQ定義了一套自己的編解碼,將每個字段分別進行編解碼。編碼不論采用那種方式,都最終都編碼為byte[]。

今天,我們分析RocketMQ的編解碼細節。在本篇文章中,可以學到:

  • RocketMQ網絡協議。
  • RocketMQ在傳輸數據時,內存的分配。
  • RocketMQ編解碼細節。

編碼流程

首先,我們編碼器org.apache.rocketmq.remoting.netty.NettyEncoder入手

  1. @ChannelHandler.Sharable 
  2. public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> { 
  3.     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); 
  4.  
  5.     @Override 
  6.     public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out
  7.         throws Exception { 
  8.         try { 
  9.             ByteBuffer header = remotingCommand.encodeHeader(); 
  10.             out.writeBytes(header); 
  11.             byte[] body = remotingCommand.getBody(); 
  12.             if (body != null) { 
  13.                 out.writeBytes(body); 
  14.             } 
  15.         } catch (Exception e) { 
  16.             log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); 
  17.             if (remotingCommand != null) { 
  18.                 log.error(remotingCommand.toString()); 
  19.             } 
  20.             RemotingUtil.closeChannel(ctx.channel()); 
  21.         } 
  22.     } 

從上面的代碼,是RocketMQ將RemotingCommand對象編碼成ByteBuf的唯一入口。

我們可以看到,先將header部分編碼成ByteBuf,然后將body部分追加到ByteBuf里。

body部分編碼很容易理解,那么header部分是怎么編碼的呢?

  1. public ByteBuffer encodeHeader() { 
  2.     return encodeHeader(this.body != null ? this.body.length : 0); 
  3.  
  4. public ByteBuffer encodeHeader(final int bodyLength) { 
  5.     // 1> header length size 
  6.     int length = 4; 
  7.  
  8.     // 2> header data length 
  9.     byte[] headerData; 
  10.     headerData = this.headerEncode(); 
  11.  
  12.     length += headerData.length; 
  13.  
  14.     // 3> body data length 
  15.     length += bodyLength; 
  16.  
  17.     ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); 
  18.  
  19.     // length 
  20.     result.putInt(length); 
  21.  
  22.     // header length 
  23.     result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); 
  24.  
  25.     // header data 
  26.     result.put(headerData); 
  27.  
  28.     result.flip(); 
  29.  
  30.     return result; 

我們一步一步看,邏輯如下

  • 定義變量length,其初始值是4,經過幾步操作,其值是4+header.length+body.length。
  • 調用headerEnocde()方法編碼header部分。
  • 創建ByteBuffer并申請4+length-body.length大小的內存,其實就是4+4+header.length大小的內存。
  • 將數據寫入ByteBuffer,完成header部分的編碼。

我們畫一張圖來表示當前內存都存的什么數據。

我們可以很清楚的看到,每部分數據,分別存儲在ButyBuf的什么位置,這里需要特別強調的是,byte[4]部分存儲的什么,通過源碼,進一步分析。

  1. result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); 
  2.   
  3. public static byte[] markProtocolType(int source, SerializeType type) { 
  4.     byte[] result = new byte[4]; 
  5.   
  6.     result[0] = type.getCode(); 
  7.     result[1] = (byte) ((source >> 16) & 0xFF); 
  8.     result[2] = (byte) ((source >> 8) & 0xFF); 
  9.     result[3] = (byte) (source & 0xFF); 
  10.     return result; 
  11.   
  12. public enum SerializeType { 
  13.     JSON((byte) 0), 
  14.     ROCKETMQ((byte) 1); 

markProtocolType方法,第一個參數source表示header部分的長度,第二個參數type是編碼類型的枚舉,返回值是byte[]。

那么markProtocolType方法是做什么的呢?我們將type字段傳入JSON和ROCKETMQ這兩個枚舉值,分別看一下,返回的是什么。

返回值共四個字節,只有第一個字節不同,第四個字節是header部分的長度,前文已經提到過,RocketMQ對header部分,可以采用兩種編解碼方式。

對!!沒錯,第一個字節就是標識編解碼類型的。

解碼流程

接下來我們來看解碼,解碼與編碼稍有不同,解碼器繼承Netty提供的LengthFieldBasedFrameDecoder解碼器,我們來看org.apache.rocketmq.remoting.netty.NettyDecoder的源碼。

  1. public class NettyDecoder extends LengthFieldBasedFrameDecoder { 
  2.     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); 
  3.  
  4.     private static final int FRAME_MAX_LENGTH = 
  5.         Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength""16777216")); 
  6.  
  7.     public NettyDecoder() { 
  8.         super(FRAME_MAX_LENGTH, 0, 4, 0, 4); 
  9.     } 
  10.  
  11.     @Override 
  12.     public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { 
  13.         ByteBuf frame = null
  14.         try { 
  15.             frame = (ByteBuf) super.decode(ctx, in); 
  16.             if (null == frame) { 
  17.                 return null
  18.             } 
  19.  
  20.             ByteBuffer byteBuffer = frame.nioBuffer(); 
  21.  
  22.             return RemotingCommand.decode(byteBuffer); 
  23.         } catch (Exception e) { 
  24.             log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); 
  25.             RemotingUtil.closeChannel(ctx.channel()); 
  26.         } finally { 
  27.             if (null != frame) { 
  28.                 frame.release(); 
  29.             } 
  30.         } 
  31.  
  32.         return null
  33.     } 

其步驟如下:

  • 調用LengthFieldBasedFrameDecoder的decode方法,初次解碼。
  • 調用RemotingCommand.decode()方法,完成對header、body部分的解碼,并轉化為RemotingCommand對象。

為什么要經過兩次解碼?

熟悉LengthFieldBasedFrameDecoder解碼器的朋友都知道,LengthFieldBasedFrameDecoder解碼器是Netty提供的一種非常靈活的解碼器。

它在RocketMQ的NettyDecoder類中是這樣被構造的。

  1. public NettyDecoder() { 
  2.     super(FRAME_MAX_LENGTH, 0, 4, 0, 4); 

LengthFieldBasedFrameDecoder我在這里不詳細解釋,按上面的構造方法,意思是跳過開頭前4個字節。

構造出來LengthFieldBasedFrameDecoder后對RocketMQ協議進行初次解碼,解碼結果如下:

我們可以看到,把前四個字節,也就是把存儲length字段的那部分內存截去了,只剩byte[]+header+body部分。

我們再來看RemotingCommand的解碼邏輯

  1. public static RemotingCommand decode(final ByteBuffer byteBuffer) { 
  2.     int length = byteBuffer.limit(); 
  3.     int oriHeaderLen = byteBuffer.getInt(); 
  4.     int headerLength = getHeaderLength(oriHeaderLen); 
  5.  
  6.     byte[] headerData = new byte[headerLength]; 
  7.     byteBuffer.get(headerData); 
  8.  
  9.     RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); 
  10.  
  11.     int bodyLength = length - 4 - headerLength; 
  12.     byte[] bodyData = null
  13.     if (bodyLength > 0) { 
  14.         bodyData = new byte[bodyLength]; 
  15.         byteBuffer.get(bodyData); 
  16.     } 
  17.     cmd.body = bodyData; 
  18.  
  19.     return cmd; 
  20.  
  21. private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { 
  22.     switch (type) { 
  23.         case JSON: 
  24.             RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); 
  25.             resultJson.setSerializeTypeCurrentRPC(type); 
  26.             return resultJson; 
  27.         case ROCKETMQ: 
  28.             RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); 
  29.             resultRMQ.setSerializeTypeCurrentRPC(type); 
  30.             return resultRMQ; 
  31.         default
  32.             break; 
  33.     } 
  34.     return null

只剩byte[4]+header+body三個部分,解碼邏輯便很清晰。可以分為以下幾個步驟

  • 求出數據的長度、header的長度。
  • 根據編碼類型,解碼。比如通過JSON方式編碼,則通過JSON方式解碼。
  • 算出body的長度,解碼body。
  • 將最終解碼生成的RemotingCommand對象,發送給pipeline的下一個handler處理。

 

 

責任編輯:武曉燕 來源: 漫漫技術路
相關推薦

2024-04-25 17:07:33

無源光網絡PON接入網技術

2015-04-13 10:12:08

Windows容器技術Nano Server

2017-11-10 08:35:06

存儲FCoE網絡

2014-05-29 09:34:25

2019-05-13 08:51:53

總監技術CTO

2019-05-06 10:51:49

總監技術場景

2020-04-03 09:05:43

麻將 AI Suphx神經網絡

2023-05-08 07:20:22

Doris分析型數據庫

2013-06-26 09:42:25

技術服務器內存虛擬化

2015-07-27 09:44:38

Amazon EC2云平臺CoreOS容器

2022-04-28 07:59:11

Polkitpkexec漏洞

2018-07-17 09:34:15

Service Mes技術Kubernetes

2009-12-02 11:03:29

AMD

2018-04-20 14:37:43

互聯網技術細節

2021-03-16 15:49:30

架構運維技術

2022-06-29 13:59:40

家居應用鴻蒙

2022-09-05 08:12:28

Google二進制Protobuf

2020-09-21 05:58:40

深度學習算法目標檢測

2016-03-31 15:11:47

2020-12-21 06:58:12

Web安全編解碼工具
點贊
收藏

51CTO技術棧公眾號

欧美三级日韩三级国产三级| 久久精品视频在线看| 欧美xxxx做受欧美| 最新版天堂资源在线| 国产高潮在线| 中文乱码免费一区二区| 亚洲一区二区免费| 国产成人精品一区二三区| 精品国产一区二区三区av片| 日韩一级视频免费观看在线| 国产免费观看高清视频| 高清毛片在线看| 国产伦精品一区二区三区在线观看| 久久久久久久国产精品视频| 国产123在线| 欧美第一在线视频| 一本久久a久久免费精品不卡| 亚洲欧洲精品一区| 亚洲伦理在线观看| 日韩精品一区第一页| 欧美韩日一区二区| 超碰97av在线| 久久中文资源| 日韩午夜激情视频| 午夜免费高清视频| www.色在线| 亚洲免费观看高清在线观看| 日韩欧美精品一区二区| 亚洲a视频在线| 蜜臀国产一区二区三区在线播放| 97在线视频国产| 中文字幕电影av| 伊人久久大香线蕉| 亚洲第一国产精品| 91欧美一区二区三区| 高清电影一区| 疯狂做受xxxx高潮欧美日本| 国产av熟女一区二区三区| aaa日本高清在线播放免费观看| 99麻豆久久久国产精品免费优播| 91最新在线免费观看| 国产精品成人久久久| 麻豆91精品| 亚洲18私人小影院| 麻豆影视在线播放| 亚欧美无遮挡hd高清在线视频| 夜夜嗨av一区二区三区四区| 欧美 日本 国产| 精品人人人人| 日韩欧美国产系列| 中文字幕无码毛片免费看| 粉嫩av国产一区二区三区| 欧美丝袜自拍制服另类| 少妇激情一区二区三区| 欧美特黄aaaaaaaa大片| 欧美日韩性视频在线| 日韩免费视频播放| 麻豆视频在线观看免费网站黄| 亚洲va国产天堂va久久en| 日本免费成人网| 亚洲男同gay网站| 亚洲女同ⅹxx女同tv| 老司机午夜网站| 在线黄色网页| 玉足女爽爽91| 久久久天堂国产精品| 成码无人av片在线观看网站| 国产欧美综合在线观看第十页| 欧美日韩精品免费看| 黄色片在线播放| 国产欧美日韩亚州综合 | 亚洲电影二区| 91.成人天堂一区| 人妻 丝袜美腿 中文字幕| 精品三级av在线导航| 亚洲香蕉在线观看| 日本不卡一二区| 欧美日韩蜜桃| 91po在线观看91精品国产性色| 久久黄色精品视频| 日韩精品一区第一页| 91老司机在线| 视频二区在线观看| 91亚洲精品一区二区乱码| 日本一区免费观看| 黄网站app在线观看| 亚洲国产三级在线| 超碰影院在线观看| 电影中文字幕一区二区| 亚洲国产精久久久久久久| 泷泽萝拉在线播放| 亚洲不卡av不卡一区二区| 欧美激情视频给我| 日本中文字幕在线观看视频| 九九九久久久精品| 国产精品一区二区三区免费观看| 99久久久无码国产精品6| 亚洲欧美久久久久一区二区三区| 国产天堂素人系列在线视频| 亚洲色图欧洲色图| 成人黄色av片| 狂野欧美性猛交xxxx| 精品国产人成亚洲区| www.99热| 一道本一区二区| 成人av番号网| 神马精品久久| 国产精品对白交换视频| 农民人伦一区二区三区| 农村妇女一区二区| 日韩av在线一区二区| 欧美激情精品久久久久久免费| 一区二区三区四区五区精品视频| 国产精品自拍偷拍视频| 午夜在线视频免费| 亚洲精品视频自拍| 免费看污污网站| 老司机aⅴ在线精品导航| 丝袜美腿亚洲一区二区| 亚洲免费在线视频观看| 国产高清在线精品| 亚洲欧美一区二区原创| 在线看的毛片| 精品欧美乱码久久久久久1区2区 | 在线视频精品免费| 懂色av中文一区二区三区| 欧美一区视久久| av蜜臀在线| 日韩三级.com| 色老板免费视频| 蜜桃一区二区三区在线观看| 久久综合久久久| 黄色美女视频在线观看| 3d成人h动漫网站入口| 成人在线一级片| 亚洲精品韩国| 动漫美女被爆操久久久| 91一区二区三区在线| 欧美网站大全在线观看| 亚洲av无码一区二区三区人| 在线一区视频| 国产欧美一区二区三区不卡高清| 在线观看中文| 4438亚洲最大| 国精品无码一区二区三区| 久久电影网站中文字幕| 亚洲国产精品一区二区第一页| 欧美xx视频| 精品亚洲一区二区三区| 伊人手机在线视频| 99天天综合性| 噜噜噜久久亚洲精品国产品麻豆 | 蜜桃狠狠色伊人亚洲综合网站| av在线理伦电影| 精品粉嫩超白一线天av| 久热精品在线观看| 成人福利视频在线看| 国产日本在线播放| 欧美freesex8一10精品| 9.1国产丝袜在线观看| 亚州男人的天堂| 色中色一区二区| 男人的天堂av网| 久久精品国产亚洲a| 二级片在线观看| 亚洲第一二区| 欧美一级片在线播放| 国产午夜精品一区理论片| 欧美日韩一区二区三区四区 | 欧美xxxx中国| 91在线免费视频| 高h视频在线播放| 亚洲精品国产精品国自产观看浪潮| 日本va欧美va国产激情| 久久精品一区二区| 亚洲一区日韩精品| 欧美日韩福利| 久久人人九九| 亚洲我射av| 97视频在线观看网址| 欧美成人免费| 69精品人人人人| 粉嫩aⅴ一区二区三区| 久久夜色精品国产噜噜av| 欧美精品aaaa| 欧美涩涩视频| 欧美成人一区二区在线| 四虎精品永久免费| 欧美精品www在线观看| 麻豆国产在线播放| 555www色欧美视频| 99热只有这里有精品| 国产精品美女视频| 挪威xxxx性hd极品| 麻豆中文一区二区| 久久99久久99精品| 99re6这里只有精品| 91人成网站www| 在线成人av观看| 日韩视频精品在线| 手机亚洲第一页| 日韩欧美一区中文| 日本黄色中文字幕| 亚洲高清免费在线| 青青青视频在线免费观看| 国产suv精品一区二区三区| 亚洲中文字幕久久精品无码喷水| 一区二区电影| 日韩高清国产精品| 亚洲午夜免费| 国产日韩欧美成人| 亚洲综合在线电影| 国内久久久精品| 国产精品剧情一区二区在线观看| 亚洲欧美福利视频| 亚洲经典一区二区三区| 欧美视频一区二区三区| 久久亚洲精品国产| 亚洲一区二区在线视频| ass极品国模人体欣赏| 91在线观看下载| av电影在线播放| 激情综合五月婷婷| 国产三级三级看三级| 性高湖久久久久久久久| 霍思燕三级露全乳照| 欧美精品不卡| 大桥未久一区二区| 日韩精品免费| 日韩一区二区电影在线观看| 麻豆一区二区| 国产亚洲一区在线播放| 久久av网站| 91色精品视频在线| 欧美三级电影网址| 国产精品美女久久久久久免费| 一个人看的www视频在线免费观看| 久久久久久中文| 99久久精品免费观看国产| 日韩在线国产精品| av在线资源网| 在线观看不卡av| 国产一级片在线| 国产午夜精品免费一区二区三区| 天天综合网在线观看| 亚洲精品一区二区三区四区高清| 北条麻妃一二三区| 日韩精品一区二区在线观看| 国产黄a三级三级三级| 日韩视频免费观看高清在线视频| 精品国产无码一区二区| 日韩一级二级三级精品视频| www.成人在线观看| 欧美sm极限捆绑bd| 五月天婷婷在线播放| 日韩不卡在线观看| 免费毛片在线| 亚洲欧洲日本专区| 国产98在线| 久久精品国产一区| 中文字幕伦理免费在线视频| 欧美精品videosex极品1| 福利影院在线看| 欧洲亚洲在线视频| 电影亚洲精品噜噜在线观看| 国产精品电影观看| 亚洲伦理网站| 国产精品麻豆免费版| 日本国产精品| 日本在线成人一区二区| 99久久婷婷| 成人在线播放网址| 久久九九99| 亚洲天堂网2018| 成人免费观看男女羞羞视频| jizz欧美性20| 国产精品国产精品国产专区不片| 国内偷拍精品视频| 精品久久久久久久久久久久久久| 国产又粗又猛又黄视频| 制服丝袜中文字幕亚洲| 亚洲欧美另类日韩| 国产一区二区免费| aaa大片在线观看| 国产91精品黑色丝袜高跟鞋| 成人黄页网站视频| 99re视频在线播放| 国产欧美一区二区三区精品观看 | 在线天堂www在线国语对白| 久久人人爽爽爽人久久久| 糖心vlog免费在线观看| 精品国产鲁一鲁一区二区张丽| 在线视频精品免费| 欧美va亚洲va香蕉在线| www.视频在线.com| 久久久日本电影| 成人亚洲视频| 激情久久av| 亚洲第一偷拍| 日本www高清视频| 高清国产一区二区| 九九热久久免费视频| 精品女同一区二区三区在线播放| 最近中文字幕在线视频| 亚洲高清色综合| 九色porny在线| 国产成人精品优优av| 中文字幕视频精品一区二区三区| 日韩国产精品一区二区| 激情欧美一区二区三区| 成人日韩在线视频| 91亚洲午夜精品久久久久久| 欧美爱爱小视频| 欧美羞羞免费网站| 三级在线电影| 精品自拍视频在线观看| 不卡亚洲精品| 欧洲成人一区二区| 91久久视频| av电影中文字幕| 综合av第一页| 中文天堂在线播放| 亚洲色图偷窥自拍| 国产免费拔擦拔擦8x在线播放| 亚洲一区亚洲二区| 999国产精品999久久久久久| 簧片在线免费看| 久久久久88色偷偷免费| 日韩免费不卡视频| 欧美成人一区二区| а√天堂8资源在线官网| 国产精品稀缺呦系列在线| 国产99亚洲| 日本中文字幕网址| 成人爱爱电影网址| 国产亚洲小视频| 日韩一区二区三区四区| 激情成人四房播| 成人免费观看a| 无码一区二区三区视频| 手机av在线网| 亚洲欧洲av色图| 国产欧美久久久| 少妇久久久久久| 亚洲91在线| 国产女人18毛片| 国产裸体歌舞团一区二区| 精品97人妻无码中文永久在线| 日韩亚洲欧美在线| 七七成人影院| 国产精品久久久对白| 亚洲狼人精品一区二区三区| 呦呦视频在线观看| 亚欧色一区w666天堂| 神马午夜一区二区| 欧美一区二区影院| 国产一区二区精品久| 亚洲一二三区av| 国产精品久久久久久久久久久免费看 | 久久久精品视频国产| 亚洲尤物在线视频观看| 欧美一区二不卡视频| 26uuu另类亚洲欧美日本一| 婷婷成人影院| 香港日本韩国三级网站| 中文字幕亚洲欧美在线不卡| www.色婷婷.com| 97色在线观看免费视频| 视频国产一区| 日韩视频在线观看一区二区三区| 一区二区三区色| 老熟妇高潮一区二区高清视频| 91精品国产高清久久久久久久久 | 人人精品久久| 成人区一区二区| 97se狠狠狠综合亚洲狠狠| 日本中文字幕久久| 色999日韩欧美国产| 亚洲一区二区三区免费| 久久亚洲中文字幕无码| 国产精品网站在线观看| 国产wwwxxx| 欧美一级淫片videoshd| 99国产精品一区二区| 国产激情第一页| 精品视频在线看| 免费在线看电影| 日本电影一区二区三区| 国产一区二区免费在线| 亚洲天堂一区在线| 色妞久久福利网| 久久夜色电影| 亚洲精品mv在线观看| 欧美日韩国产在线| 日韩黄色影院| 久久综合久久综合这里只有精品| 激情综合色综合久久综合| 综合激情网五月| 大胆人体色综合| 国模吧精品视频|