精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Go語言如何操縱Kafka保證無消息丟失

開發 后端 Kafka
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數據提供一個統一、高吞吐、低延遲的平臺。其持久化層本質上是一個“按照分布式事務日志架構的大規模發布/訂閱消息隊列”,這使它作為企業級基礎設施來處理流式數據非常有價值。

[[423396]]

背景

目前一些互聯網公司會使用消息隊列來做核心業務,因為是核心業務,所以對數據的最后一致性比較敏感,如果中間出現數據丟失,就會引來用戶的投訴,年底績效就變成325了。之前和幾個朋友聊天,他們的公司都在用kafka來做消息隊列,使用kafka到底會不會丟消息呢?如果丟消息了該怎么做好補償措施呢?本文我們就一起來分析一下,并介紹如何使用Go操作Kafka可以不丟失數據。

本文操作kafka基于:https://github.com/Shopify/sarama

初識kafka架構

維基百科對kafka的介紹:

Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數據提供一個統一、高吞吐、低延遲的平臺。其持久化層本質上是一個“按照分布式事務日志架構的大規模發布/訂閱消息隊列”,這使它作為企業級基礎設施來處理流式數據非常有價值。此外,Kafka可以通過Kafka Connect連接到外部系統(用于數據輸入/輸出),并提供了Kafka Streams——一個Java]流式處理庫。該設計受事務日志的影響較大。

kafka的整體架構比較簡單,主要由producer、broker、consumer組成:

截屏2021-09-12 上午10.00.13

針對架構圖我們解釋一個各個模塊:

  • Producer:數據的生產者,可以將數據發布到所選擇的topic中。
  • Consumer:數據的消費者,使用Consumer Group進行標識,在topic中的每條記錄都會被分配給訂閱消費組中的一個消費者實例,消費者實例可以分布在多個進程中或者多個機器上。
  • Broker:消息中間件處理節點(服務器),一個節點就是一個broker,一個Kafka集群由一個或多個broker組成。

還有些概念我們也介紹一下:

  • topic:可以理解為一個消息的集合,topic存儲在broker中,一個topic可以有多個partition分區,一個topic可以有多個Producer來push消息,一個topic可以有多個消費者向其pull消息,一個topic可以存在一個或多個broker中。
  • partition:其是topic的子集,不同分區分配在不同的broker上進行水平擴展從而增加kafka并行處理能力,同topic下的不同分區信息是不同的,同一分區信息是有序的;每一個分區都有一個或者多個副本,其中會選舉一個leader,fowller從leader拉取數據更新自己的log(每個分區邏輯上對應一個log文件夾),消費者向leader中pull信息。

kafka丟消息的三個節點

生產者push消息節點

先看一下producer的大概寫入流程:

  • producer先從kafka集群找到該partition的leader
  • producer將消息發送給leader,leader將該消息寫入本地
  • follwers從leader pull消息,寫入本地log后leader發送ack
  • leader 收到所有 ISR 中的 replica 的 ACK 后,增加high watermark,并向 producer 發送 ack

截屏2021-09-12 上午11.16.43

通過這個流程我們可以看到kafka最終會返回一個ack來確認推送消息結果,這里kafka提供了三種模式:

  1. NoResponse RequiredAcks = 0 
  2. WaitForLocal RequiredAcks = 1 
  3. WaitForAll RequiredAcks = -1 
  • NoResponse RequiredAcks = 0:這個代表的就是數據推出的成功與否都與我無關了
  • WaitForLocal RequiredAcks = 1:當local(leader)確認接收成功后,就可以返回了
  • WaitForAll RequiredAcks = -1:當所有的leader和follower都接收成功時,才會返回

所以根據這三種模式我們就能推斷出生產者在push消息時有一定幾率丟失的,分析如下:

  • 如果我們選擇了模式1,這種模式丟失數據的幾率很大,無法重試
  • 如果我們選擇了模式2,這種模式下只要leader不掛,就可以保證數據不丟失,但是如果leader掛了,follower還沒有同步數據,那么就會有一定幾率造成數據丟失
  • 如果選擇了模式3,這種情況不會造成數據丟失,但是有可能會造成數據重復,假如leader與follower同步數據是網絡出現問題,就有可能造成數據重復的問題。

