k8s client-go源码分析 informer源码分析(5)-Controller&Processor源码分析

虚幻大学 xuhss 234℃ 0评论

? 优质资源分享 ?

学习路线指引(点击解锁) 知识定位 人群定位
? Python实战微信订餐小程序 ? 进阶级 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
?Python量化交易实战? 入门级 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

client-go之Controller&Processor源码分析

1.controller与Processor概述

Controller

Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer本地缓存,并通知Processor相关对象有变化事件发生。

Processor

Processor根据Controller的通知,即根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化。

先通过一张informer概要架构图看一下Controller&Processor所处位置与概要功能。

745b3ba3c77d4de82d9fc1c98cc4a9f1 - k8s client-go源码分析 informer源码分析(5)-Controller&Processor源码分析

2.Controller初始化与启动分析

2.1 Cotroller初始化-New

New用于初始化Controller,方法比较简单。

// staging/src/k8s.io/client-go/tools/cache/controller.go
func New(c *Config) Controller {
    ctlr := &controller{
        config: *c,
        clock:  &clock.RealClock{},
    }
    return ctlr
}

2.2 Controller启动-controller.Run

controller.Run为controller的启动方法,这里主要看到几个点:
(1)调用NewReflector,初始化Reflector;
(2)调用r.Run,实际上是调用了Reflector的启动方法来启动Reflector(Reflector相关的分析前面的博客已经分析过了,这里不再重复);
(3)调用c.processLoop,开始controller的核心处理;

// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)
}

3.controller核心处理方法分析

controller.processLoop即为controller的核心处理方法。

controller.processLoop

controller的核心处理方法processLoop中,最重要的逻辑是循环调用c.config.Queue.Pop将DeltaFIFO中的队头元素给pop出来(实际上pop出来的是Deltas,是Delta的切片类型),然后调用c.config.Process方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去。

func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

根据前面sharedIndexInformer的初始化与启动分析(sharedIndexInformer.Run)可以得知,c.config.Process即为s.HandleDeltas方法,所以接下来看到s.HandleDeltas方法的分析。

c.config.Process/s.HandleDeltas

根据前面分析知道HandleDeltas要处理的是Deltas,是Delta的切片类型。

再来看到HandleDeltas方法的主要逻辑:
(1)循环遍历Deltas,拿到单个Delta;
(2)判断Delta的类型;
(3)如果是Added、Updated、Sync类型,则从indexer中获取该对象,存在则调用s.indexer.Update来更新indexer中的该对象,随后构造updateNotification struct,并调用s.processor.distribute方法;如果indexer中不存在该对象,则调用s.indexer.Add来往indexer中添加该对象,随后构造addNotification struct,并调用s.processor.distribute方法;
(4)如果是Deleted类型,则调用s.indexer.Delete来将indexer中的该对象删除,随后构造deleteNotification struct,并调用s.processor.distribute方法;

// staging/src/k8s.io/client-go/tools/cache/shared\_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

type updateNotification struct {
    oldObj interface{}
    newObj interface{}
}

type addNotification struct {
    newObj interface{}
}

type deleteNotification struct {
    oldObj interface{}
}

至此,Controller的分析就结束了,用一张图来回忆一下Controller的功能与架构。

4781699aa2de45fe2e79890533528900 - k8s client-go源码分析 informer源码分析(5)-Controller&Processor源码分析

4.processor核心处理方法分析

sharedIndexInformer.processor.distribute

接下来分析一下前面提到的s.processor.distribute方法。

可以看到distribute方法最终是将构造好的addNotification、updateNotification、deleteNotification对象写入到p.addCh中。

sync类型的对象写入到p.syncingListeners中,但informer中貌似没有启动p.syncingListeners或对p.syncingListeners做处理,所以sync类型的对象变化(也即list操作得到的对象所生成的对象变化)会被忽略?有待验证。

// staging/src/k8s.io/client-go/tools/cache/shared\_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}

sharedIndexInformer.processor.run

s.processor.run启动了processor,其中注意到listener.run与listener.pop两个核心方法。

这里可以看到processor的run方法中只启动了p.listeners,没有启动p.syncingListeners。

// staging/src/k8s.io/client-go/tools/cache/shared\_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}

processorListener.pop

分析processorListener的pop方法可以得知,其逻辑实际上就是将p.addCh中的对象给拿出来,然后丢进了p.nextCh中。那么谁来处理p.nextCh呢?继续往下看。

// staging/src/k8s.io/client-go/tools/cache/shared\_informer.go
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

processorListener.run

在processorListener的run方法中,将循环读取p.nextCh,判断对象类型,是updateNotification则调用p.handler.OnUpdate方法,是addNotification则调用p.handler.OnAdd方法,是deleteNotification则调用p.handler.OnDelete方法做处理。

// staging/src/k8s.io/client-go/tools/cache/shared\_informer.go
func (p *processorListener) run() {
    // this call blocks until the channel is closed. When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted. This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

而p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete方法实际上就是自定义的的ResourceEventHandlerFuncs了。

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    onAdd,
    UpdateFunc: onUpdate,
    DeleteFunc: onDelete,
  })
// staging/src/k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
    if r.AddFunc != nil {
        r.AddFunc(obj)
    }
}

func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
    if r.UpdateFunc != nil {
        r.UpdateFunc(oldObj, newObj)
    }
}

func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
    if r.DeleteFunc != nil {
        r.DeleteFunc(obj)
    }
}

至此,Processor的分析也结束了,用一张图来回忆一下Processor的功能与架构。

075636e3c009e8f67f97753d12963cf3 - k8s client-go源码分析 informer源码分析(5)-Controller&Processor源码分析

总结

Controller

Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer本地缓存,并通知Processor相关对象有变化事件发生:
(1)如果是Added、Updated、Sync类型,则从indexer中获取该对象,存在则调用s.indexer.Update来更新indexer中的该对象,随后构造updateNotification struct,并通知Processor;如果indexer中不存在该对象,则调用s.indexer.Add来往indexer中添加该对象,随后构造addNotification struct,并通知Processor;
(2)如果是Deleted类型,则调用s.indexer.Delete来将indexer中的该对象删除,随后构造deleteNotification struct,并通知Processor;

Processor

Processor根据Controller的通知,即根据对象的变化事件类型(addNotification、updateNotification、deleteNotification),调用相应的ResourceEventHandler(addFunc、updateFunc、deleteFunc)来处理对象的变化。

informer架构中的Controller&Processor

2b89bebb65f55fe77cdb4312529ab5d3 - k8s client-go源码分析 informer源码分析(5)-Controller&Processor源码分析

在对informer中的Controller与Processor分析完之后,接下来将分析informer中的Indexer。

转载请注明:xuhss » k8s client-go源码分析 informer源码分析(5)-Controller&Processor源码分析

喜欢 (0)

您必须 登录 才能发表评论!