精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

使用Kafka和MongoDB進(jìn)行Go異步處理

開(kāi)發(fā) 后端 其他數(shù)據(jù)庫(kù) Kafka MongoDB
在我前面的博客文章 “我的第一個(gè) Go 微服務(wù):使用 MongoDB 和 Docker 多階段構(gòu)建” 中,我創(chuàng)建了一個(gè) Go 微服務(wù)示例,它發(fā)布一個(gè) REST 式的 http 端點(diǎn),并將從 HTTP POST 中接收到的數(shù)據(jù)保存到 MongoDB 數(shù)據(jù)庫(kù)。

[[240575]]

在我前面的博客文章 “我的***個(gè) Go 微服務(wù):使用 MongoDB 和 Docker 多階段構(gòu)建” 中,我創(chuàng)建了一個(gè) Go 微服務(wù)示例,它發(fā)布一個(gè) REST 式的 http 端點(diǎn),并將從 HTTP POST 中接收到的數(shù)據(jù)保存到 MongoDB 數(shù)據(jù)庫(kù)。

在這個(gè)示例中,我將數(shù)據(jù)的保存和 MongoDB 分離,并創(chuàng)建另一個(gè)微服務(wù)去處理它。我還添加了 Kafka 為消息層服務(wù),這樣微服務(wù)就可以異步處理它自己關(guān)心的東西了。

如果你有時(shí)間去看,我將這個(gè)博客文章的整個(gè)過(guò)程錄制到 這個(gè)視頻中了 :)

下面是這個(gè)使用了兩個(gè)微服務(wù)的簡(jiǎn)單的異步處理示例的上層架構(gòu)圖。

rest-kafka-mongo-microservice-draw-io

rest-kafka-mongo-microservice-draw-io

微服務(wù) 1 —— 是一個(gè) REST 式微服務(wù),它從一個(gè) /POST http 調(diào)用中接收數(shù)據(jù)。接收到請(qǐng)求之后,它從 http 請(qǐng)求中檢索數(shù)據(jù),并將它保存到 Kafka。保存之后,它通過(guò) /POST 發(fā)送相同的數(shù)據(jù)去響應(yīng)調(diào)用者。

微服務(wù) 2 —— 是一個(gè)訂閱了 Kafka 中的一個(gè)主題的微服務(wù),微服務(wù) 1 的數(shù)據(jù)保存在該主題。一旦消息被微服務(wù)消費(fèi)之后,它接著保存數(shù)據(jù)到 MongoDB 中。

在你繼續(xù)之前,我們需要能夠去運(yùn)行這些微服務(wù)的幾件東西:

  1. 下載 Kafka —— 我使用的版本是 kafka_2.11-1.1.0
  2. 安裝 librdkafka —— 不幸的是,這個(gè)庫(kù)應(yīng)該在目標(biāo)系統(tǒng)中
  3. 安裝 Kafka Go 客戶(hù)端
  4. 運(yùn)行 MongoDB。你可以去看我的 以前的文章 中關(guān)于這一塊的內(nèi)容,那篇文章中我使用了一個(gè) MongoDB docker 鏡像。

我們開(kāi)始吧!

首先,啟動(dòng) Kafka,在你運(yùn)行 Kafka 服務(wù)器之前,你需要運(yùn)行 Zookeeper。下面是示例:

  1. $ cd /<download path>/kafka_2.11-1.1.0
  2. $ bin/zookeeper-server-start.sh config/zookeeper.properties

接著運(yùn)行 Kafka —— 我使用 9092 端口連接到 Kafka。如果你需要改變端口,只需要在 config/server.properties 中配置即可。如果你像我一樣是個(gè)新手,我建議你現(xiàn)在還是使用默認(rèn)端口。

  1. $ bin/kafka-server-start.sh config/server.properties

