精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

用 Go 實現(xiàn)一個支持任務(wù)下發(fā)的后臺服務(wù):Gin + Machinery v2 完整實戰(zhàn)

開發(fā) 后端
本文將帶你一步一步使用Go語言,結(jié)合Gin框架與Machinery v2實現(xiàn)一個完整的 “任務(wù)下發(fā)服務(wù)”,并支持通過REST API發(fā)起任務(wù)。

在日常開發(fā)當(dāng)中,我們經(jīng)常希望通過消息通信機(jī)制來異步執(zhí)行任務(wù),例如發(fā)郵件、生成報表、風(fēng)控計算等。這種場景中,使用“任務(wù)隊列”框架來解耦主業(yè)務(wù)流程是一種最佳實踐。

本文將帶你一步一步使用Go語言,結(jié)合Gin框架與Machinery v2實現(xiàn)一個完整的 “任務(wù)下發(fā)服務(wù)”,并支持通過REST API發(fā)起任務(wù)。廢話不多說,開始今天的內(nèi)容吧,let's Go!!!

核心目標(biāo)

項目開始前我們先設(shè)定一個小目標(biāo),具體項如下所示:

  • 使用 Gin 提供一個 HTTP 接口,用于接收任務(wù)參數(shù)
  • 使用 Machinery v2 執(zhí)行后臺任務(wù)(通過 Redis 通信)
  • 使用消息隊列解耦 API 層與實際執(zhí)行邏輯

整體流程

用戶請求 -> Gin Server -> Redis Machinery -> Worker

用戶通過HTTP請求提交任務(wù)參數(shù),Gin服務(wù)將任務(wù)發(fā)送到Machinery任務(wù)隊列中,后續(xù)由Worker異步消費任務(wù)并執(zhí)行。

準(zhǔn)備工作

(1) 安裝 Redis

本地環(huán)境可以直接通過 Docker 運行:

docker run -d --name redis -p 6379:6379 redis

(2) 初始化 Go 項目

go mod init machinery-gin
go get github.com/gin-gonic/gin
go get github.com/RichardKnop/machinery/v2

(3) 創(chuàng)建對應(yīng)目錄和代碼文件

?  machinery-gin tree .
.
├── cmd
│   ├── api
│   │   └── main.go
│   └── worker
│       └── main.go
├── config
│   └── config.go
├── controller
│   └── task_controller.go
├── go.mod
├── go.sum
├── router
│   └── router.go
├── scheduler
│   └── manager.go
├── service
│   └── task_service.go
└── tasks
    ├── handler.go
    └── registry.go


10 directories, 11 files

核心模塊代碼實現(xiàn)

(1) config/config.go

package config


import "github.com/RichardKnop/machinery/v2/config"


func GetMachineryConfig() *config.Config {
	return &config.Config{
		Broker:        "redis://localhost:6379",
		DefaultQueue:  "machinery_tasks",
		ResultBackend: "redis://localhost:6379",
	}
}

(2) tasks/handler.go

package tasks


import (
	"fmt"
	"time"
)


func PrintMessage(msg string) error {
	fmt.Printf("?? Task Received: %s at %s\n", msg, time.Now().Format(time.RFC3339))
	return nil
}

(3) tasks/registry.go

package tasks


import "github.com/RichardKnop/machinery/v2"


func RegisterTasks(server *machinery.Server) error {
	return server.RegisterTasks(map[string]interface{}{
		"print_message": PrintMessage,
	})
}

(4) scheduler/manager.go

package scheduler


import (
	"sync"
	"time"


	"github.com/RichardKnop/machinery/v2"
	"github.com/RichardKnop/machinery/v2/tasks"
)


type ScheduledTask struct {
	Name     string
	Interval time.Duration
	Msg      string
	Paused   bool
	StopChan chan struct{}
}


var (
	tasksMap = make(map[string]*ScheduledTask)
	mu       sync.Mutex
)


