精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

如何基于 Go 語言設計一個簡潔優雅的分布式任務系統

開發 后端
本文帶大家從技術選型到架構設計再到代碼實現,一步步完成了一個簡潔優雅的分布式任務系統。

在當今云計算與微服務盛行的時代,分布式任務系統已成為支撐大規模業務的核心基礎設施。今天就來為大家分享下如何基于 Go 語言從零設計和實現一個架構簡潔且擴展性強的分布式任務系統。

一、前置概念

本文會設計并實現一個分布式任務系統,這里我們要先明確兩個概念。

  • 分布式:在我們將要實現的分布式任務系統中,分布式是指我們的服務可以部署多個副本,這樣才能確保服務更加穩定。
  • 任務:這里的任務是指異步任務,可能是定時或需要周期性運行的任務。

有了這兩個前置概念,我們再來分析下在 Go 中如何實現分布式和如何處理異步任務。

二、異步任務

在 Go 中,要處理異步任務有多種方式,比如原生支持的 time.Sleep、time.Timer 或 time.Ticke,再比如一些第三方包 go-co-op/gocron/v2、robfig/cron/v3 或 bamzi/jobrunner 等。本項目在調研過后決定采用 robfig/cron/v3 包(以下簡稱 cron)來處理異步任務,原因如下:

  • cron 是一個非常流行的包,支持標準 crontab 表達式(并且可精確到秒),支持時區、任務鏈等高級功能。
  • 提供秒級精度的任務調度。
  • 輕量級,且能輕松應對各種復雜的定時任務場景。

對于 cron 包的使用,可以參考我的另一篇文章「在 Go 中使用 cron 執行定時任務」,里面有詳細說明。

三、分布式

既然我們的任務系統是分布式的,那么必然要考慮并發安全問題。當多個副本同時讀寫系統資源時,很容易產生競態問題。在分布式場景中,解決競態問題最常用的手段當然是分布式鎖。

Go 中的分布式鎖解決方案也很多,常見的有基于 etcd、Redis、ZooKeeper 等中間件來實現的,因為 Redis 在系統中更加常用,所以本項目采用基于 Redis 實現分布式鎖的解決方案。Go 中有兩個比較常用的第三方包 bsm/redislock 和 go-redsync/redsync 都是基于 Redis 的分布式鎖實現。本項目在調研過后決定采用 go-redsync/redsync 包(以下簡稱 redsync),原因如下:

  • redsync 遵循 Redis 官方推薦的 Redlock 算法,支持多節點,容忍部分節點故障,避免單點問題。
  • 通過多數派機制確保鎖的全局唯一性,降低鎖沖突風險。
  • redsync 是 Redis 官方 唯一推薦的 Go Redis 分布式鎖解決方案,由 Redis 社區背書,長期維護,可靠性高。

對于 redsync 包的使用,可以參考我的另一篇文章「在 Go 中如何使用分布式鎖解決并發問題?」,里面有詳細說明。

四、分布式任務系統

現在我們對分布式任務系統中的分布式和任務都有了明確的認識,并且找到了解決方案。那么接下來就可以設計并實現分布式任務系統了。

1. 功能介紹

我們要實現的分布式任務系統叫 nightwatch,nightwatch 是守夜、值班的意思,那么這套系統的功能也就一目了然了,就是用來 24 小時不停機的執行異步任務的。

nightwatch 要實現的主要功能如下:

現在我們有一個系統,用戶可以在 Web 頁面通過表單提交一個“任務”到關系型數據庫的任務表中。然后 nightwatch 系統會定時的掃描任務表,取出待執行的任務,并根據任務中的配置,到 Kubernetes 中拉起 Job 資源對象,真正的執行任務。此外,nightwatch 還會取出已經開始執行的任務,然后去 Kubernetes 中獲取當前任務對應的 Job 實時狀態,并回寫到數據庫中。直到 Kubernetes 中的 Job 執行完成(或失敗),nightwatch 會標記 Job 在數據庫表中的任務狀態為完成(或失敗)。當任務狀態為完成(或失敗),則任務任務終止,nightwatch 不再掃描出這種狀態的數據。

系統整體架構如下:

nightwatch-component

nightwatch 是系統中一個非常核心的組件,用來控制任務的執行,并同步任務狀態。

2. 架構設計

現在我們知道了 nightwatch 的作用,那么就可以設計其實現架構了。

nightwatch 架構設計如下:

nightwatch-architecture

首先,我們需要思考一個問題,分布式鎖應該在何時使用?

在分布式任務系統中,我們有兩種方式使用分布式鎖來保證并發安全。一種是在執行具體的定時任務時,多個副本之間進行競爭,誰搶到鎖,誰就可以執行任務,未搶到鎖的副本可以選擇性的跳過此次執行周期。另一種是在 nigthwatch 啟動時,就開始搶鎖,多個副本之間誰搶到鎖,誰就去執行任務調度,未搶到鎖的副本則進行周期性的嘗試搶鎖操作,如果當前執行任務調度的副本被終止,那么其他副本就有機會搶到鎖,并執行任務調度。