Kafka 跑起來(lái)之后,我們需要 MongoDB。它很簡(jiǎn)單,只需要使用這個(gè) docker-compose.yml 即可。

  1. version: '3'
  2. services:
  3. mongodb:
  4. image: mongo
  5. ports:
  6. - "27017:27017"
  7. volumes:
  8. - "mongodata:/data/db"
  9. networks:
  10. - network1
  11.  
  12. volumes:
  13. mongodata:
  14.  
  15. networks:
  16. network1:

使用 Docker Compose 去運(yùn)行 MongoDB docker 容器。

  1. docker-compose up

這里是微服務(wù) 1 的相關(guān)代碼。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

  1. func jobsPostHandler(w http.ResponseWriter, r *http.Request) {
  2.  
  3. //Retrieve body from http request
  4. b, err := ioutil.ReadAll(r.Body)
  5. defer r.Body.Close()
  6. if err != nil {
  7. panic(err)
  8. }
  9.  
  10. //Save data into Job struct
  11. var _job Job
  12. err = json.Unmarshal(b, &_job)
  13. if err != nil {
  14. http.Error(w, err.Error(), 500)
  15. return
  16. }
  17.  
  18. saveJobToKafka(_job)
  19.  
  20. //Convert job struct into json
  21. jsonString, err := json.Marshal(_job)
  22. if err != nil {
  23. http.Error(w, err.Error(), 500)
  24. return
  25. }
  26.  
  27. //Set content-type http header
  28. w.Header().Set("content-type", "application/json")
  29.  
  30. //Send back data as response
  31. w.Write(jsonString)
  32.  
  33. }
  34.  
  35. func saveJobToKafka(job Job) {
  36.  
  37. fmt.Println("save to kafka")
  38.  
  39. jsonString, err := json.Marshal(job)
  40.  
  41. jobString := string(jsonString)
  42. fmt.Print(jobString)
  43.  
  44. p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
  45. if err != nil {
  46. panic(err)
  47. }
  48.  
  49. // Produce messages to topic (asynchronously)
  50. topic := "jobs-topic1"
  51. for _, word := range []string{string(jobString)} {
  52. p.Produce(&kafka.Message{
  53. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  54. Value: []byte(word),
  55. }, nil)
  56. }
  57. }

這里是微服務(wù) 2 的代碼。在這個(gè)代碼中最重要的東西是從 Kafka 中消費(fèi)數(shù)據(jù),保存部分我已經(jīng)在前面的博客文章中討論過(guò)了。這里代碼的重點(diǎn)部分是從 Kafka 中消費(fèi)數(shù)據(jù):

kafka-to-mongo/kafka-mongo-sample.go

  1. func main() {
  2.  
  3. //Create MongoDB session
  4. session := initialiseMongo()
  5. mongoStore.session = session
  6.  
  7. receiveFromKafka()
  8.  
  9. }
  10.  
  11. func receiveFromKafka() {
  12.  
  13. fmt.Println("Start receiving from Kafka")
  14. c, err := kafka.NewConsumer(&kafka.ConfigMap{
  15. "bootstrap.servers": "localhost:9092",
  16. "group.id": "group-id-1",
  17. "auto.offset.reset": "earliest",
  18. })
  19.  
  20. if err != nil {
  21. panic(err)
  22. }
  23.  
  24. c.SubscribeTopics([]string{"jobs-topic1"}, nil)
  25.  
  26. for {
  27. msg, err := c.ReadMessage(-1)
  28.  
  29. if err == nil {
  30. fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))
  31. job := string(msg.Value)
  32. saveJobToMongo(job)
  33. } else {
  34. fmt.Printf("Consumer error: %v (%v)\n", err, msg)
  35. break
  36. }
  37. }
  38.  
  39. c.Close()
  40.  
  41. }
  42.  
  43. func saveJobToMongo(jobString string) {
  44.  
  45. fmt.Println("Save to MongoDB")
  46. col := mongoStore.session.DB(database).C(collection)
  47.  
  48. //Save data into Job struct
  49. var _job Job
  50. b := []byte(jobString)
  51. err := json.Unmarshal(b, &_job)
  52. if err != nil {
  53. panic(err)
  54. }
  55.  
  56. //Insert job into MongoDB
  57. errMongo := col.Insert(_job)
  58. if errMongo != nil {
  59. panic(errMongo)
  60. }
  61.  
  62. fmt.Printf("Saved to MongoDB : %s", jobString)
  63.  
  64. }

