精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Kubebuilder 進階之源碼分析

開發 前端
kubebuilder 幫我們做了很多事情,讓我們的開發基本上只需要關注一個 Reconcile 函數就可以了,但是從另外一個方面來講,kubebuilder 目前對我們來說它還是一個黑盒,會產生很多的疑問.

 [[399769]]

在前面的文章當中我們已經完整的完成了一個 Operator 的開發,涉及到了 CURD、預刪除、Status、Event、OwnerReference、WebHook,也算是將一個 Operator 開發中會涉及到的點大部分都了解了一下。kubebuilder 幫我們做了很多事情,讓我們的開發基本上只需要關注一個 Reconcile 函數就可以了,但是從另外一個方面來講,kubebuilder 目前對我們來說它還是一個黑盒,會產生很多的疑問:

  • Reconcile 方法是怎么被觸發的?
  • 怎么識別到不同的資源?
  • 整體是如何進行工作的?
  • ……

架構

我們先來看一下來自官方文檔的這個架構圖[1]

arch

  • Process 進程通過 main.go啟動,一般來說一個 Controller 只有一個進程,如果做了高可用的話,會有多個
  • Manager 每個進程會有一個 Manager,這是核心組件,主要負責
    • metrics 的暴露
    • webhook 證書
    • 初始化共享的 cache
    • 初始化共享的 clients 用于和 APIServer 進行通信
    • 所有的 Controller 的運行
  • Client 一般來說,我們 創建、更新、刪除某個資源的時候會直接調用 Client 和 APIServer 進行通信
  • Cache 負責同步 Controller 關心的資源,其核心是 GVK -> Informer 的映射,一般我們的 Get 和 List 操作都會從 Cache 中獲取數據
  • Controller 控制器的業務邏輯所在的地方,一個 Manager 可能會有多個 Controller,我們一般只需要實現 Reconcile 方法就行。圖上的 Predicate 是事件過濾器,我們可以在 Controller 中過濾掉我們不關心的事件信息
  • WebHook 就是我們準入控制實現的地方了,主要是有兩類接口,一個是 MutatingAdmissionWebhook 需要實現 Defaulter 接口,一個是 ValidatingAdmissionWebhook 需要實現 Validator 接口

源碼分析

了解了基本的架構之后,我們就從入口 main.go 開始,看一看 kubebuilder 究竟在后面偷偷的做了哪些事情吧。