這兩種方式個各自有不同的使用場景,第一種方式的優勢是能夠實現多副本之間的負載均衡,多個副本都在工作,都有可能搶到鎖并執行任務,不過這種方式不能嚴格控制執行任務的間隔時間,比較適合對間隔時間要求不嚴格的任務。第二種方式實際上只有一個副本在執行任務調度,其他副本是空載狀態,是主備設計,這種方式的好處是能夠嚴格控制任務執行的間隔時間。

nigthwatch 采用第二種方式來使用分布式鎖保證并發安全。所以在 nigthwatch 的架構設計中,在啟動 nigthwatch 時,先將所有的定時任務注冊到任務調度器中,接著就會進行搶鎖操作,只有搶到鎖的副本才能夠執行任務調度。未搶到鎖,則使用一個循環周期性的嘗試搶鎖,直到搶鎖成功。對于搶到鎖的副本,當注冊的任務定時策略達到時,任務調度器就會執行任務。架構圖中的 task 就是我們要實現的異步任務,也是主要業務邏輯,task 組件會從數據庫表中讀取任務,然后在 Kubernetes 中啟動 Job,并同步數據庫和 Kubernetes 資源之間的狀態。

3. 目錄結構

我們現在已經設計好了 nigthwatch 的架構,可以動手進行開發實現了。

以下是 nigthwatch 項目的目錄和文件:

$ tree nightwatch
nightwatch # 項目目錄
├── README.md # README 文件
├── assets # 項目相關的資源目錄
│   ├── docker-compose.yaml # 用于啟動項目依賴的 MariaDB 和 Redis
│   └── schema.sql # 測試數據 SQL
├── cmd # 項目啟動入口
│   └── main.go
├── go.mod
├── go.sum
├── internal # 項目內部包
│   ├── logger.go # 定制日志
│   ├── nightwatch.go # nightwatch 的實現和啟動入口
│   └── watcher # 任務接口和實現
│       ├── all # 任務注冊入口
│       │   └── all.go
│       ├── config.go # 任務配置
│       ├── task # 任務實現,一個可以定時同步 MariaDB 和 Kubernetes 任務狀態的示例程序
│       │   ├── task.go
│       │   └── watcher.go
│       └── watcher.go # 任務接口
└── pkg # 項目公共包
    ├── db # 數據庫實例
    │   ├── mysql.go
    │   └── redis.go
    ├── meta # 元信息
    │   └── where.go # MariaDB where 查詢條件元信息封裝
    ├── model # 任務模型
    │   └── task.go
    ├── store # 數據庫操作接口
    │   ├── helper.go
    │   ├── store.go
    │   └── task.go
    └── util # 工具包
        └── reflect
            └── reflect.go

14 directories, 21 files

這里主要的目錄和文件我都標明了其用途,不必完全記住,你先有個印象,大概知道整個項目的結構。

4. 調用鏈路

為了便于你理解代碼,我畫了一張 nigthwatch 項目的調用鏈路圖:

nightwatch-flow

這個調用鏈路圖指明了 nigthwatch 項目中所有目錄之間的代碼調用關系。根據這張圖,可以看出這是一個非常簡潔的架構。cmd 中的入口函數main會調用internal中的nigthwatch包,nigthwatch是分布式系統實現的關鍵所在,這里實現了任務的注冊和調度,watcher定義了任務的接口,task就是任務的具體實現,task的業務邏輯中會依賴store層來讀寫數據庫,所以store會依賴model和db。

5. 代碼實現

接下來就進入到真正的編碼階段了。

首先我們需要為 nigthwatch 項目的業務設計一張任務表,建表 SQL 語句如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/assets/schema.sql