func AddScheduledTask(server *machinery.Server, name, msg string, interval time.Duration) {
	mu.Lock()
	defer mu.Unlock()


	if _, exists := tasksMap[name]; exists {
		return
	}


	t := &ScheduledTask{
		Name:     name,
		Interval: interval,
		Msg:      msg,
		StopChan: make(chan struct{}),
	}
	tasksMap[name] = t


	go func(task *ScheduledTask) {
		ticker := time.NewTicker(task.Interval)
		defer ticker.Stop()


		for {
			select {
			case <-ticker.C:
				if !task.Paused {
					signature := &tasks.Signature{
						Name: "print_message",
						Args: []tasks.Arg{
							{Type: "string", Value: task.Msg},
						},
					}
					server.SendTask(signature)
				}
			case <-task.StopChan:
				return
			}
		}
	}(t)
}


func PauseTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		task.Paused = true
	}
}


func ResumeTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		task.Paused = false
	}
}


func StopTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		close(task.StopChan)
		delete(tasksMap, name)
	}
}

(5) service/task_service.go

package service


import (
	"time"


	"github.com/RichardKnop/machinery/v2"
	"machinery-gin/scheduler"
)


func ScheduleNewTask(server *machinery.Server, name, msg string, intervalSec int) {
	scheduler.AddScheduledTask(server, name, msg, time.Duration(intervalSec)*time.Second)
}


func PauseScheduledTask(name string) {
	scheduler.PauseTask(name)
}


func ResumeScheduledTask(name string) {
	scheduler.ResumeTask(name)
}


func StopScheduledTask(name string) {
	scheduler.StopTask(name)
}

(6) controller/task_controller.go

package controller


import (
	"net/http"


	"github.com/RichardKnop/machinery/v2"
	"github.com/gin-gonic/gin"
	"machinery-gin/service"
)


func TaskHandler(server *machinery.Server) gin.HandlerFunc {
	return func(c *gin.Context) {
		var req struct {
			Name     string `json:"name"`
			Interval int    `json:"interval"`
			Msg      string `json:"msg"`
		}


		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}


		service.ScheduleNewTask(server, req.Name, req.Msg, req.Interval)
		c.JSON(http.StatusOK, gin.H{"status": "task scheduled"})
	}
}


func PauseHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.PauseScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "paused"})
	}
}


func ResumeHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.ResumeScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "resumed"})
	}
}


func StopHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.StopScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "stopped"})
	}
}

(7) router/router.go

package routes


import (
	"github.com/RichardKnop/machinery/v2"
	"github.com/gin-gonic/gin"
	"machinery-gin/controller"
)


func SetupRouter(server *machinery.Server) *gin.Engine {
	r := gin.Default()


	r.POST("/task/start", controller.TaskHandler(server))
	r.POST("/task/pause", controller.PauseHandler())
	r.POST("/task/resume", controller.ResumeHandler())
	r.POST("/task/stop", controller.StopHandler())


	return r
}

(8) cmd/api/main.go (啟動Gin API服務(wù))

package main


import (
	server "github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"machinery-gin/config"
	"machinery-gin/router"
	"machinery-gin/tasks"
)


func main() {
	cfg := config.GetMachineryConfig()


	broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	machineryServer := server.NewServer(cfg, broker, backend, lock)


	_ = tasks.RegisterTasks(machineryServer)
	r := routes.SetupRouter(machineryServer)
	r.Run(":9311")
}

(9) cmd/worker/main.go (啟動Worker消費者)

package main


import (
	server "github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"machinery-gin/config"
	"machinery-gin/tasks"
)


func main() {
	cfg := config.GetMachineryConfig()
	broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	machineryServer := server.NewServer(cfg, broker, backend, lock)
	_ = tasks.RegisterTasks(machineryServer)


	worker := machineryServer.NewWorker("worker_name", 10)
	_ = worker.Launch()
}