main.go

  1. // 省略了參數綁定和 error check 的代碼 
  2. func main() { 
  3.     var metricsAddr string 
  4.     var enableLeaderElection bool 
  5.     var probeAddr string 
  6.  
  7.     ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) 
  8.  
  9.     mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ 
  10.         Scheme:                 scheme, 
  11.         MetricsBindAddress:     metricsAddr, 
  12.         Port:                   9443, 
  13.         HealthProbeBindAddress: probeAddr, 
  14.         LeaderElection:         enableLeaderElection, 
  15.         LeaderElectionID:       "97acaccf.lailin.xyz"
  16.         // CertDir:                "config/cert/", // 手動指定證書位置用于測試 
  17.     }) 
  18.      
  19.  
  20.     (&controllers.NodePoolReconciler{ 
  21.         Client:   mgr.GetClient(), 
  22.         Log:      ctrl.Log.WithName("controllers").WithName("NodePool"), 
  23.         Scheme:   mgr.GetScheme(), 
  24.         Recorder: mgr.GetEventRecorderFor("NodePool"), 
  25.     }).SetupWithManager(mgr) 
  26.  
  27.     (&nodesv1.NodePool{}).SetupWebhookWithManager(mgr) 
  28.    
  29.     //+kubebuilder:scaffold:builder 
  30.  
  31.     mgr.AddHealthzCheck("healthz", healthz.Ping) 
  32.     mgr.AddReadyzCheck("readyz", healthz.Ping) 
  33.  
  34.     setupLog.Info("starting manager"
  35.     mgr.Start(ctrl.SetupSignalHandler()) 

可以看到 main.go 主要是做了一些啟動的工作包括:

  • 創建一個 Manager
  • 使用剛剛創建的 Manager 創建了一個 Controller
  • 啟動 WebHook
  • 添加健康檢查
  • 啟動 Manager

下面我們就順著 main 函數里面的邏輯一步步的往下看看

NewManger

  1. // New returns a new Manager for creating Controllers. 
  2. func New(config *rest.Config, options Options) (Manager, error) { 
  3.     // 省略配置初始化相關代碼 
  4.  
  5.     // 創建 cache 
  6.     cache, err := options.NewCache(config,  
  7.                                  cache.Options{ 
  8.                                    Scheme: options.Scheme, // main 中傳入的 scheme 
  9.                                    Mapper: mapper,         // k8s api 和 go type 的轉換器 
  10.                                    Resync: options.SyncPeriod, // 默認 10 小時,一般不要改 
  11.                                    Namespace: options.Namespace, // 需要監聽的 namespace 
  12.                                  }) 
  13.  
  14.   // 創建和 APIServer 交互的 client,讀寫分離 
  15.     clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper} 
  16.     apiReader, err := client.New(config, clientOptions) 
  17.  
  18.  
  19.     writeObj, err := options.ClientBuilder. 
  20.         WithUncached(options.ClientDisableCacheFor...). 
  21.         Build(cache, config, clientOptions) 
  22.  
  23.     if options.DryRunClient { 
  24.         writeObj = client.NewDryRunClient(writeObj) 
  25.     } 
  26.  
  27.     // 創建事件記錄器 
  28.     recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster) 
  29.  
  30.     // 需要需要高可用的話,創建選舉相關的配置 
  31.     leaderConfig := config 
  32.     if options.LeaderElectionConfig != nil { 
  33.         leaderConfig = options.LeaderElectionConfig 
  34.     } 
  35.     resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{ 
  36.         LeaderElection:             options.LeaderElection, 
  37.         LeaderElectionResourceLock: options.LeaderElectionResourceLock, 
  38.         LeaderElectionID:           options.LeaderElectionID, 
  39.         LeaderElectionNamespace:    options.LeaderElectionNamespace, 
  40.     }) 
  41.  
  42.     // 創建 metric 和 健康檢查的接口 
  43.     metricsListener, err := options.newMetricsListener(options.MetricsBindAddress) 
  44.  
  45.     // By default we have no extra endpoints to expose on metrics http server. 
  46.     metricsExtraHandlers := make(map[string]http.Handler) 
  47.  
  48.     // Create health probes listener. This will throw an error if the bind 
  49.     // address is invalid or already in use. 
  50.     healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress) 
  51.     if err != nil { 
  52.         return nil, err 
  53.     } 
  54.  
  55.   // 最后將這些配置放到 manager 中 
  56.     return &controllerManager{ 
  57.         config:                  config, 
  58.         scheme:                  options.Scheme, 
  59.         cache:                   cache, 
  60.         fieldIndexes:            cache, 
  61.         client:                  writeObj, 
  62.         apiReader:               apiReader, 
  63.         recorderProvider:        recorderProvider, 
  64.         resourceLock:            resourceLock, 
  65.         mapper:                  mapper, 
  66.         metricsListener:         metricsListener, 
  67.         metricsExtraHandlers:    metricsExtraHandlers, 
  68.         logger:                  options.Logger, 
  69.         elected:                 make(chan struct{}), 
  70.         port:                    options.Port, 
  71.         host:                    options.Host, 
  72.         certDir:                 options.CertDir, 
  73.         leaseDuration:           *options.LeaseDuration, 
  74.         renewDeadline:           *options.RenewDeadline, 
  75.         retryPeriod:             *options.RetryPeriod, 
  76.         healthProbeListener:     healthProbeListener, 
  77.         readinessEndpointName:   options.ReadinessEndpointName, 
  78.         livenessEndpointName:    options.LivenessEndpointName, 
  79.         gracefulShutdownTimeout: *options.GracefulShutdownTimeout, 
  80.         internalProceduresStop:  make(chan struct{}), 
  81.     }, nil 

創建 Cache

  1. func New(config *rest.Config, opts Options) (Cache, error) { 
  2.     opts, err := defaultOpts(config, opts) 
  3.     if err != nil { 
  4.         return nil, err 
  5.     } 
  6.     im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace) 
  7.     return &informerCache{InformersMap: im}, nil 

這里主要是調用 NewInformersMap方法創建 Informer 的映射

  1. func NewInformersMap(config *rest.Config, 
  2.     scheme *runtime.Scheme, 
  3.     mapper meta.RESTMapper, 
  4.     resync time.Duration, 
  5.     namespace string) *InformersMap { 
  6.  
  7.     return &InformersMap{ 
  8.         structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace), 
  9.         unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace), 
  10.         metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace), 
  11.  
  12.         Scheme: scheme, 
  13.     } 

NewInformersMap會去分別創建,結構化、非結構化以及 metadata 的 InformerMap 而這些方法最后都會去調用 newSpecificInformersMap方法,區別就是不同的方法傳入的 createListWatcherFunc 參數不同

  1. func newSpecificInformersMap(config *rest.Config, 
  2.     scheme *runtime.Scheme, 
  3.     mapper meta.RESTMapper, 
  4.     resync time.Duration, 
  5.     namespace string, 
  6.     createListWatcher createListWatcherFunc) *specificInformersMap { 
  7.     ip := &specificInformersMap{ 
  8.         config:            config, 
  9.         Scheme:            scheme, 
  10.         mapper:            mapper, 
  11.         informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry), 
  12.         codecs:            serializer.NewCodecFactory(scheme), 
  13.         paramCodec:        runtime.NewParameterCodec(scheme), 
  14.         resync:            resync, 
  15.         startWait:         make(chan struct{}), 
  16.         createListWatcher: createListWatcher, 
  17.         namespace:         namespace, 
  18.     } 
  19.     return ip 