我們來(lái)演示一下,運(yùn)行微服務(wù) 1。確保 Kafka 已經(jīng)運(yùn)行了。

  1. $ go run rest-kafka-sample.go

我使用 Postman 向微服務(wù) 1 發(fā)送數(shù)據(jù)。

Screenshot-2018-04-29-22.20.33

Screenshot-2018-04-29-22.20.33

這里是日志,你可以在微服務(wù) 1 中看到。當(dāng)你看到這些的時(shí)候,說(shuō)明已經(jīng)接收到了來(lái)自 Postman 發(fā)送的數(shù)據(jù),并且已經(jīng)保存到了 Kafka。

Screenshot-2018-04-29-22.22.00

Screenshot-2018-04-29-22.22.00

因?yàn)槲覀兩形催\(yùn)行微服務(wù) 2,數(shù)據(jù)被微服務(wù) 1 只保存在了 Kafka。我們來(lái)消費(fèi)它并通過(guò)運(yùn)行的微服務(wù) 2 來(lái)將它保存到 MongoDB。

  1. $ go run kafka-mongo-sample.go

現(xiàn)在,你將在微服務(wù) 2 上看到消費(fèi)的數(shù)據(jù),并將它保存到了 MongoDB。

Screenshot-2018-04-29-22.24.15

Screenshot-2018-04-29-22.24.15

檢查一下數(shù)據(jù)是否保存到了 MongoDB。如果有數(shù)據(jù),我們成功了!

Screenshot-2018-04-29-22.26.39

Screenshot-2018-04-29-22.26.39

完整的源代碼可以在這里找到:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice 

責(zé)任編輯:龐桂玉 來(lái)源: Linux中國(guó)
相關(guān)推薦

2023-10-11 14:37:21

工具開(kāi)發(fā)

2015-12-11 13:39:56

GoiOSAndroid

2021-06-15 15:03:21

MongoDBNode.jsCRUD

2023-11-08 15:04:55

事務(wù)GORM

2023-10-30 23:25:48

FuturesGo語(yǔ)言

2015-06-16 11:06:42

JavaCompletable

2024-02-07 11:44:20

NestJSRxJS異步編程

2021-04-26 05:33:54

Python異步編程

2023-09-27 15:34:48

數(shù)據(jù)編程

2023-11-06 08:01:09

Go同步異步

2023-10-28 16:22:21

Go接口

2021-11-29 22:59:34

Go Dockertest集成

2023-06-15 13:01:07

JavaPythonJavaScript

2012-04-19 10:04:20

ibmdw

2018-09-11 09:41:19

2019-07-02 14:05:23

Go語(yǔ)言高并發(fā)

2022-05-05 08:13:16

Go數(shù)組類(lèi)型

2022-08-12 08:38:52

FFmpegLinux命令

2024-01-15 06:05:05

DockerGol ang應(yīng)用程序

2009-02-27 17:15:05

XMLDOMXPath
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