CREATE TABLE IF NOTEXISTS `task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(45) NOT NULLDEFAULT'' COMMENT '任務名稱',
  `namespace` varchar(45) NOT NULLDEFAULT'' COMMENT 'k8s namespace 名稱',
  `info` TEXT NOT NULL COMMENT '任務 k8s 相關信息',
  `status` varchar(45) NOT NULLDEFAULT'' COMMENT '任務狀態',
  `user_id` bigint(20) NOT NULLDEFAULT'0' COMMENT '用戶 ID',
  `created_at` datetime NOT NULLDEFAULTCURRENT_TIMESTAMP,
  `updated_at` datetime NOT NULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_name_namespace` (`name`, `namespace`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任務表';

為了簡化你理解項目的成本,這里僅定義了最小需要字段。

同時我們可以插入兩條測試數據,用來后續項目功能的驗證:

INSERT INTO `task` (`id`, `name`, `namespace`, `info`, `status`, `user_id`) VALUES (1, 'demo-task-1', 'default', '{"image":"alpine","command":["sleep"],"args":["60"]}', 'Normal', 1);
INSERT INTO `task` (`id`, `name`, `namespace`, `info`, `status`, `user_id`) VALUES (2, 'demo-task-2', 'demo', '{"image":"busybox","command":["sleep"],"args":["3600"]}', 'Normal', 2);

拿 ID為1的task數據舉例,任務名是demo-task-1,namespace是default,鏡像是alpine,執行命令是sleep,命令參數是60,狀態為Normal表示待執行。當 nigthwatch 服務掃描到這條數據時,就會在 Kubernetes 中default這個namespace下創建一個name為demo-task-1的 Job,其啟動鏡像為alpine,啟動命令為sleep 60,即睡眠60 秒然后退出。

現在有了數據庫表和測試數據,我們來看看 nigthwatch 代碼是如何實現的。

入口文件 cmd/main.go 實現如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/cmd/main.go

package main

import (
    "flag"
    "log/slog"
    "path/filepath"
    "time"

    genericapiserver "k8s.io/apiserver/pkg/server"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"

    "github.com/jianghushinian/blog-go-example/nightwatch/internal"
    "github.com/jianghushinian/blog-go-example/nightwatch/pkg/db"
)

func main() {
    slog.SetLogLoggerLevel(slog.LevelDebug)

    var kubecfg *string
    if home := homedir.HomeDir(); home != "" {
        kubecfg = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "Optional absolute path to kubeconfig")
    } else {
        kubecfg = flag.String("kubeconfig", "", "Absolute path to kubeconfig")
    }

    config, err := clientcmd.BuildConfigFromFlags("", *kubecfg)
    if err != nil {
        slog.Error(err.Error())
        return
    }
    config.QPS = 50
    config.Burst = 100
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        slog.Error(err.Error())
        return
    }

    cfg := nightwatch.Config{
        MySQLOptions: &db.MySQLOptions{
            Host:                  "127.0.0.1:33306",
            Username:              "root",
            Password:              "nightwatch",
            Database:              "nightwatch",
            MaxIdleConnections:    100,
            MaxOpenConnections:    100,
            MaxConnectionLifeTime: time.Duration(10) * time.Second,
        },
        RedisOptions: &db.RedisOptions{
            Addr:         "127.0.0.1:36379",
            Username:     "",
            Password:     "nightwatch",
            Database:     0,
            MaxRetries:   3,
            MinIdleConns: 0,
            DialTimeout:  5 * time.Second,
            ReadTimeout:  3 * time.Second,
            WriteTimeout: 3 * time.Second,
            PoolSize:     10,
        },
        Clientset: clientset,
    }

    nw, err := cfg.New()
    if err != nil {
        slog.Error(err.Error())
        return
    }

    stopCh := genericapiserver.SetupSignalHandler()
    nw.Run(stopCh)
}

因為 main.go 非常重要,是整個程序的入口,所以我就把完整代碼都貼出來了,包括 import 部分,這是為了讓你對項目文件之間的依賴關系有一個更清晰的認知,后續講解的其他模塊我就只會貼出核心代碼。

main 函數的核心功能如下:

首先會初始化各種依賴包,初始化 Kubernetes clientset 用于后續操作 Job,初始化 MySQL 用于從中讀取任務和更新任務狀態,初始化 Redis 用于實現分布式鎖。接著會使用這些初始化的對象創建一個配置對象 nightwatch.Config。然后使用 cfg.New() 創建一個 nightwatch 實例對象 nw。最后調用 nw.Run(stopCh) 啟動服務。這里為了做優雅退出,還引用了 Kubernetes genericapiserver 優雅退出機制。

這里涉及到的 Kubernetes clientset、MySQL 和 Redis 相關的具體配置細節我就不詳細講解了,咱們還是將主要精力聚焦在 nigthwatch 的主脈絡上。

接下來看下 cfg.New() 代碼實現如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

// New 通過配置構造一個 nightWatch 對象
func (c *Config) New() (*nightWatch, error) {
    rdb, err := db.NewRedis(c.RedisOptions)
    if err != nil {
        slog.Error(err.Error(), "Failed to create Redis client")
        returnnil, err
    }

    logger := newCronLogger()
    runner := cron.New(
        cron.WithSeconds(),
        cron.WithLogger(logger),
        cron.WithChain(cron.SkipIfStillRunning(logger), cron.Recover(logger)),
    )

    pool := goredis.NewPool(rdb)
    lockOpts := []redsync.Option{
        redsync.WithRetryDelay(50 * time.Microsecond),
        redsync.WithTries(3),
        redsync.WithExpiry(defaultExpiration),
    }
    locker := redsync.New(pool).NewMutex(lockName, lockOpts...)

    cfg, err := c.CreateWatcherConfig()
    if err != nil {
        returnnil, err
    }

    nw := &nightWatch{runner: runner, locker: locker, config: cfg}
    if err := nw.addWatchers(); err != nil {
        returnnil, err
    }

    return nw, nil
}