newSpecificInformersMap 和常規的 InformersMap 類似,區別是沒實現 WaitForCacheSync方法

以結構化的傳入的 createStructuredListWatch 為例,主要是返回一個用于創建 SharedIndexInformer 的 ListWatch 對象

  1. func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { 
  2.  // Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the 
  3.  // groupVersionKind to the Resource API we will use. 
  4.  mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) 
  5.  if err != nil { 
  6.   return nil, err 
  7.  } 
  8.  
  9.  client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs) 
  10.  if err != nil { 
  11.   return nil, err 
  12.  } 
  13.  listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List"
  14.  listObj, err := ip.Scheme.New(listGVK) 
  15.  if err != nil { 
  16.   return nil, err 
  17.  } 
  18.  
  19.  // TODO: the functions that make use of this ListWatch should be adapted to 
  20.  //  pass in their own contexts instead of relying on this fixed one here. 
  21.  ctx := context.TODO() 
  22.  // Create a new ListWatch for the obj 
  23.  return &cache.ListWatch{ 
  24.   ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { 
  25.    res := listObj.DeepCopyObject() 
  26.    isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot 
  27.    err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res) 
  28.    return res, err 
  29.   }, 
  30.   // Setup the watch function 
  31.   WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { 
  32.    // Watch needs to be set to true separately 
  33.    opts.Watch = true 
  34.    isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot 
  35.    return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx) 
  36.   }, 
  37.  }, nil 

小結: cache 主要是創建了一些 InformerMap,完成了 GVK 到 Informer 的映射,每個 Informer 會根據 ListWatch 函數對對應的 GVK 進行 List 和 Watch。