所以在生產環境中我們可以選擇模式2或者模式3來保證消息的可靠性,具體需要根據業務場景來進行選擇,在乎吞吐量就選擇模式2,不在乎吞吐量,就選擇模式3,要想完全保證數據不丟失就選擇模式3是最可靠的。

kafka集群自身故障造成

kafka集群接收到數據后會將數據進行持久化存儲,最終數據會被寫入到磁盤中,在寫入磁盤這一步也是有可能會造成數據損失的,因為寫入磁盤的時候操作系統會先將數據寫入緩存,操作系統將緩存中數據寫入磁盤的時間是不確定的,所以在這種情況下,如果kafka機器突然宕機了,也會造成數據損失,不過這種概率發生很小,一般公司內部kafka機器都會做備份,這種情況很極端,可以忽略不計。

消費者pull消息節點

push消息時會把數據追加到Partition并且分配一個偏移量,這個偏移量代表當前消費者消費到的位置,通過這個Partition也可以保證消息的順序性,消費者在pull到某個消息后,可以設置自動提交或者手動提交commit,提交commit成功,offset就會發生偏移:

截屏2021-09-12 下午3.37.33

所以自動提交會帶來數據丟失的問題,手動提交會帶來數據重復的問題,分析如下:

  • 在設置自動提交的時候,當我們拉取到一個消息后,此時offset已經提交了,但是我們在處理消費邏輯的時候失敗了,這就會導致數據丟失了
  • 在設置手動提交時,如果我們是在處理完消息后提交commit,那么在commit這一步發生了失敗,就會導致重復消費的問題。

比起數據丟失,重復消費是符合業務預期的,我們可以通過一些冪等性設計來規避這個問題。

實戰

完整代碼已經上傳github:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/kafka_demo

解決push消息丟失問題

主要是通過兩點來解決:

  • 通過設置RequiredAcks模式來解決,選用WaitForAll可以保證數據推送成功,不過會影響時延時
  • 引入重試機制,設置重試次數和重試間隔