*Config.New 方法會通過配置信息構造一個 nightWatch 對象并返回。這里的 runner 就是異步任務調度器,使用 cron 包實現,用來調度和執行定時任務。并且這個方法內部還實例化了一個 redsync 分布式鎖對象 locker。nightWatch 對象就是通過 runner、locker 和 cfg 來構造的。

這里的核心部分是 addWatchers 的邏輯,其實現如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

// 注冊所有 Watcher 實例到 nightWatch
func (nw *nightWatch) addWatchers() error {
    for n, w := range watcher.ListWatchers() {
        if err := w.Init(context.Background(), nw.config); err != nil {
            slog.Error(err.Error(), "Failed to construct watcher", "watcher", n)
            return err
        }

        spec := watcher.Every3Seconds
        if obj, ok := w.(watcher.ISpec); ok {
            spec = obj.Spec()
        }

        if _, err := nw.runner.AddJob(spec, w); err != nil {
            slog.Error(err.Error(), "Failed to add job to the cron", "watcher", n)
            return err
        }
    }

    returnnil
}

*nightWatch.addWatchers 方法用來注冊所有 Watcher 對象到調度器 runner 中。

Watcher 是一個接口,定義了異步任務應該實現的方法。Watcher 接口定義如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/watcher.go

type Watcher interface {
    Init(ctx context.Context, config *Config) error
    cron.Job
}

type ISpec interface {
    Spec() string
}

var (
    registryLock = new(sync.Mutex)
    registry     = make(map[string]Watcher)
)

func Register(watcher Watcher) {
    registryLock.Lock()
    defer registryLock.Unlock()

    name := reflectutil.StructName(watcher)
    if _, ok := registry[name]; ok {
        panic("duplicate watcher entry: " + name)
    }

    registry[name] = watcher
}

func ListWatchers()map[string]Watcher {
    registryLock.Lock()
    defer registryLock.Unlock()

    return registry
}

可以看到,要實現一個異步任務,需要實現 Init 方法以及 cron.Job 接口。cron.Job 接口其實只有一個方法定義如下:

type Job interface {
    Run()
}

只要滿足 Watcher 接口的任務,就可以通過 Register 函數注冊到 registry 中。ListWatchers 函數則可以返回注冊到 registry 中全部任務。而 ListWatchers 函數正是在前文講解的 *nightWatch.addWatchers 方法中調用的。

到目前為止,任務如何被注冊到 nightWatch.runner 的過程我們就串起來了。接下來需要關注的兩個點是,調度器 runner 是何時啟動的,以及是何時調用 Register 函數注冊任務的。

我們先來看調度器 runner 是何時啟動的:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

// Run 執行異步任務,此方法會阻塞直到關閉 stopCh
func (nw *nightWatch) Run(stopCh <-chanstruct{}) {
    ctx := wait.ContextForChannel(stopCh)

    // 循環加鎖,直到加鎖成功,再去啟動任務
    ticker := time.NewTicker(defaultExpiration + (5 * time.Second))
    defer ticker.Stop()
    for {
        err := nw.locker.LockContext(ctx)
        if err == nil {
            slog.Debug("Successfully acquired lock", "lockName", lockName)
            break
        }
        slog.Debug("Failed to acquire lock", "lockName", lockName, "err", err)
        <-ticker.C
    }

    // 看門狗,實現鎖自動續約
    ticker = time.NewTicker(extendExpiration)
    defer ticker.Stop()
    gofunc() {
        for {
            select {
            case <-ticker.C:
                if ok, err := nw.locker.ExtendContext(ctx); !ok || err != nil {
                    slog.Debug("Failed to extend lock", "err", err, "status", ok)
                }
            case <-ctx.Done():
                slog.Debug("Exiting lock watchdog")
                return
            }
        }
    }()

    // 啟動定時任務
    nw.runner.Start()
    slog.Info("Successfully started nightwatch server")

    // 阻塞等待退出信號
    <-stopCh

    nw.stop()
}

在 *nightWatch.Run 方法中,首先會啟動一個無限循環,定時執行嘗試搶鎖操作,直到搶鎖成功。這與前文中講解的 nightwatch 架構設計是一致的。搶到鎖后,就可以執行 nw.runner.Start() 啟動調度器,執行定時任務了。

此外,在 nightwatch 架構圖中沒有體現的一點是,這里為分布式鎖實現了看門狗機制,用來自動續約。關于 redsync 分布式鎖的自動續約,在我的文章「在 Go 中如何使用分布式鎖解決并發問題?」中有詳細講解。

而這個 Run 方法,就是在 main 函數中通過 nw.Run(stopCh) 調用的。

我們還剩下一個最后要看的核心邏輯是 task 在何時會調用 Register 注冊到 registry 變量中。

還記得前文中講解的 Watcher 接口么,*taskWatcher 實現了這個接口:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/task/watcher.go

var _ watcher.Watcher = (*taskWatcher)(nil)