創建 Client

  1. func New(config *rest.Config, options Options) (Client, error) { 
  2.  if config == nil { 
  3.   return nil, fmt.Errorf("must provide non-nil rest.Config to client.New"
  4.  } 
  5.  
  6.  // Init a scheme if none provided 
  7.  if options.Scheme == nil { 
  8.   options.Scheme = scheme.Scheme 
  9.  } 
  10.  
  11.  // Init a Mapper if none provided 
  12.  if options.Mapper == nil { 
  13.   var err error 
  14.   options.Mapper, err = apiutil.NewDynamicRESTMapper(config) 
  15.   if err != nil { 
  16.    return nil, err 
  17.   } 
  18.  } 
  19.  
  20.  clientcache := &clientCache{ 
  21.   config: config, 
  22.   scheme: options.Scheme, 
  23.   mapper: options.Mapper, 
  24.   codecs: serializer.NewCodecFactory(options.Scheme), 
  25.  
  26.   structuredResourceByType:   make(map[schema.GroupVersionKind]*resourceMeta), 
  27.   unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), 
  28.  } 
  29.  
  30.  rawMetaClient, err := metadata.NewForConfig(config) 
  31.  if err != nil { 
  32.   return nil, fmt.Errorf("unable to construct metadata-only client for use as part of client: %w", err) 
  33.  } 
  34.  
  35.  c := &client{ 
  36.   typedClient: typedClient{ 
  37.    cache:      clientcache, 
  38.    paramCodec: runtime.NewParameterCodec(options.Scheme), 
  39.   }, 
  40.   unstructuredClient: unstructuredClient{ 
  41.    cache:      clientcache, 
  42.    paramCodec: noConversionParamCodec{}, 
  43.   }, 
  44.   metadataClient: metadataClient{ 
  45.    client:     rawMetaClient, 
  46.    restMapper: options.Mapper, 
  47.   }, 
  48.   scheme: options.Scheme, 
  49.   mapper: options.Mapper, 
  50.  } 
  51.  
  52.  return c, nil 

client 創建了兩個一個用于讀,一個用于寫,用于讀的會直接使用上面的 cache,用于寫的才會直接和 APIServer 進行交互

Controller

下面我們看一下核心的 Controller 是怎么初始化和工作的

  1. if err = (&controllers.NodePoolReconciler{ 
  2.   Client:   mgr.GetClient(), 
  3.   Log:      ctrl.Log.WithName("controllers").WithName("NodePool"), 
  4.   Scheme:   mgr.GetScheme(), 
  5.   Recorder: mgr.GetEventRecorderFor("NodePool"), 
  6. }).SetupWithManager(mgr); err != nil { 
  7.   setupLog.Error(err, "unable to create controller""controller""NodePool"
  8.   os.Exit(1) 

main.go 的方法里面主要是初始化了 Controller 的結構體,然后調用了 SetupWithManager方法

  1. // SetupWithManager sets up the controller with the Manager. 
  2. func (r *NodePoolReconciler) SetupWithManager(mgr ctrl.Manager) error { 
  3.     return ctrl.NewControllerManagedBy(mgr). 
  4.         For(&nodesv1.NodePool{}). 
  5.         Watches(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler}). 
  6.         Complete(r) 

SetupWithManager之前有講到過,主要是使用了建造者模式,去構建了我們需要監聽的對象,只有這些對象的相關事件才會觸發我們的 Reconcile 邏輯。這里面的 Complete 最后其實是調用了 Build 方法

  1. func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) { 
  2.     // 省略參數校驗 
  3.  
  4.     // Set the Config 
  5.     blder.loadRestConfig() 
  6.  
  7.     // Set the ControllerManagedBy 
  8.     if err := blder.doController(r); err != nil { 
  9.         return nil, err 
  10.     } 
  11.  
  12.     // Set the Watch 
  13.     if err := blder.doWatch(); err != nil { 
  14.         return nil, err 
  15.     } 
  16.  
  17.     return blder.ctrl, nil 

Build主要調用 doController 、doWatch兩個方法

  1. func (blder *Builder) doController(r reconcile.Reconciler) error { 
  2.     ctrlOptions := blder.ctrlOptions 
  3.     if ctrlOptions.Reconciler == nil { 
  4.         ctrlOptions.Reconciler = r 
  5.     } 
  6.  
  7.     // Retrieve the GVK from the object we're reconciling 
  8.     // to prepopulate logger information, and to optionally generate a default name
  9.     gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme()) 
  10.     if err != nil { 
  11.         return err 
  12.     } 
  13.  
  14.     // Setup the logger. 
  15.     if ctrlOptions.Log == nil { 
  16.         ctrlOptions.Log = blder.mgr.GetLogger() 
  17.     } 
  18.     ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group"reconciler kind", gvk.Kind) 
  19.  
  20.     // Build the controller and return
  21.     blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions) 
  22.     return err 

doController主要是初始化了一個 Controller,這里面傳入了我們實現 的Reconciler以及獲取到我們的 GVK 的名稱

  1. func (blder *Builder) doWatch() error { 
  2.     // Reconcile type 
  3.     typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) 
  4.     if err != nil { 
  5.         return err 
  6.     } 
  7.     src := &source.Kind{Type: typeForSrc} 
  8.     hdler := &handler.EnqueueRequestForObject{} 
  9.     allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) 
  10.     if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { 
  11.         return err 
  12.     } 
  13.  
  14.     // Watches the managed types 
  15.     for _, own := range blder.ownsInput { 
  16.         typeForSrc, err := blder.project(own.object, own.objectProjection) 
  17.         if err != nil { 
  18.             return err 
  19.         } 
  20.         src := &source.Kind{Type: typeForSrc} 
  21.         hdler := &handler.EnqueueRequestForOwner{ 
  22.             OwnerType:    blder.forInput.object, 
  23.             IsController: true
  24.         } 
  25.         allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) 
  26.         allPredicates = append(allPredicates, own.predicates...) 
  27.         if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { 
  28.             return err 
  29.         } 
  30.     } 
  31.  
  32.     // Do the watch requests 
  33.     for _, w := range blder.watchesInput { 
  34.         allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) 
  35.         allPredicates = append(allPredicates, w.predicates...) 
  36.  
  37.         // If the source of this watch is of type *source.Kind, project it. 
  38.         if srckind, ok := w.src.(*source.Kind); ok { 
  39.             typeForSrc, err := blder.project(srckind.Type, w.objectProjection) 
  40.             if err != nil { 
  41.                 return err 
  42.             } 
  43.             srckind.Type = typeForSrc 
  44.         } 
  45.  
  46.         if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil { 
  47.             return err 
  48.         } 
  49.     } 
  50.     return nil 

Watch 主要是監聽我們想要的資源變化,blder.ctrl.Watch(src, hdler, allPredicates...)通過過濾源事件的變化,allPredicates是過濾器,只有所有的過濾器都返回 true 時,才會將事件傳遞給 EventHandler hdler,這里會將 Handler 注冊到 Informer 上

啟動

  1. func (cm *controllerManager) Start(ctx context.Context) (err error) { 
  2.     cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) 
  3.  
  4.     // 這個用來表示所有的協程都已經退出了, 
  5.     stopComplete := make(chan struct{}) 
  6.     defer close(stopComplete) 
  7.      
  8.   // ...... 
  9.  
  10.     // 用于保存錯誤 
  11.     cm.errChan = make(chan error) 
  12.  
  13.     // 如果需要 metric 就啟動 metric 服務 
  14.     if cm.metricsListener != nil { 
  15.         go cm.serveMetrics() 
  16.     } 
  17.  
  18.     // 啟動健康檢查服務 
  19.     if cm.healthProbeListener != nil { 
  20.         go cm.serveHealthProbes() 
  21.     } 
  22.  
  23.    
  24.     go cm.startNonLeaderElectionRunnables() 
  25.  
  26.     go func() { 
  27.         if cm.resourceLock != nil { 
  28.             err := cm.startLeaderElection() 
  29.             if err != nil { 
  30.                 cm.errChan <- err 
  31.             } 
  32.         } else { 
  33.             // Treat not having leader election enabled the same as being elected. 
  34.             close(cm.elected) 
  35.             go cm.startLeaderElectionRunnables() 
  36.         } 
  37.     }() 
  38.  
  39.   // 判斷是否需要退出 
  40.     select { 
  41.     case <-ctx.Done(): 
  42.         // We are done 
  43.         return nil 
  44.     case err := <-cm.errChan: 
  45.         // Error starting or running a runnable 
  46.         return err 
  47.     } 