測試程序

啟動 API 服務(wù)和 Worker:

go run cmd/api/main.go
go run cmd/worker/main.go

測試命令如下所示:

?  ~ curl -X POST http://localhost:9311/task/start -H 'Content-Type: application/json' -d '{
  "name": "hello-task",
  "interval": 5,
  "msg": "Hello from Machinery"
}'
{"status":"task scheduled"}%                                                       
 ?  ~ curl -X POST http://localhost:9311/task/pause\?name\=hello-task
{"status":"paused"}%
?  ~ curl -X POST http://localhost:9311/task/resume\?name\=hello-task
{"status":"resumed"}%                                                                 
?  ~ curl -X POST http://localhost:9311/task/stop\?name\=hello-task
{"status":"stopped"}%

測試結(jié)果如下所示:

總結(jié)

我們已經(jīng)實現(xiàn)了任務(wù)的“下發(fā)與執(zhí)行”,“暫停/恢復(fù)”后續(xù)可以進(jìn)一步擴(kuò)展:

  • 支持任務(wù)列表,任務(wù)詳情
  • 支持“周期定時任務(wù)(調(diào)度器)”
  • 支持任務(wù)執(zhí)行狀態(tài)查詢/UI管理面板
責(zé)任編輯:趙寧寧 來源: 馬嘍編程筆記
相關(guān)推薦

2025-09-15 08:49:44

GoJSONAPI

2024-01-08 08:36:29

HTTPGo代理服務(wù)器

2022-05-22 13:55:30

Go 語言

2025-05-20 09:39:57

GogRPC微服務(wù)

2024-01-02 13:58:04

GoREST API語言

2024-05-10 08:47:22

標(biāo)準(zhǔn)庫v2Go

2024-03-15 15:20:10

并發(fā)服務(wù)IP

2025-03-06 08:54:24

泛型類型MapGo1

2010-08-06 14:07:21

RIP V2

2010-08-05 17:00:04

RIP V2協(xié)議

2014-04-14 15:54:00

print()Web服務(wù)器

2022-03-06 19:57:50

狀態(tài)機(jī)easyfsm項目

2023-05-10 08:05:41

GoWeb應(yīng)用

2012-04-24 18:10:56

華為E5

2017-05-08 15:00:20

H5代碼服務(wù)器

2021-09-27 09:55:06

Chrome瀏覽器Manifest V2

2020-07-03 10:21:48

Go框架Docker

2021-08-23 15:14:09

Linuxat命令任務(wù)

2023-02-26 01:37:57

goORM代碼

2023-03-01 09:39:40

調(diào)度系統(tǒng)
點贊
收藏

51CTO技術(shù)棧公眾號