type taskWatcher struct {
    store     store.IStore
    clientset kubernetes.Interface

    wg sync.WaitGroup
}

func (w *taskWatcher) Init(ctx context.Context, config *watcher.Config) error {
    w.store = config.Store
    w.clientset = config.Clientset
    returnnil
}

func (w *taskWatcher) Spec() string {
    return"@every 30s"
}

func init() {
    watcher.Register(&taskWatcher{})
}

taskWatcher 就是 task 任務的具體對象,它實現了 watcher.Watcher 接口。可以發現,Register 函數是在 init 函數中調用的,即 task 包被導入時實現自動注冊。

task 包會在 nightwatch/internal/watcher/all/all.go 文件被導入:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/all/all.go

package all

import (
    // 觸發所有 Watcher 的 init 函數進行注冊
    _ "github.com/jianghushinian/blog-go-example/nightwatch/internal/watcher/task"
)

這里以匿名包的方式導入 task 包。如果我們還有其他的任務實現,則同樣可以參考 task 包的注冊方式,在這里以匿名包形式導入,這也是 all 包名的由來,可以注冊全部的任務。

然后會在 nightwatch 中再次以匿名包的方式導入 all 包:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

package nightwatch

import (
    ...
    // 觸發 init 函數
    _ "github.com/jianghushinian/blog-go-example/nightwatch/internal/watcher/all"
)

我們可以總結出任務的注冊流程是,nightwatch 包導入 all 包,all 包會導入 task 包,task 包的 init 函數執行就會完成注冊。所以在入口文件 main.go 導入 nightwatch 包的時候,就會觸發任務的注冊。在調用 nw.Run(stopCh) 啟動服務時,所有的任務已經注冊完成了。

taskWatcher 對象的核心邏輯當然就是 Run 方法了:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/task/watcher.go

// Run 運行 task watcher 任務
func (w *taskWatcher) Run() {
    w.wg.Add(2)

    slog.Debug("Current sync period is start")

    // NOTE: 將 Normal 狀態任務在 Kubernetes 中啟動
    gofunc() {
        defer w.wg.Done()
        ctx := context.Background()

        _, tasks, err := w.store.Tasks().List(ctx, meta.WithFilter(map[string]any{
            "status": model.TaskStatusNormal,
        }))
        if err != nil {
            slog.Error(err.Error(), "Failed to list tasks")
            return
        }

        var wg sync.WaitGroup
        wg.Add(len(tasks))
        for _, task := range tasks {
            gofunc(task *model.Task) {
                defer wg.Done()
                job, err := w.clientset.BatchV1().Jobs(task.Namespace).Create(ctx, toJob(task), metav1.CreateOptions{})
                if err != nil {
                    slog.Error(err.Error(), "Failed to create job")
                    return
                }

                task.Status = model.TaskStatusPending
                if err := w.store.Tasks().Update(ctx, task); err != nil {
                    slog.Error(err.Error(), "Failed to update task status")
                    return
                }
                slog.Info("Successfully created job", "namespace", job.Namespace, "name", job.Name)
            }(task)
        }
        wg.Wait()
    }()

    // NOTE: 同步中間狀態的任務在 Kubernetes 中的狀態到表中
    gofunc() {
        defer w.wg.Done()
        ctx := context.Background()

        _, tasks, err := w.store.Tasks().List(ctx, meta.WithFilterNot(map[string]any{
            // 排除這幾個狀態
            "status": []model.TaskStatus{model.TaskStatusNormal, model.TaskStatusSucceeded, model.TaskStatusFailed},
        }))
        if err != nil {
            slog.Error(err.Error(), "Failed to list tasks")
            return
        }

        var wg sync.WaitGroup
        wg.Add(len(tasks))
        for _, task := range tasks {
            gofunc(task *model.Task) {
                defer wg.Done()
                job, err := w.clientset.BatchV1().Jobs(task.Namespace).Get(ctx, task.Name, metav1.GetOptions{})
                if err != nil {
                    slog.Error(err.Error(), "Failed to get task")
                    return
                }

                task.Status = toTaskStatus(job)
                if err := w.store.Tasks().Update(ctx, task); err != nil {
                    slog.Error(err.Error(), "Failed to update task status")
                    return
                }
                slog.Info("Successfully sync job status to task", "namespace", job.Namespace, "name", job.Name, "status", task.Status)
            }(task)
        }
        wg.Wait()
    }()

    w.wg.Wait()
    slog.Debug("Current sync period is complete")
}

Run 方法就是用來實現每個 watcher 對象的業務邏輯。比如這里就實現了 task 任務的業務邏輯,它包含兩個功能,在 Run 方法的上半部分代碼中啟動了第一個 goroutine 用來實現將 Normal 狀態任務在 Kubernetes 中啟動,下半部分代碼中啟動了第二個 goroutine 用來實現同步已運行的任務在 Kubernetes 中的 Job 狀態到數據庫表中。