因此我們寫出如下代碼(摘出創建client部分):

  1. func NewAsyncProducer() sarama.AsyncProducer { 
  2.  cfg := sarama.NewConfig() 
  3.  version, err := sarama.ParseKafkaVersion(VERSION) 
  4.  if err != nil{ 
  5.   log.Fatal("NewAsyncProducer Parse kafka version failed", err.Error()) 
  6.   return nil 
  7.  } 
  8.  cfg.Version = version 
  9.  cfg.Producer.RequiredAcks = sarama.WaitForAll // 三種模式任君選擇 
  10.  cfg.Producer.Partitioner = sarama.NewHashPartitioner 
  11.  cfg.Producer.Return.Successes = true 
  12.  cfg.Producer.Return.Errors = true 
  13.  cfg.Producer.Retry.Max = 3 // 設置重試3次 
  14.  cfg.Producer.Retry.Backoff = 100 * time.Millisecond 
  15.  cli, err := sarama.NewAsyncProducer([]string{ADDR}, cfg) 
  16.  if err != nil{ 
  17.   log.Fatal("NewAsyncProducer failed", err.Error()) 
  18.   return nil 
  19.  } 
  20.  return cli 

解決pull消息丟失問題

這個解決辦法就比較粗暴了,直接使用自動提交的模式,在每次真正消費完之后在自己手動提交offset,但是會產生重復消費的問題,不過很好解決,使用冪等性操作即可解決。

代碼示例:

  1. func NewConsumerGroup(group string) sarama.ConsumerGroup { 
  2.  cfg := sarama.NewConfig() 
  3.  version, err := sarama.ParseKafkaVersion(VERSION) 
  4.  if err != nil{ 
  5.   log.Fatal("NewConsumerGroup Parse kafka version failed", err.Error()) 
  6.   return nil 
  7.  } 
  8.  
  9.  cfg.Version = version 
  10.  cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange 
  11.  cfg.Consumer.Offsets.Initial = sarama.OffsetOldest 
  12.  cfg.Consumer.Offsets.Retry.Max = 3 
  13.  cfg.Consumer.Offsets.AutoCommit.Enable = true // 開啟自動提交,需要手動調用MarkMessage才有效 
  14.  cfg.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 間隔 
  15.  client, err := sarama.NewConsumerGroup([]string{ADDR}, group, cfg) 
  16.  if err != nil { 
  17.   log.Fatal("NewConsumerGroup failed", err.Error()) 
  18.  } 
  19.  return client 

上面主要是創建ConsumerGroup部分,細心的讀者應該看到了,我們這里使用的是自動提交,說好的使用手動提交呢?這是因為我們這個kafka庫的特性不同,這個自動提交需要與MarkMessage()方法配合使用才會提交(有疑問的朋友可以實踐一下,或者看一下源碼),否則也會提交失敗,因為我們在寫消費邏輯時要這樣寫:

  1. func (e EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { 
  2.  for msg := range claim.Messages() { 
  3.   var data common.KafkaMsg 
  4.   if err := json.Unmarshal(msg.Value, &data); err != nil { 
  5.    return errors.New("failed to unmarshal message err is " + err.Error()) 
  6.   } 
  7.   // 操作數據,改用打印 
  8.   log.Print("consumerClaim data is "
  9.  
  10.   // 處理消息成功后標記為處理, 然后會自動提交 
  11.   session.MarkMessage(msg,""
  12.  } 
  13.  return nil 

或者直接使用手動提交方法來解決,只需兩步:

第一步:關閉自動提交:

  1. consumerConfig.Consumer.Offsets.AutoCommit.Enable = false  // 禁用自動提交,改為手動 

第二步:消費邏輯中添加如下代碼,手動提交模式下,也需要先進行標記,在進行commit

  1. session.MarkMessage(msg,""
  2. session.Commit() 

完整代碼可以到github上下載并進行驗證!

總結

本文我們主要說明了兩個知識點:

Kafka會產生消息丟失

使用Go操作Kafka如何配置可以不丟失數據

 

日常業務開發中,很多公司都喜歡拿消息隊列進行解耦,那么你就要注意了,使用Kafka做消息隊列無法保證數據不丟失,需要我們自己手動配置補償,別忘記了,要不又是一場P0事故。

 

責任編輯:武曉燕 來源: Golang夢工廠
相關推薦

2024-06-18 08:26:22

2024-08-06 09:55:25

2021-08-04 07:47:18

Kafka消息框架

2021-03-08 10:19:59

MQ消息磁盤

2021-10-22 08:37:13

消息不丟失rocketmq消息隊列

2022-08-26 05:24:04

中間件技術Kafka

2019-03-13 09:27:57

宕機Kafka數據

2025-07-21 09:02:45

2023-11-27 17:29:43

Kafka全局順序性

2024-11-11 07:05:00

Redis哨兵模式主從復制

2024-02-26 08:10:00

Redis數據數據庫

2025-11-11 09:05:09

2022-03-31 08:26:44

RocketMQ消息排查

2023-09-13 08:14:57

RocketMQ次數機制

2025-01-06 00:00:01

KratosGo微服務

2024-01-16 08:24:59

消息隊列KafkaRocketMQ

2022-07-11 08:01:55

Kafka服務器宕機

2024-05-09 08:04:23

RabbitMQ消息可靠性

2020-10-14 08:36:10

RabbitMQ消息

2024-12-18 07:43:49

點贊
收藏

51CTO技術棧公眾號

这里只有精品在线| 国产私拍福利精品视频二区| 成人黄色av电影| 97av在线播放| 林心如三级全黄裸体| 国产美女精品视频免费播放软件| 一区二区三区四区在线| 精品伦理一区二区三区| 国产精品欧美综合| 欧美韩国一区| 国产午夜精品麻豆| 亚洲色图欧美自拍| 英国三级经典在线观看| 1024精品合集| 狼狼综合久久久久综合网| 中文字幕永久在线观看| 精品av久久久久电影| 亚洲欧美一区二区三区情侣bbw| 三级a在线观看| 激情网站在线| 国产欧美日韩激情| 成人精品水蜜桃| 久久久久久av无码免费看大片| 欧美色123| 中文字幕亚洲综合| 日本一卡二卡在线| 国产一区二区三区免费观看在线 | 欧美三级在线免费观看| 欧美禁忌电影| 亚洲国产成人91精品| 亚洲黄色av片| 视频一区在线免费看| 亚洲午夜久久久久中文字幕久| 亚洲成人网上| 日本亚洲欧美| 97se亚洲国产综合在线| 666精品在线| 亚洲图片中文字幕| 久久精品女人| 38少妇精品导航| 久久久久亚洲AV| 亚洲精品一二三区区别| 中文字幕亚洲综合久久筱田步美| 人人妻人人澡人人爽人人精品 | 在线观看91精品国产麻豆| 日韩免费毛片视频| 日本不卡网站| 欧美日韩国产专区| 日本国产在线播放| av在线不卡免费| 亚洲高清不卡在线观看| 国产情侣第一页| 欧美人与性动交α欧美精品济南到| 国产精品不卡一区| 性欧美大战久久久久久久免费观看| 五月天福利视频| 成人av电影在线观看| 福利视频久久| 欧美一区二区三区成人片在线| 国产精品一区二区x88av| 国产精品极品尤物在线观看 | 成人无码av片在线观看| 国产精品一区2区3区| 亚洲欧美国产精品va在线观看| 少妇按摩一区二区三区| 亚洲都市激情| 亚洲性夜色噜噜噜7777| 91l九色lporny| 欧美日韩一区二区综合| 久久精品夜夜夜夜夜久久| 国精品人伦一区二区三区蜜桃| 日韩成人激情| 久久色在线播放| 久久久久亚洲av片无码下载蜜桃| 亚洲区一区二| 琪琪亚洲精品午夜在线| 激情网站在线观看| 久久国产夜色精品鲁鲁99| 成人日韩在线电影| 成人午夜免费福利| 91色视频在线| 亚洲精品一区二区三区樱花 | 成人欧美一区二区三区| 免费在线精品视频| 欧美xxxx做受欧美88bbw| 亚洲成人一区在线| 老司机午夜av| 国产成人精品一区二区三区视频| 欧美另类一区二区三区| 99久久综合网| 亚洲人成网亚洲欧洲无码| 一区二区三区日韩在线| 欧美人妻精品一区二区三区| 99国产精品视频免费观看一公开 | 99久久久成人国产精品| 亚洲精品一区二区三区精华液 | av观看在线免费| 白白色 亚洲乱淫| 性欧美.com| 波多野结衣精品| 欧美日韩国产精品成人| 不许穿内裤随时挨c调教h苏绵| 综合国产视频| 美日韩丰满少妇在线观看| 日韩黄色精品视频| 久热成人在线视频| 精品免费一区二区三区蜜桃| 一本一道波多野毛片中文在线| 亚洲午夜久久久久| 91日韩视频在线观看| 99久热这里只有精品视频免费观看| 日韩久久精品成人| 久草资源在线视频| 日本特黄久久久高潮| 国产精品日韩一区二区免费视频| 99精品老司机免费视频| 欧美性猛交xxxx黑人| 久久黄色一级视频| 免费看av成人| 97色在线视频| 国内精品久久久久久久久久久| 国产肉丝袜一区二区| 欧美日韩一道本| 亚洲精品一区二区三区中文字幕 | 黄色激情小视频| 在线播放亚洲| 97se视频在线观看| 欧美天天影院| 欧美中文字幕亚洲一区二区va在线| 成人欧美精品一区二区| 亚洲女同另类| 国产日韩av在线| 国产高清一级毛片在线不卡| 欧美日韩国产麻豆| 无码成人精品区在线观看| 亚洲女同另类| 亚洲综合日韩中文字幕v在线| 超碰国产在线观看| 色婷婷久久久久swag精品| xxxxxx黄色| 99国产精品99久久久久久粉嫩| 国产精品国模大尺度私拍| 69xxx在线| 91.com视频| 亚洲一区电影在线观看| 蜜桃久久精品一区二区| 欧洲高清一区二区| 久久野战av| 亚洲日本成人女熟在线观看| 亚洲综合图片网| 久久在线观看免费| 播放灌醉水嫩大学生国内精品| 国语一区二区三区| 高清在线视频日韩欧美| 免费观看黄色一级视频| 亚洲国产毛片aaaaa无费看 | 国产精品少妇自拍| 天堂在线资源视频| 色男人天堂综合再现| 国产精品网站视频| 毛片在线看网站| 在线播放中文字幕一区| 一区二区三区四区五区| 国产麻豆91精品| www.亚洲成人网| 欧美亚洲国产日韩| 国产v综合v亚洲欧美久久| h视频在线免费| 欧美剧情片在线观看| 波多野结衣在线网址| 国产a久久麻豆| 国产青青在线视频| 精品欧美激情在线观看| 国产一区玩具在线观看| 在线中文免费视频| 亚洲黄色av女优在线观看| 国产午夜精品久久久久| 国产精品嫩草影院com| 在线免费观看av网| 亚洲精品精选| 日韩av图片| 国产一区二区三区| 4p变态网欧美系列| 午夜激情在线观看| 欧美不卡一区二区三区| 在线精品免费视| 1024精品合集| 黄色污在线观看| 麻豆视频观看网址久久| 欧美中日韩在线| 国产伦精品一区二区三区视频| 国产原创欧美精品| 九色porny丨首页入口在线| 国产午夜精品全部视频播放| 精品毛片一区二区三区| 欧美日韩一区二区三区| 99久久久免费精品| av不卡免费电影| 日韩中文字幕a| 99亚洲精品| 伊人情人网综合| 秋霞在线一区| 91在线观看免费高清| 最新欧美色图| 欧美精品少妇videofree| 头脑特工队2免费完整版在线观看| 欧美片网站yy| 日本韩国欧美中文字幕| 亚洲精品亚洲人成人网| 国产jjizz一区二区三区视频| 国产精品一区二区在线观看网站| 黄色片久久久久| 伊人成人在线| 国产av第一区| 欧美日韩在线播放视频| 精品日本一区二区三区| 九九99久久精品在免费线bt| 国产成人亚洲综合| 极品在线视频| 欧美精品精品精品精品免费| 91.xxx.高清在线| 亚洲欧美一区二区三区久久| 免费国产黄色片| 精品少妇一区二区三区在线视频 | 捆绑紧缚一区二区三区视频| 国产亚洲精品网站| 一区在线观看| 奇米777四色影视在线看| 手机亚洲手机国产手机日韩| 欧美精品一区在线发布| 玖玖玖免费嫩草在线影院一区| 2014国产精品| 亚洲网站三级| 国产原创欧美精品| 亚洲精品aa| 成人美女免费网站视频| 国产成人亚洲一区二区三区 | 久久久久久久久久久久电影| 国产精品美乳一区二区免费| 自由日本语热亚洲人| 91产国在线观看动作片喷水| 国产欧洲在线| 91精品国产色综合久久不卡98口| xxxx视频在线| 国外成人在线直播| 国产高清自产拍av在线| 97在线精品国自产拍中文| 成人福利电影| 欧美精品第一页在线播放| 手机电影在线观看| 欧美国产中文字幕| 蜜桃传媒在线观看免费进入 | 日韩精品一区二区三| 亚洲超碰精品一区二区| 日韩高清精品免费观看| 精品久久久久久久大神国产| 欧美日韩综合在线观看| 精品日韩视频在线观看| jizz国产在线观看| 欧美少妇bbb| 亚洲综合精品在线| 3atv在线一区二区三区| 国产黄a三级三级看三级| 欧美mv日韩mv国产网站| 外国精品视频在线观看| 亚洲一区第一页| 日本免费在线观看| 欧美日韩国产成人| 岛国在线视频网站| 国产精品极品尤物在线观看| 99亚洲男女激情在线观看| 97视频中文字幕| 青青视频一区二区| 性高潮久久久久久久久| 欧美~级网站不卡| 少妇无码av无码专区在线观看 | 网站黄在线观看| 亚洲系列中文字幕| 黄色在线免费网站| 97免费在线视频| 国产香蕉久久| 国产精品成人观看视频免费| 免费成人高清在线视频theav| 亚洲精品日韩精品| 亚洲激精日韩激精欧美精品| 99久久久无码国产精品6| 寂寞少妇一区二区三区| 久久久高清视频| 欧美极品美女视频| 久久高清无码视频| 91极品视觉盛宴| www.色婷婷.com| 亚洲午夜久久久影院| 尤物视频在线看| 国产99久久精品一区二区 夜夜躁日日躁| 男人天堂久久| 九色91国产| 午夜精品一区二区三区国产 | 欧美aaaaa性bbbbb小妇| 国产中文日韩欧美| 日韩欧美黄色| 日韩中文在线字幕| 日本欧美大码aⅴ在线播放| 波多野吉衣在线视频| 国产日韩欧美麻豆| 日本少妇毛茸茸高潮| 欧美美女一区二区三区| 亚洲av成人精品日韩在线播放| 中文字幕欧美日韩va免费视频| 国内老司机av在线| 国产美女搞久久| 欧美激情在线免费| 蜜臀av无码一区二区三区| 激情综合色综合久久| 伊人网在线视频观看| 亚洲国产成人tv| 国产又爽又黄免费软件| 亚洲天堂日韩电影| 国产黄大片在线观看| 99c视频在线| 久久久久久久久久久妇女 | 婷婷综合六月| 狠狠色综合一区二区| 91精品精品| 182午夜在线观看| 国产午夜精品理论片a级大结局| 国产无码精品视频| 日韩一级免费一区| 日本福利在线| 国产精品电影网站| 日韩三级视频| 久激情内射婷内射蜜桃| 国产精品中文字幕一区二区三区| 国产18无套直看片| 91黄色免费网站| 噜噜噜在线观看播放视频| 992tv成人免费视频| 国产女人18毛片水真多18精品| 亚洲一区 在线播放| 精品一区二区三区免费播放| 国产三级在线观看完整版| 色婷婷av一区二区三区之一色屋| av女名字大全列表| 97香蕉超级碰碰久久免费的优势| japanese色系久久精品| 国产精品久久久久久久久电影网| 国产精品一卡二卡| 欧美日韩中文字幕在线观看| 欧美一区国产二区| caopen在线视频| 99国产超薄肉色丝袜交足的后果| 一区二区日韩欧美| 91人妻一区二区三区| 亚洲一区二区三区精品在线| 国产综合在线播放| 97avcom| 日韩丝袜视频| 手机看片福利日韩| 成人免费视频在线观看| 精品国产av 无码一区二区三区| 久久高清视频免费| 综合中文字幕| 黄色一级片播放| 久久久国产精华| 五月天中文字幕| 久久精品国产综合| 欧美高清影院| 四虎精品欧美一区二区免费| 大美女一区二区三区| 国产欧美日韩另类| 伊人久久久久久久久久| 99热这里有精品| 国产欧美日韩网站| 国产视频一区不卡| 97超碰人人模人人人爽人人爱| 久久成人精品视频| 国产伦精品一区二区三区在线播放 | 国产在线电影| 亚洲a成v人在线观看| 亚洲三级色网| 美国黄色特级片| 日韩欧美另类在线| 中国字幕a在线看韩国电影| 亚洲精品无人区| 不卡的av在线播放| 中文字幕视频免费观看| 欧美俄罗斯乱妇| 自拍亚洲一区| 人妻巨大乳一二三区| 欧美色视频日本版| 日本在线观看| 精品乱色一区二区中文字幕| 捆绑调教一区二区三区| 在线观看国产亚洲| 久久精品中文字幕电影| 日本亚洲不卡| 四川一级毛毛片| 欧美三区在线观看| 欧美aa在线| 300部国产真实乱|