97精品一区| 日本a人精品| 久久久三级国产网站| 国产精品吊钟奶在线| 91香蕉视频污在线观看| 亚洲伊人影院| 色婷婷久久一区二区三区麻豆| 日韩电影免费观看在线观看| 国内外成人免费激情视频| 91精品国产综合久久久久久豆腐| 国产在线精品不卡| 91国内揄拍国内精品对白| 四虎成人免费影院| 国产精品18hdxxxⅹ在线| 日本福利一区二区| 日韩精品一区二区免费| 国产精品麻豆一区二区三区 | 欧美国产中文字幕| 少妇真人直播免费视频| 久久精品免视看国产成人| 久久久久成人黄色影片| 成人h猎奇视频网站| 日本免费一二三区| 亚洲不卡av不卡一区二区| 日韩精品在线观看一区| 亚洲成人激情小说| 激情小说亚洲| 日韩欧美一区二区三区| 成年丰满熟妇午夜免费视频 | 99国产精品视频免费观看| 国产精品综合网站| 天天操夜夜操视频| 亚洲午夜久久久久久尤物| 日韩中文字幕在线观看| 美女久久久久久久久久| 加勒比色老久久爱综合网| 制服丝袜av成人在线看| 亚洲综合在线网站| 亚洲女色av| 天天影视涩香欲综合网| 97在线免费视频观看| 免费在线观看黄| 国产喷白浆一区二区三区| 久久精品成人一区二区三区蜜臀 | 久久久久无码精品国产| 香港欧美日韩三级黄色一级电影网站| 亚洲欧美综合另类中字| 91丝袜在线观看| 牛牛精品在线视频| 亚洲婷婷在线视频| 一本一道久久久a久久久精品91| 免费一级在线观看| 91免费小视频| 久久av一区二区| 无码精品视频一区二区三区| 亚洲精品男同| 久久久久久久久亚洲| 国内偷拍精品视频| 午夜电影亚洲| 日韩av在线免费播放| 亚洲少妇一区二区三区| 91夜夜蜜桃臀一区二区三区| 精品久久99ma| 色婷婷免费视频| 色天下一区二区三区| 日韩电影中文字幕一区| 大又大又粗又硬又爽少妇毛片| 神马久久av| 亚洲无线码在线一区观看| ass极品国模人体欣赏| 日韩精品一区二区三区免费观影 | 丰腴饱满的极品熟妇| 国产成人黄色| 日韩中文字幕免费| 青娱乐免费在线视频| 欧美在线导航| 日韩极品精品视频免费观看| 五级黄高潮片90分钟视频| 亚洲日产av中文字幕| 亚洲丝袜在线视频| 国产白丝一区二区三区| 欧美福利在线| 91精品国产高清久久久久久久久| 毛片视频网站在线观看| 奇米影视7777精品一区二区| 91日韩在线播放| 日韩永久免费视频| 国产欧美综合色| 91免费视频黄| 国产一二在线观看| 亚洲色欲色欲www在线观看| 8x8ⅹ国产精品一区二区二区| 蜜桃av.网站在线观看| 中文字幕一区二区三中文字幕| 中文字幕欧美日韩一区二区| 国产乱码在线| 91福利国产成人精品照片| 色天使在线观看| 中文字幕av一区二区三区四区| 亚洲精品久久久久久久久久久久久| 国产高清一区二区三区四区| 久久综合五月婷婷| 在线看欧美日韩| 国产精品无码专区| 成人毛片免费看| 欧美人与物videos| 超碰在线97观看| 粉嫩蜜臀av国产精品网站| 日本一区二区三区视频在线播放 | 韩国精品一区二区| 青草热久免费精品视频| 国产剧情久久久| 久久免费美女视频| 大胆欧美熟妇xx| 国产激情欧美| 在线观看日韩一区| 午夜福利三级理论电影| 日韩精品看片| 秋霞成人午夜鲁丝一区二区三区| 国产不卡精品视频| 国产女主播一区| 黄色动漫在线免费看| 日韩欧美久久| 日韩欧美高清在线| 国产精品理论在线| 久久国产成人| 国产视频在线观看一区| 国产一二三区在线观看| 欧美在线色视频| 亚洲最大成人网站| 精品96久久久久久中文字幕无| 国产区精品视频| 国产精品乱码久久久| 久久久另类综合| 日本人体一区二区| 91成人入口| 色综合久久久久久中文网| 亚洲中文字幕在线一区| 国产女人18毛片水真多成人如厕 | 911国产精品| 第一次破处视频| 免费亚洲视频| 久久伊人资源站| 狠狠操一区二区三区| 欧美www视频| 久久久久久福利| 国产成人av一区二区三区在线 | 国产精品久久不卡| 精品福利电影| 粉嫩av一区二区三区免费观看| 国产高清一区二区三区视频| 欧美精品久久天天躁| 手机在线中文字幕| 精品在线一区二区三区| 99porn视频在线| 国产盗摄在线观看| 欧美一区二区久久| 劲爆欧美第一页| 成人综合婷婷国产精品久久| 日韩精品在线观看av| 国产人妖ts一区二区| 性欧美长视频免费观看不卡| 色欲av永久无码精品无码蜜桃| 久久精品人人做人人爽97| 免费在线激情视频| 欧美日韩国产在线观看网站 | 日韩中文字幕精品视频| 国产精品无码免费播放| 一区二区三区四区不卡在线 | 中文字幕欧美激情| 国产成人亚洲综合无码| 精品中文视频| 国外视频精品毛片| 欧美69xxxxx| 精品视频在线免费| 久久久久久久久久久久久女过产乱| 欧美天天视频| 精品久久久久久乱码天堂| xx欧美视频| 综合久久五月天| 六月丁香激情综合| 国产欧美1区2区3区| 午夜天堂在线视频| 亚洲久色影视| 亚洲欧美日韩精品久久久| 精品视频一区二区三区| 97视频免费看| 无遮挡的视频在线观看| 日韩精品一区二区三区四区视频 | 欧美三级视频在线观看| 农村黄色一级片| 久久嫩草精品久久久精品一| 午夜精品免费看| 亚洲日产国产精品| 先锋影音网一区| 澳门久久精品| 热门国产精品亚洲第一区在线| 自拍视频在线| 日韩av在线资源| 97在线公开视频| 欧美视频专区一二在线观看| 久久爱一区二区| 99精品一区二区三区| 亚洲欧美日韩精品一区| 一区二区高清| 国产日韩第一页| 亚洲另类春色校园小说| 97超碰最新| 国产原创一区| 欧美亚洲伦理www| 18+视频在线观看| 中文字幕精品久久| 日本又骚又刺激的视频在线观看| 91麻豆精品国产91久久久更新时间| 国产精品自拍99| 亚洲愉拍自拍另类高清精品| 人妻无码一区二区三区免费| 91视频www| 国产吃瓜黑料一区二区| 久久99蜜桃精品| 成人亚洲视频在线观看| 精品二区久久| 国产免费一区二区视频| 91tv官网精品成人亚洲| 亚洲精品在线视频观看| 亚洲精品亚洲人成在线观看| 国产一区精品视频| 亚洲日本va午夜在线电影| 成人精品在线观看| 欧美爱爱视频| 国产精品视频1区| xx欧美视频| 日本成熟性欧美| 久久电影tv| 欧美制服第一页| 亚洲天堂免费电影| 97在线日本国产| 999福利在线视频| 欧美不卡视频一区| 国产成人精品一区二三区四区五区| 精品视频1区2区| 亚洲综合免费视频| 欧美日韩亚洲丝袜制服| 波多野结衣视频观看| 色婷婷av一区二区三区之一色屋| 国产午夜免费福利| 狠狠操狠狠色综合网| 国产成人综合欧美精品久久| 天天操天天色综合| 国产高潮久久久| 日韩欧美国产激情| 日韩精品一区二区亚洲av观看| 狠狠做深爱婷婷久久综合一区 | 日韩香蕉视频| 婷婷五月综合缴情在线视频| 亚洲国产导航| 国产a级一级片| 美女精品在线| 亚洲xxxx2d动漫1| 蜜桃一区二区三区四区| 午夜免费福利视频在线观看| 精品在线一区二区| 国产欧美视频一区| 91农村精品一区二区在线| 亚洲第一成人网站| 国产精品沙发午睡系列990531| 欧美国产日韩另类 | 免费不卡av网站| 成人免费av资源| 爱爱的免费视频| 中文乱码免费一区二区 | 亚洲成人自拍偷拍| 在线观看免费av片| 欧美日韩三级视频| www.av黄色| 国产视频精品xxxx| 中文日本在线观看| 欧美激情xxxxx| 在线看片国产福利你懂的| 国产精品久久婷婷六月丁香| 国产色99精品9i| 久久本道综合色狠狠五月| 欧洲杯什么时候开赛| 欧美大片免费播放| 在线亚洲一区| 国产成人美女视频| 不卡在线视频中文字幕| 日韩一级片在线免费观看| 亚洲综合色噜噜狠狠| 中文字幕免费观看| 91精品国产麻豆国产自产在线| 亚洲黄色小说网| 亚洲性夜色噜噜噜7777| 欧美草逼视频| 国产成人亚洲综合| 91在线一区| 一区二区在线中文字幕电影视频 | 国产精品视频一二| 久久久久亚洲av成人片| 欧洲视频一区二区| 男人天堂网在线视频| 中文字幕欧美精品日韩中文字幕| 欧美四级在线| 国产欧美韩国高清| 香蕉视频一区| 精品人妻大屁股白浆无码| 日韩专区在线视频| 野战少妇38p| 中文字幕制服丝袜一区二区三区 | 国产亚洲色婷婷久久99精品91| 中文在线一区二区| 国产精品免费精品一区| 91免费精品| 伦理中文字幕亚洲| 中文不卡1区2区3区| 亚洲xxx自由成熟| 欧美久久精品一级c片| 福利视频一区二区三区四区| 免费在线视频一区| 色婷婷免费视频| 一区二区三区精品在线| 中文字幕观看在线| 国产视频精品自拍| av人人综合网| 91传媒视频免费| 91日韩免费| 免费看国产黄色片| 久久嫩草精品久久久精品一| 日韩欧美国产亚洲| 久久久www免费人成精品| 欧美成人精品一区二区免费看片 | 亚洲第一视频网站| 超碰超碰在线| 国产精品爽爽爽爽爽爽在线观看| 天堂成人娱乐在线视频免费播放网站| 青青草视频在线视频| 国产在线一区二区| 登山的目的在线| 欧美日韩一区视频| 岛国大片在线观看| 国产成人精品综合| 亚洲宅男一区| 久久国产成人精品国产成人亚洲| 成人少妇影院yyyy| 国产一级免费av| 欧美成人精品福利| 亚洲91av| av日韩免费电影| 国产精品v亚洲精品v日韩精品| 韩国一区二区在线播放| 国产成人自拍高清视频在线免费播放| 精品伦精品一区二区三区视频密桃| 日本高清免费不卡视频| 大乳在线免费观看| 国产欧美精品va在线观看| 99热在线成人| 婷婷中文字幕在线观看| 亚洲欧美成人一区二区三区| 国产三级在线观看视频| 欧美精品中文字幕一区| 91国内精品白嫩初高生| 黄色大片在线免费看| 91麻豆国产精品久久| 久久久久久无码精品大片| 中国人与牲禽动交精品| 亚洲图片小说区| av一区二区三区免费观看| 99久久精品免费看国产免费软件| 国产又黄又爽又色| 中文字幕亚洲二区| 免费精品一区二区三区在线观看| 日韩在线视频在线| 97se亚洲国产综合在线| 波多野结衣大片| 久久精品国产久精国产思思| 国产伦子伦对白在线播放观看| 精品蜜桃传媒| 日韩av一级电影| 色欲一区二区三区精品a片| 日韩欧美黄色影院| 蜜桃视频www网站在线观看| 青青成人在线| 韩国三级中文字幕hd久久精品| 亚洲精品在线观看av| 亚洲视频专区在线| 精品中文视频| 中文字幕乱码人妻综合二区三区| 国产精品三级av在线播放| 精品国产va久久久久久久| 欧美一区二区三区免费视| 99久久久久国产精品| 亚洲一区二区在线免费| 在线看不卡av| 丝袜在线视频| 青青草原成人| 国产91精品欧美| 这里只有精品6| 992tv在线成人免费观看| 91一区二区三区四区| 欧美成人三级伦在线观看|