至此,nightwatch 項目就講解完成了。我們一起實現了一個架構簡潔且擴展性強的分布式任務系統。關于 nightwatch 項目中更多的代碼細節你可以跳轉到我的 GitHub 倉庫中查看。

五、總結

本文帶大家從技術選型到架構設計再到代碼實現,一步步完成了一個簡潔優雅的分布式任務系統。這套系統不僅架構簡潔,擴展也非常方便,我們只需要按照 task 的套路實現更多的異步任務,都可以非常方便的方式注冊到 nightwatch 中。

責任編輯:趙寧寧 來源: 令飛編程
相關推薦

2016-09-30 10:13:07

分布式爬蟲系統

2024-09-23 04:00:00

java架構分布式系統

2021-11-01 12:25:56

Redis分布式

2023-03-06 08:14:48

MySQLRedis場景

2018-09-06 22:49:31

分布式架構服務器

2024-08-07 08:15:47

2022-08-01 08:01:04

ID發號器系統

2019-07-19 15:51:11

框架選型分布式

2013-09-11 16:02:00

Spark分布式計算系統

2023-09-04 08:45:07

分布式配置中心Zookeeper

2019-01-28 11:46:53

架構運維技術

2023-10-08 10:49:16

搜索系統分布式系統

2018-05-09 09:44:51

Java分布式系統

2019-12-27 16:00:56

分布式事務框架Java

2022-11-06 19:28:02

分布式鎖etcd云原生

2024-05-07 09:00:41

Go語言令牌桶

2024-07-15 08:25:07

2024-10-29 14:32:45

Golang分布式系統

2020-07-30 09:35:09

Redis分布式鎖數據庫

2022-12-29 08:32:50

xxl-job緩存Schedule
點贊
收藏

51CTO技術棧公眾號