日韩精品123| 日韩视频中文字幕在线观看| 18aaaa精品欧美大片h| 粉嫩aⅴ一区二区三区四区五区| 九九热精品视频| aaaaa黄色片| 欧美草逼视频| 91视频在线看| 国产精品夜色7777狼人| 午夜黄色福利视频| 国产精品视频一区视频二区| 亚洲无人区一区| 国产一区免费| 久久久国产免费| 日韩精品欧美大片| 欧美三级视频在线观看 | 成人在线视频区| 亚洲精选视频在线| 精品国产乱码久久久久久丨区2区 精品国产乱码久久久久久蜜柚 | 永久免费看mv网站入口78| 亚洲不卡系列| 亚洲精品免费看| 久久久久久欧美精品色一二三四| 日韩不卡高清视频| 91精品精品| 日韩二区三区在线| 成年人免费在线播放| www在线免费观看| 国产精品资源站在线| 欧美成人精品一区二区| 91嫩草在线| 久久影院一区二区| 九九久久婷婷| 日韩一区二区三区av| 男人揉女人奶房视频60分| 成人精品一区二区三区免费| 国产成人久久精品77777最新版本| 97在线视频免费播放| 亚洲黄色网址大全| 老司机成人在线| 911国产精品| 成年人观看网站| av电影免费在线观看| 91欧美激情一区二区三区成人| 国产精品一香蕉国产线看观看| 久久久久无码国产精品| 日韩大片在线| 日韩不卡在线观看| 极品粉嫩美女露脸啪啪| 中文在线最新版地址| 亚洲精品ww久久久久久p站| 欧美裸体网站| 国产人妖一区二区三区| 久久一本综合频道| 91国产视频在线| 欧美成人一区二区三区高清| 日本一区二区免费高清| 亚洲免费视频观看| 在线看黄色的网站| 欧美精品三级在线| 精品视频1区2区3区| 成人综合视频在线| 2019中文字幕在线电影免费| 一区二区三区久久久| 伊人久久青草| 亚洲视频tv| 国产午夜精品久久| 欧美激情导航| 欧洲成人av| 久久影院视频免费| 99国产超薄肉色丝袜交足的后果| 国产精品熟女久久久久久 | 黄色一级大片免费| 国产美女av在线| 国产精品福利一区二区三区| 亚洲成人18| 川上优的av在线一区二区| 91色porny蝌蚪| 久久精品五月婷婷| 天堂资源最新在线| 97久久精品人人爽人人爽蜜臀| 国产伦精品一区二区| 国产77777| 丁香亚洲综合激情啪啪综合| 7777奇米亚洲综合久久| 亚洲综合五月天婷婷丁香| 日本不卡免费在线视频| 国产精品激情av在线播放| 五月婷婷六月婷婷| 日本sm残虐另类| 91精品免费久久久久久久久| 99国产精品久久久久久久成人| 蜜桃视频一区二区| 成人www视频在线观看| 国产女人18毛片水真多| 国产精品一区二区91| 国产精品国色综合久久| 少妇高潮久久久| 久久久久久久综合日本| 亚洲黄色成人久久久| 麻豆视频在线| 一区二区三区不卡视频在线观看| 日韩视频免费播放| 国产天堂在线播放视频| 欧美日韩在线影院| 欧美日韩在线成人| 年轻的保姆91精品| 亚洲精品久久视频| 中文字幕av久久爽一区| 亚洲精品tv久久久久久久久久| 久久69精品久久久久久国产越南| 好吊操这里只有精品| 丝袜亚洲精品中文字幕一区| 国产美女久久精品香蕉69| 丁香六月天婷婷| 国产亚洲欧美中文| 国产人妻互换一区二区| 爱情岛亚洲播放路线| 色综合久久中文字幕| 超碰在线资源站| 久久综合五月婷婷| 国产网站欧美日韩免费精品在线观看| 中字幕一区二区三区乱码| 亚洲情侣在线| 26uuu另类亚洲欧美日本老年| 中文字幕第三页| 风流少妇一区二区| 亚洲精品白虎| 黄色在线看片| 欧美无人高清视频在线观看| 久久久高清视频| 久久人人99| 欧洲永久精品大片ww免费漫画| 一级特黄aaa大片| 久久综合久久99| 蜜桃网站在线观看| 电影亚洲精品噜噜在线观看| 日韩一区二区三区高清免费看看| 久久国产柳州莫菁门| 亚洲视频日本| 国产精品综合不卡av| 色婷婷av一区二区三| 国产精品福利一区二区| 蜜臀av午夜一区二区三区| 日本一区影院| www日韩欧美| 欧美黄色一级大片| 不卡视频一二三四| 久久99国产精品一区| 97精品国产99久久久久久免费| 7777精品伊人久久久大香线蕉完整版 | 清纯唯美亚洲经典中文字幕| 日韩天堂在线视频| 国产一区二区三区四区视频| 中文字幕制服丝袜一区二区三区| 最新中文字幕免费视频| 日韩一区二区三区免费播放| 国产精品一区二区久久精品| 在线观看av黄网站永久| 欧美日韩一区二区电影| 国精产品一区一区| 精品在线观看视频| 浴室偷拍美女洗澡456在线| 电影91久久久| 欧美激情在线有限公司| 日本激情一区二区| 日韩欧美亚洲范冰冰与中字| 免费看黄色的视频| 美腿丝袜在线亚洲一区| 国产手机视频在线观看| 亚洲国产精品免费视频| 午夜精品福利电影| 激情在线视频| 欧美日高清视频| 久久久91视频| 92国产精品观看| 最新天堂中文在线| 亚洲欧美亚洲| 蜜桃导航-精品导航| 成人在线高清| 久久全国免费视频| 国产资源在线看| 在线播放91灌醉迷j高跟美女| 麻豆changesxxx国产| 91网站视频在线观看| 男人插女人下面免费视频| 亚洲精品tv久久久久久久久久| 国产日韩在线一区二区三区| 香蕉久久免费电影| 两个人的视频www国产精品| 性xxxx搡xxxxx搡欧美| 欧美视频一区二区三区在线观看| 欧美国产日韩综合| 久久蜜桃香蕉精品一区二区三区| 中文字幕在线观看日 | 九九热精品在线| 午夜福利理论片在线观看| 欧美日韩午夜精品| 国产五月天婷婷| 国产精品久久毛片av大全日韩| 超级砰砰砰97免费观看最新一期| 一区二区三区四区五区在线| 一区二区三区偷拍| 日韩中出av| 亚洲xxx大片| 欧美日韩免费观看视频| 欧美激情喷水视频| 最新电影电视剧在线观看免费观看| 精品国产麻豆免费人成网站| 夜夜躁很很躁日日躁麻豆| 欧美日韩国产精品专区 | 成人美女免费网站视频| 自拍在线观看| 欧美日韩成人黄色| h视频在线播放| 日韩av影视在线| 精品国自产在线观看| 欧美三区在线观看| 中文字幕第15页| 亚洲午夜三级在线| 四虎影院中文字幕| 中文字幕av一区二区三区| 色婷婷精品久久二区二区密| 国产九色精品成人porny| 欧美自拍小视频| 亚洲一区亚洲| 18禁裸男晨勃露j毛免费观看| 999久久久国产精品| 欧美一区二区视频在线| 欧美成人基地| 国内一区二区在线视频观看| 欧美午夜在线播放| 国产一区视频在线播放| 欧美日韩精品一区二区三区视频| 91精品国产99久久久久久| 久久av色综合| 久久99亚洲精品| 超碰在线最新| 久久成人精品一区二区三区| 日本成a人片在线观看| 一二美女精品欧洲| 黄色大片在线看| 亚洲色图国产精品| 欧美日韩国产综合视频| 日韩国产精品视频| 日韩a在线观看| 精品在线欧美视频| 青青草免费在线视频| 国产视频精品久久久| 台湾av在线二三区观看| 亚洲精品福利在线观看| 香蕉视频免费看| 亚洲欧美日本另类| 国产在线高清| 在线看日韩av| 免费黄色在线看| 操日韩av在线电影| 日本动漫理论片在线观看网站| 草民午夜欧美限制a级福利片| 亚洲综合伊人久久大杳蕉| 欧美精品免费播放| 国产精品—色呦呦| 91精品国产乱码久久久久久久久| 伊人久久av| 国产成人在线精品| 国产精品天堂蜜av在线播放| 成人午夜在线观看| 中文字幕区一区二区三| 精品久久久三级| 欧洲grand老妇人| 在线国产精品网| 国产精品久久| 99精品人妻少妇一区二区| 玖玖在线精品| 成人不卡免费视频| 懂色av一区二区夜夜嗨| 人体私拍套图hdxxxx| 国产色产综合产在线视频| fc2ppv在线播放| 亚洲大片精品永久免费| 青青青国产在线 | 精品一区在线| 一区二区免费电影| 国产一区久久| 日韩毛片在线免费看| 久久se这里有精品| 黄色免费视频网站| 国产女主播一区| 久久人妻无码aⅴ毛片a片app| 亚洲一区二区三区四区在线观看 | heyzo一区| 国产精品对白刺激| 欧洲一区在线| 视频一区国产精品| 黑人一区二区| 天天综合网日韩| av一区二区久久| 欧美一级片在线视频| 岛国精品视频在线播放| 国产精品无码AV| 亚洲欧美精品suv| 色yeye免费人成网站在线观看| 日本一区二区在线免费播放| 日韩精品免费视频一区二区三区 | 日本黄色大片在线观看| 久久你懂得1024| 久久在线视频精品| 欧美日韩一级片在线观看| 深爱五月激情五月| 日韩网站在线观看| 三级成人在线| 久久精品日韩精品| 欧美日韩专区| 久久久久久久高清| 国产日韩欧美在线一区| 国产乡下妇女做爰毛片| 欧美放荡的少妇| 国产小视频在线播放| 性欧美长视频免费观看不卡| 亚洲在线资源| 四虎影院一区二区三区| 午夜亚洲性色福利视频| 69xxx免费视频| 亚洲乱码中文字幕| 在线观看视频二区| 亚洲区免费影片| 在线看片国产福利你懂的| 国产精品一区视频| 国产综合自拍| 国产男女无遮挡猛进猛出| 中文字幕欧美一区| 波多野结衣在线电影| 亚洲欧美一区二区激情| 蜜桃视频在线观看免费视频| 粉嫩av一区二区三区免费观看 | 久久这里都是精品| 国产污污视频在线观看 | 波多野结衣影院| 亚洲综合成人网| 午夜精品久久久久久久99热黄桃| 久久精品中文字幕| 国产精品国产亚洲精品| 波多野结衣三级在线| 久久91精品国产91久久小草| 久久午夜精品视频| 欧美日韩亚洲高清一区二区| www.中文字幕久久久| 国产精品人成电影| 日韩精品欧美| av在线网址导航| 亚洲欧美激情在线| 精品人妻aV中文字幕乱码色欲 | 亚洲精品一区二区三区樱花 | 亚洲第一综合色| 欧美自拍第一页| 2021国产精品视频| 自拍偷拍欧美一区| 91制片厂毛片| 中文字幕亚洲一区二区av在线 | 亚洲欧美日韩综合| 亚洲日本网址| 在线码字幕一区| 国产乱码精品一区二区三区忘忧草 | 欧美日韩麻豆| 日韩一级片播放| 国产精品成人免费| 国产乱淫av免费| 高清欧美性猛交xxxx| 人人网欧美视频| 精品视频无码一区二区三区| 国产精品乱人伦一区二区| 91精品中文字幕| 性金发美女69hd大尺寸| 国产乱码精品一区二区亚洲 | 99免费在线观看| 亚洲免费视频观看| 成人在线分类| 人人妻人人添人人爽欧美一区| 久久久久国产精品麻豆| 亚洲一区在线观| 国内精品视频久久| 国产精品嫩模av在线| 亚洲精品免费一区亚洲精品免费精品一区| 亚洲人成在线观看一区二区| 秋霞av鲁丝片一区二区| 国产精品久久久久久久久久久不卡| 99久久综合| 国产毛片毛片毛片毛片毛片毛片| 91久久精品一区二区三| 性欧美videoshd高清| 欧美日韩国产不卡在线看| 激情亚洲综合在线| 国产成人精品a视频一区| 在线一区二区日韩| 中文字幕一区二区三区中文字幕 | 国产自产2019最新不卡| 亚洲天堂av片| 欧美不卡视频一区发布| 要久久爱电视剧全集完整观看| 久久久久久久久久毛片|