無論是不是 leader 最后都會使用 startRunnable 啟動 Controller

  1. func (cm *controllerManager) startNonLeaderElectionRunnables() { 
  2.     cm.mu.Lock() 
  3.     defer cm.mu.Unlock() 
  4.  
  5.     cm.waitForCache(cm.internalCtx) 
  6.  
  7.     // Start the non-leaderelection Runnables after the cache has synced 
  8.     for _, c := range cm.nonLeaderElectionRunnables { 
  9.         // Controllers block, but we want to return an error if any have an error starting. 
  10.         // Write any Start errors to a channel so we can return them 
  11.         cm.startRunnable(c) 
  12.     } 

實際上是調用了 Controller 的 Start方法

  1. // Start implements controller.Controller 
  2. func (c *Controller) Start(ctx context.Context) error { 
  3.  
  4.   // Controller 只能被執行一次 
  5.     c.mu.Lock() 
  6.     if c.Started { 
  7.         return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times"
  8.     } 
  9.  
  10.     // Set the internal context. 
  11.     c.ctx = ctx 
  12.  
  13.   // 獲取隊列 
  14.     c.Queue = c.MakeQueue() 
  15.     defer c.Queue.ShutDown() 
  16.  
  17.     err := func() error { 
  18.         defer c.mu.Unlock() 
  19.  
  20.         defer utilruntime.HandleCrash() 
  21.  
  22.         // 嘗試等待緩存 
  23.         for _, watch := range c.startWatches { 
  24.             c.Log.Info("Starting EventSource""source", watch.src) 
  25.             if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { 
  26.                 return err 
  27.             } 
  28.         } 
  29.  
  30.         // 啟動 Controller 
  31.         c.Log.Info("Starting Controller"
  32.  
  33.      
  34.         for _, watch := range c.startWatches { 
  35.             syncingSource, ok := watch.src.(source.SyncingSource) 
  36.             if !ok { 
  37.                 continue 
  38.             } 
  39.             if err := syncingSource.WaitForSync(ctx); err != nil { 
  40.                 // This code is unreachable in case of kube watches since WaitForCacheSync will never return an error 
  41.                 // Leaving it here because that could happen in the future 
  42.                 err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) 
  43.                 c.Log.Error(err, "Could not wait for Cache to sync"
  44.                 return err 
  45.             } 
  46.         } 
  47.  
  48.         // All the watches have been started, we can reset the local slice. 
  49.         // 
  50.         // We should never hold watches more than necessary, each watch source can hold a backing cache, 
  51.         // which won't be garbage collected if we hold a reference to it. 
  52.         c.startWatches = nil 
  53.  
  54.         if c.JitterPeriod == 0 { 
  55.             c.JitterPeriod = 1 * time.Second 
  56.         } 
  57.  
  58.         // Launch workers to process resources 
  59.         c.Log.Info("Starting workers""worker count", c.MaxConcurrentReconciles) 
  60.         ctrlmetrics.WorkerCount.WithLabelValues(c.Name). 
  61.                     Set(float64(c.MaxConcurrentReconciles)) 
  62.         for i := 0; i < c.MaxConcurrentReconciles; i++ { 
  63.             go wait.UntilWithContext(ctx, func(ctx context.Context) { 
  64.                 // 查詢隊列中有沒有關注的事件,有的話就觸發我們的 reconcile 邏輯 
  65.                 for c.processNextWorkItem(ctx) { 
  66.                 } 
  67.             }, c.JitterPeriod) 
  68.         } 
  69.  
  70.         c.Started = true 
  71.         return nil 
  72.     }() 
  73.     if err != nil { 
  74.         return err 
  75.     } 
  76.  
  77.     <-ctx.Done() 
  78.     c.Log.Info("Stopping workers"
  79.     return nil 
  80.  
  81. // attempt to process it, by calling the reconcileHandler. 
  82. func (c *Controller) processNextWorkItem(ctx context.Context) bool { 
  83.     obj, shutdown := c.Queue.Get() 
  84.     if shutdown { 
  85.         // Stop working 
  86.         return false 
  87.     } 
  88.  
  89.     // We call Done here so the workqueue knows we have finished 
  90.     // processing this item. We also must remember to call Forget if we 
  91.     // do not want this work item being re-queued. For example, we do 
  92.     // not call Forget if a transient error occurs, instead the item is 
  93.     // put back on the workqueue and attempted again after a back-off 
  94.     // period. 
  95.     defer c.Queue.Done(obj) 
  96.  
  97.     ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) 
  98.     defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) 
  99.  
  100.     c.reconcileHandler(ctx, obj) 
  101.     return true 

總結

Reconcile 方法的觸發是通過 Cache 中的 Informer 獲取到資源的變更事件,然后再通過生產者消費者的模式觸發我們自己實現的 Reconcile 方法的。

Kubebuilder 是一個非常好用的 Operator 開發框架,不僅極大的簡化了 Operator 的開發過程,并且充分的利用了 go interface 的特性留下了足夠的擴展性,這個我們可以學習,如果我們的業務代碼開發框架能夠做到這個地步,我覺得也就不錯了

參考文獻

架構圖 https://master.book.kubebuilder.io/architecture.html?

本文轉載自微信公眾號「mohuishou」,可以通過以下二維碼關注。轉載本文請聯系mohuishou公眾號。

原文鏈接:https://lailin.xyz/post/operator-09-kubebuilder-code.html

 

責任編輯:武曉燕 來源: mohuishou博客
相關推薦

2016-09-22 15:50:38

JavascriptRedux源碼解析

2021-09-26 08:35:17

Android控件寬高

2021-05-18 05:40:27

kubebuilderwebhook進階

2021-05-17 05:51:31

KubeBuilderOperator測試

2019-12-23 09:13:11

Python數據語言

2021-09-05 07:35:58

lifecycleAndroid組件原理

2011-03-23 10:30:01

LAMPApache源碼

2022-04-06 14:55:45

Harmony同步機制鴻蒙

2021-05-12 06:18:19

KubeBuilderOperatork8s

2021-09-02 07:00:01

Glide流程Android

2021-10-20 07:18:50

開源輕量級緩存

2021-08-28 07:48:04

AndroidActivityRecActivitySta

2021-05-16 10:52:58

kubebuilderstatus event

2014-08-26 11:11:57

AsyncHttpCl源碼分析

2016-12-09 19:21:14

2011-03-15 11:33:18

iptables

2021-08-12 16:28:10

AndroidHandleLooper

2021-09-05 17:22:08

Strview.js工具js

2011-05-26 10:05:48

MongoDB

2021-05-08 09:02:48

KubeBuilderOperatork8s
點贊
收藏

51CTO技術棧公眾號

黄频在线免费观看| 国产在视频线精品视频| 校园春色亚洲| 亚洲国产精品av| 91精品视频在线看| 国产欧美日韩另类| 日韩欧美自拍| 精品国产一区二区三区久久久蜜月 | 亚洲免费av电影| 久热在线视频观看| 鲁鲁在线中文| 亚洲色图制服诱惑| 欧美日韩精品久久久免费观看| 国产精品九九九九| 亚洲综合二区| 久久97精品久久久久久久不卡| 国产精品1000部啪视频| 精品国产亚洲一区二区三区在线| 疯狂欧美牲乱大交777| 自拍偷拍亚洲色图欧美| 久久精品国产亚洲a∨麻豆| 精品一区二区影视| 日本最新高清不卡中文字幕| 男人与禽猛交狂配| 加勒比久久综合| 亚洲精品国产欧美| 黑人无套内谢中国美女| 成人av集中营| 色8久久精品久久久久久蜜| 日本黄大片在线观看| 欧美日韩在线看片| 久久久久久久久久久久久夜| 国产欧美日韩一区| 99这里有精品视频| 精品影视av免费| 国产精品久久一区| 亚洲黄色免费观看| 国产精品久久久久毛片大屁完整版 | 国产草草浮力影院| 亚洲一区二区三区免费| 这里只有精品电影| 日本黄大片一区二区三区| 美女100%一区| 日韩欧美在线免费观看| 日本午夜激情视频| 92久久精品| 亚洲一区二区三区中文字幕在线| 在线观看一区二区三区三州| 色开心亚洲综合| 国产欧美精品在线观看| 奇米精品在线| 玖玖综合伊人| 久久久久国色av免费看影院| 欧美lavv| 成人精品一区二区三区校园激情| 久久欧美一区二区| 欧美日韩成人一区二区三区| 男女污视频在线观看| 久久亚洲影视婷婷| 欧美一区2区三区4区公司二百| 女人偷人在线视频| 99久久伊人精品| 麻豆亚洲一区| 成人p站proumb入口| 国产清纯白嫩初高生在线观看91| 日韩精品欧美一区二区三区| 国自产拍在线网站网址视频| 欧美激情一二三区| 一区二区三区不卡在线| 黄色成人在线观看| 一区二区三区在线影院| 给我免费播放片在线观看| 99在线视频影院| 色呦呦国产精品| 三上悠亚av一区二区三区| 色狠狠一区二区三区| 日韩一级欧美一级| 538国产视频| 精品美女视频| 欧美大尺度激情区在线播放| 精品在线视频免费观看| 男人的天堂成人在线| 国产精品人成电影在线观看| 99久久久无码国产精品免费| 波多野结衣中文字幕一区| 欧美视频小说| 中文字幕在线三区| 精品久久久精品| 在线观看免费视频高清游戏推荐| 亚洲日本视频在线| 国产丝袜高跟一区| 国产成人精品视频免费| 国产精品啊啊啊| 日本视频久久久| 99国产精品久久久久久久成人| 成人18精品视频| 亚洲免费不卡| 僵尸再翻生在线观看免费国语| 欧美综合天天夜夜久久| 日本成人在线免费| 国产探花在线精品一区二区| 欧美精品亚州精品| 中文字幕一区二区三区四区欧美| 国产一区二区三区精品欧美日韩一区二区三区 | 欧美国产日韩精品免费观看| 中文字幕人妻熟女人妻洋洋| 香蕉视频亚洲一级| 日韩午夜激情av| 欧美熟妇一区二区| 午夜日韩av| 国产精品视频免费在线观看| 天堂在线一二区| 亚洲精品日日夜夜| 中文字幕有码av| 日本中文字幕在线一区| 久热99视频在线观看| 日韩黄色片网站| av一区二区三区黑人| 欧美aaa在线观看| 亚洲电影有码| 亚洲精品日韩欧美| 国产一级二级三级视频| 九九**精品视频免费播放| 欧美日韩一区在线播放| 黄色的视频在线观看| 欧美日韩精品欧美日韩精品一| 800av在线播放| 国产精品v亚洲精品v日韩精品| 国产精品久久久久久久久久三级| 日韩一区二区三区在线观看视频| 日韩毛片精品高清免费| 别急慢慢来1978如如2| 精品自拍偷拍| 欧美精品video| 色成年激情久久综合| 黄色录像a级片| 激情文学一区| 97超级在线观看免费高清完整版电视剧| av大全在线免费看| 在线日韩av片| 成人在线观看免费高清| 久久国产精品亚洲77777| 精品免费一区二区三区蜜桃| 国产乱码在线| 欧美精品一区二区在线播放| 国产精品不卡av| 不卡一区在线观看| 美女扒开大腿让男人桶| 国内精品麻豆美女在线播放视频| 欧美国产亚洲精品久久久8v| 国内精品久久久久久久久久| 亚洲精品一二三区| 亚洲最大视频网| 91久久亚洲| 久久国产精品久久| 在线能看的av网址| 亚洲欧洲中文天堂| 中文字幕+乱码+中文乱码www | 日本一区二区三区免费观看| 成人免费短视频| 一本大道亚洲视频| 中文字幕在线网址| 亚洲欧洲av一区二区三区久久| www.污污视频| 国产精品vip| 久久精品第九区免费观看 | 色天天综合色天天久久| 丰满的亚洲女人毛茸茸| 激情六月婷婷久久| 99久久99久久精品| 香蕉视频一区| 国产精品人成电影| 在线观看中文字幕的网站| 精品国产免费人成在线观看| 台湾佬中文在线| 国产精品久久久久久久久晋中 | 亚洲第一福利社区| 国产精品成人一区| 国产传媒在线播放| 亚洲电影天堂av| 欧美超碰在线观看| 亚洲视频在线一区| 中文字幕乱码在线| 美国三级日本三级久久99 | 黄色国产在线播放| 国产精品18久久久久| 东京热加勒比无码少妇| 91精品国产91久久久久久密臀| 粉嫩av四季av绯色av第一区| 亚洲成人人体| 久久91精品国产91久久久| 日本在线丨区| 日韩一区二区在线播放| 免费的毛片视频| 亚洲另类在线视频| jizz日本免费| 国产麻豆精品视频| 黄在线观看网站| 欧美高清日韩| 四虎永久国产精品| 精品亚洲自拍| 亚洲自拍小视频免费观看| 9i看片成人免费高清| 九九精品在线播放| 电影av在线| 日韩电影在线观看永久视频免费网站| 中文字幕一区二区在线视频 | 最近2019中文免费高清视频观看www99 | 久久久久香蕉视频| 国产精品久久久久精k8| 三叶草欧洲码在线| 国产精品888| 在线免费观看视频黄| 亚洲国产二区| 亚洲五码在线观看视频| 成人一区二区| 欧美lavv| 中文字幕av一区二区三区人| 国产福利久久精品| 国产亚洲高清一区| 国产日韩精品入口| 3d性欧美动漫精品xxxx软件| 亚州成人av在线| 综合久久2o19| 久久资源免费视频| 天天综合视频在线观看| 亚洲天堂av综合网| 每日更新在线观看av| 亚洲精品国产精品自产a区红杏吧 亚洲精品国产精品乱码不99按摩 亚洲精品国产精品久久清纯直播 亚洲精品国产精品国自产在线 | 超碰97人人干| 99久久国产综合精品色伊| 国产麻豆剧传媒精品国产| 国内精品免费在线观看| www.久久av.com| 极品少妇一区二区三区精品视频| 91激情视频在线| 日韩av高清在线观看| 欧美国产日韩在线播放| 久久国产直播| 已婚少妇美妙人妻系列| 亚洲免费在线| 欧美黄网站在线观看| 久久成人一区| 凹凸日日摸日日碰夜夜爽1| 久久国产精品久久w女人spa| 四虎永久在线精品无码视频| 丝袜美腿亚洲综合| 中文字幕在线观看第三页| 日本视频中文字幕一区二区三区| 亚洲精品怡红院| 蜜臂av日日欢夜夜爽一区| 狠狠躁狠狠躁视频专区| 精品一区二区三区视频| 亚洲天堂一区二区在线观看| 国产福利一区在线观看| 最新日本中文字幕| 不卡电影一区二区三区| 亚洲午夜久久久久久久久红桃 | 久久成人免费电影| 手机在线免费毛片| 国产91精品精华液一区二区三区| 成人一区二区三区仙踪林| 99在线精品视频| 人人妻人人澡人人爽| 中文字幕一区二区5566日韩| 欧美另类videoxo高潮| 亚洲日本在线观看| 香蕉视频一区二区| 色拍拍在线精品视频8848| 亚洲永久精品一区| 欧美吞精做爰啪啪高潮| 国产精品视频在线观看免费| 欧美va亚洲va国产综合| 午夜一区在线观看| 在线成人一区二区| 在线中文字幕电影| 欧美一区在线直播| 国产第一精品| www 成人av com| 亚洲精品白浆高清| 综合一区中文字幕| 国产日韩欧美一区在线| 黑森林精品导航| 国产精品羞羞答答xxdd| 欧美大片免费播放器| 中文字幕欧美国产| 欧美日韩综合一区二区| 欧美午夜精品久久久久久浪潮| 一本久道久久综合无码中文| 亚洲第一页自拍| aaa在线观看| 欧美精品久久一区二区 | 欧美熟乱第一页| 亚洲第一页在线观看| 亚洲精品一区二区久| 毛片免费不卡| 日本最新高清不卡中文字幕| 久久伊人精品| 欧美一区二区三区四区在线观看地址| 亚洲人体av| 国产xxxxx视频| 成人不卡免费av| 午夜精品一区二区三级视频| 欧美视频在线免费| 精品人妻av一区二区三区| 亚洲人午夜精品| 男女在线观看视频| 国产欧美最新羞羞视频在线观看| 久久av国产紧身裤| 免费成人进口网站| 轻轻草成人在线| 亚洲第一成人网站| 亚洲www啪成人一区二区麻豆 | 亚洲精品理论电影| av片在线观看| 国产精品自产拍在线观看中文| 亚洲国产合集| 免费观看国产精品视频| 国产伦精品一区二区三区免费| 免费看的黄色网| 91久久精品一区二区| 熟妇人妻av无码一区二区三区| 欧美巨猛xxxx猛交黑人97人| av在线亚洲一区| 亚洲欧洲国产精品久久| 日韩成人一区二区三区在线观看| www.日本高清| 无吗不卡中文字幕| 好吊色在线观看| 九九精品在线播放| 亚洲免费一区三区| 超薄肉色丝袜足j调教99| 另类综合日韩欧美亚洲| 国产伦理片在线观看| 一本一本大道香蕉久在线精品| 日本韩国在线观看| 久久久久久久久综合| 亚洲国产高清在线观看| 欧美日韩一区二区三区电影| 久久av资源站| 91 在线视频| 欧美一区二区在线播放| www国产在线观看| 3d动漫啪啪精品一区二区免费| 五月精品视频| 国产精品熟女一区二区不卡| 亚洲精品自拍动漫在线| www国产在线| 久久人人爽人人| 日韩av资源网| 男人透女人免费视频| 国产婷婷色一区二区三区| 中文字幕理论片| 精品久久久av| 亚洲一区二区三区在线免费 | 免费在线视频一区| 国产不卡在线观看视频| 91精品国产高清一区二区三区蜜臀 | 精品中文字幕av| 日本一区免费视频| 国产一卡二卡三卡| 中文字幕亚洲字幕| 国产亚洲观看| 丰满的少妇愉情hd高清果冻传媒 | 日本欧美一区二区在线观看| 国产破处视频在线观看| 日韩亚洲欧美中文三级| 92久久精品| 日韩成人在线资源| 免费成人av在线| 中文字幕手机在线观看| 亚洲第一级黄色片| 成人在线爆射| 黄色网络在线观看| 99免费精品在线| 在线视频免费观看一区| 久久6免费高清热精品| 免费视频一区三区| 国产三级精品三级在线| 亚洲一二三四在线观看| 国产在线观看黄| 91手机在线视频| 视频一区二区国产| 动漫性做爰视频| 亚洲精品视频久久| 国产999精品在线观看| 怡红院av亚洲一区二区三区h| 国产欧美日产一区| 蜜臀久久久久久999| 国产精品男人的天堂| 黄色欧美日韩| 日韩欧美在线视频播放| 亚洲福利影片在线| 四虎地址8848精品| 噜噜噜久久亚洲精品国产品麻豆| 国产精品久久777777| 欧洲一级在线观看| 91免费观看| 久久9热精品视频| 无码人妻丰满熟妇区bbbbxxxx|