日韩中文字幕电影| 日本天堂免费a| 一级黄色大片免费观看| 久久久久久久久久久妇女| 欧美大片拔萝卜| 亚洲午夜精品久久久久久人妖| 精品视频一二三| 免费亚洲电影在线| 欧美大片免费观看| 亚洲成人网在线播放| 成人污污视频| 狠狠躁天天躁日日躁欧美| 四虎影院一区二区三区| 国产黄色片免费| 久久蜜桃资源一区二区老牛| 久久精品久久久久久| 国产草草浮力影院| 日韩护士脚交太爽了| 婷婷中文字幕综合| 国产美女视频免费| 黄色在线播放| 丁香六月综合激情| 国产噜噜噜噜久久久久久久久| 99热精品免费| 精品日韩毛片| 精品国产91亚洲一区二区三区婷婷| caoporn超碰97| 激情av在线播放| 国产精品久久看| 久久婷婷国产综合尤物精品| 国产高清在线免费| 蜜臀av一区二区在线观看| 97香蕉久久超级碰碰高清版| 欧美视频www| 日韩.com| 国产一区二区三区在线| 最新在线黄色网址| 丁香综合av| 欧美一区二区三区免费观看视频| 别急慢慢来1978如如2| 精品捆绑调教一区二区三区| 亚洲精品第一国产综合野| 亚洲看片网站| 国产精品二线| 国产视频一区二区在线观看| 精品国产乱码久久久久久丨区2区| 99草在线视频| 精品一区二区免费| 国产精品一区二区三区在线播放| 蜜臀精品一区二区三区| 亚洲永久在线| 97超级碰碰碰久久久| 久久av高潮av无码av喷吹| 亚洲九九视频| 久久五月情影视| 国产精品久久久免费看| 成人同人动漫免费观看| 国产一区二区三区丝袜| 精品人妻中文无码av在线| 国产成人手机高清在线观看网站| 日韩av综合中文字幕| 亚洲色偷偷色噜噜狠狠99网| 一级毛片精品毛片| 日韩欧美黄色影院| 激情av中文字幕| 成人直播在线观看| 精品久久久久久久久久久久久久久久久 | 91av久久久| 麻豆成人久久精品二区三区红| 国产精品福利片| 五月婷婷激情五月| 蜜桃在线一区二区三区| 国产日韩专区在线| 99精品视频在线播放免费| 国产一区二区精品久久91| 97免费资源站| 欧美一区二区黄片| 2021国产精品久久精品| 日韩免费av一区二区三区| 亚洲xxxxxx| 亚洲精品免费电影| 国产美女主播在线| 欧美xxx网站| 欧美日韩五月天| 色婷婷综合在线观看| 51vv免费精品视频一区二区| 亚洲第一中文字幕| 国产一区二区三区四区在线| 亚洲啊v在线观看| 欧美国产激情18| 久久不卡免费视频| 另类中文字幕网| 国产精品xxx在线观看www| 亚洲日本在线播放| 中文字幕一区二区三区在线观看 | 99三级在线| 五月婷婷六月丁香| 国产精品三级视频| 欧美一级片免费播放| av有声小说一区二区三区| 777精品伊人久久久久大香线蕉| 香蕉在线观看视频| 亚洲最大在线| 九九热在线精品视频| 依依成人综合网| 精品一区二区免费看| 国产在线精品二区| 麻豆传媒在线完整视频| 偷拍一区二区三区| 91网址在线观看精品| 妖精视频一区二区三区| 欧美人成在线视频| 波多野结衣电车痴汉| 粉嫩嫩av羞羞动漫久久久 | 国产精品欧美风情| 国产成人手机在线| 中文字幕制服丝袜一区二区三区 | 日韩影院在线观看| 成人在线观看av| 成人高清免费在线播放| 亚洲3atv精品一区二区三区| 四季av一区二区三区| 日韩大胆成人| 欧美韩国理论所午夜片917电影| 中文字幕免费视频观看| 国产91丝袜在线观看| 一本久道久久综合| 日韩电影免费观看高清完整版| 欧美一区二区啪啪| 岛国片在线免费观看| 久久国产成人| 国产欧美日韩综合精品二区| 黄色av免费在线| 欧美亚日韩国产aⅴ精品中极品| 欧美夫妇交换xxx| 欧美精品18| 亚洲在线观看视频| 韩国中文字幕在线| 欧美精品自拍偷拍| 夜夜春很很躁夜夜躁| 亚洲欧美日韩国产一区二区| 国产伦理久久久| 中文字幕中文字幕在线中高清免费版| 91福利在线免费观看| 激情综合丁香五月| 在线精品一区| 成人黄色在线免费观看| 菠萝菠萝蜜在线视频免费观看| 欧美性欧美巨大黑白大战| 玖玖爱在线观看| 久久先锋影音| 日韩av一区二区三区美女毛片| 亚洲电影观看| 亚洲欧美另类人妖| 久久久久女人精品毛片九一| 久久这里只有精品首页| 成人免费在线小视频| 红杏视频成人| 国自产精品手机在线观看视频| www.爱爱.com| 亚洲成av人影院在线观看网| 好男人香蕉影院| 美女网站久久| 色一情一区二区三区四区| 国产精品无码久久久久| y97精品国产97久久久久久| 国产日韩一级片| 亚洲一线二线三线视频| 漂亮人妻被黑人久久精品| 夜久久久久久| 日本精品一区二区三区不卡无字幕| 成人免费av电影| 日韩在线观看高清| 亚洲国产精品久久久久爰性色| 亚洲成人久久影院| 97人妻精品一区二区三区免| 日韩在线观看一区二区| 一本二本三本亚洲码| 国产精品毛片视频| 国产成人精品久久二区二区91| 高清av电影在线观看| 555www色欧美视频| 国产精选第一页| 久久精品日产第一区二区三区高清版 | av软件在线观看| 亚洲成人在线视频播放| 蜜臀99久久精品久久久久小说| 国产精品久久久久久一区二区三区 | 大又大又粗又硬又爽少妇毛片| 日日夜夜精品视频免费| 国产欧美日韩伦理| 影音成人av| 久久国产精品影片| 亚洲欧美另类综合| 午夜久久久久久久久久一区二区| av女人的天堂| 国内精品在线播放| 成人免费性视频| 九九亚洲视频| 99久久久精品免费观看国产 | 日韩福利视频导航| 欧美在线免费视屏| 久久人人爽人人人人片| 亚洲高清电影| 久久99精品久久久久久青青日本| 精精国产xxxx视频在线播放| 在线中文字幕日韩| 97免费观看视频| 亚洲福利视频导航| 亚洲最大成人网站| 激情久久久久久久久久久久久久久久| 日本不卡一区| 欧美激情三级| 日韩av不卡在线| 免费不卡视频| 亚洲国产精品va在线观看黑人| 中文字幕一区二区人妻视频| 日本一区二区成人在线| 色悠悠在线视频| 久久青草久久| 日本人妻伦在线中文字幕| 精品国产一区一区二区三亚瑟| 成人性生交大片免费看小说| 松下纱荣子在线观看| 欧美成人高清视频| 欧美一区二区少妇| 精品粉嫩aⅴ一区二区三区四区| 香蕉影院在线观看| 一区二区三区在线视频观看58| 少妇精品一区二区三区| 久草这里只有精品视频| 黄色高清无遮挡| 影音先锋中文字幕一区二区| 日韩三级电影| 米奇精品关键词| 91麻豆精品秘密入口| 亚洲第一会所001| 91精品国产网站| 宅男在线观看免费高清网站| 色狠狠av一区二区三区香蕉蜜桃| 天天舔天天干天天操| 欧美一级日韩免费不卡| 中文字幕乱码人妻二区三区| 在线免费一区三区| 青青草成人av| 午夜精品久久久久久久久久| 日本高清一二三区| 亚洲特级片在线| 在线观看天堂av| 国产女同性恋一区二区| 给我免费观看片在线电影的| www.av亚洲| 国产sm在线观看| 国产一区二三区好的| 久久久久久久久久一区二区| 日本午夜精品一区二区三区电影| 日本a视频在线观看| 日韩亚洲国产欧美| 我的公把我弄高潮了视频| 欧美日韩一区自拍| 黄色一级大片免费| 亚洲第一偷拍| 糖心vlog在线免费观看| 中文视频一区| 免费人成自慰网站| 国精品一区二区| 国产乱子伦精品无码专区| 欧美精品99| 黄网站欧美内射| 国产精品久久久免费| 午夜精品久久久久久久无码| 久久久久久网| 91淫黄看大片| 久久99精品国产.久久久久久 | 国产精品四虎| 亚洲电影第1页| 欧美日韩国产中文字幕在线| 亚洲精品网址在线观看| 免费在线稳定资源站| 最近2019免费中文字幕视频三| 69久久精品| 久久精品中文字幕电影| 菠萝菠萝蜜在线视频免费观看| 午夜精品久久久久久久99热浪潮| 99thz桃花论族在线播放| 91精品国产91久久久久久| 亚洲校园激情春色| 国产精品偷伦免费视频观看的| 粉嫩一区二区三区在线观看| 国产欧美一区二区三区另类精品| 日韩成人一级| 中文字幕精品一区日韩| 欧美精品麻豆| 亚洲中文字幕无码中文字| 日韩不卡一区二区| 涩视频在线观看| 久久久久久日产精品| 午夜爽爽爽男女免费观看| 欧美日韩中文字幕在线| 美女黄页在线观看| 日韩精品一区二区三区三区免费 | 天天色影综合网| 亚洲一二三四区| 人人妻人人爽人人澡人人精品| 欧美成人一级视频| 国产区在线视频| 欧美日本国产在线| aa视频在线观看| 92国产精品久久久久首页| 美女av一区| 一区一区视频| 日日嗨av一区二区三区四区| 日本在线视频播放| 国产午夜精品理论片a级大结局| 免费精品在线视频| 大伊人狠狠躁夜夜躁av一区| 一道本无吗一区| 日韩精品黄色网| 久久av色综合| 国产欧美婷婷中文| 爽爽窝窝午夜精品一区二区| 国产免费内射又粗又爽密桃视频| 久久狠狠一本精品综合网| 国产人妻精品久久久久野外| 不卡av电影在线播放| 青青草手机在线视频| 狠狠做深爱婷婷久久综合一区| 国产模特av私拍大尺度| 色婷婷综合久久久久中文字幕1| www.51av欧美视频| 92看片淫黄大片看国产片| 一区二区三区四区日韩| 亚洲欧美国产日韩综合| 97久久久精品综合88久久| 国产性生活大片| 91高清视频免费看| 午夜小视频在线播放| 欧美激情一级精品国产| 疯狂欧洲av久久成人av电影| 日韩三级在线播放| 久久久久国产精品一区三寸| 国产人妻人伦精品1国产丝袜| 一区二区三区四区在线| 亚洲视频一区二区三区四区| 一色桃子一区二区| 成人av三级| 久久精品美女| 欧美一级专区| 国产精品无码永久免费不卡| 亚洲午夜久久久久久久久电影院| 成人av手机在线| 不卡中文字幕av| 香蕉久久一区| 丰满女人性猛交| 国产一区二区三区高清播放| 情侣偷拍对白清晰饥渴难耐| 欧美美女视频在线观看| 2019中文字幕在线视频| 国产精品国产三级国产aⅴ9色| 国语一区二区三区| 一二三四视频社区在线| 99久久99久久免费精品蜜臀| 久久午夜鲁丝片午夜精品| 日韩一区二区影院| 国产黄色在线免费观看| 成人性生交大片免费观看嘿嘿视频| 欧美成人午夜| 国产sm在线观看| 亚洲国产精品久久人人爱蜜臀| 无码国产精品一区二区色情男同| 5278欧美一区二区三区| 日韩欧美国产大片| 92看片淫黄大片一级| 国产欧美日韩综合精品一区二区| 中文字幕av第一页| 麻豆一区二区在线观看| 亚洲网址在线观看| 99视频在线免费播放| 国产欧美精品一区二区三区四区| 最近中文字幕在线观看| 久久精彩免费视频| 看全色黄大色大片免费久久久| 毛片在线视频播放| 欧美激情一区三区| 波多野结衣黄色网址| 日韩一中文字幕| 国产电影一区二区| 人人妻人人添人人爽欧美一区| 久久久久久久久久看片| 在线播放国产一区| 欧美精品videossex性护士| 亚州国产精品| 99色精品视频| 亚洲欧美视频在线观看视频| 亚洲乱色熟女一区二区三区| 97婷婷大伊香蕉精品视频| 99精品电影| 亚洲国产综合视频| 日韩一区二区三区精品视频| 欧美大胆a人体大胆做受|