? 优质资源分享 ?
| 学习路线指引(点击解锁) | 知识定位 | 人群定位 | 
|---|---|---|
| ? Python实战微信订餐小程序 ? | 进阶级 | 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。 | 
| ?Python量化交易实战? | 入门级 | 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统 | 
之前了解了client-go中的架构设计,也就是 tools/cache 下面的一些概念,那么下面将对informer进行分析
Controller
在client-go informer架构中存在一个 controller ,这个不是 Kubernetes 中的Controller组件;而是在 tools/cache 中的一个概念,controller 位于 informer 之下,Reflector 之上。code
Config
从严格意义上来讲,controller 是作为一个 sharedInformer 使用,通过接受一个 Config ,而 Reflector 则作为 controller 的 slot。Config 则包含了这个 controller 里所有的设置。
type Config struct {
    Queue // DeltaFIFO
    ListerWatcher // 用于list watch的
    Process ProcessFunc // 定义如何从DeltaFIFO中弹出数据后处理的操作
    ObjectType runtime.Object // Controller处理的对象数据,实际上就是kubernetes中的资源
    FullResyncPeriod time.Duration // 全量同步的周期
    ShouldResync ShouldResyncFunc // Reflector通过该标记来确定是否应该重新同步
    RetryOnError bool
}
controller
然后 controller 又为 reflertor 的上层
type controller struct {
    config         Config
    reflector      *Reflector 
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}
type Controller interface {
    // controller 主要做两件事,
    // 1. 构建并运行 Reflector,将listerwacther中的泵压到queue(Delta fifo)中
    // 2. Queue用Pop()弹出数据,具体的操作是Process
    // 直到 stopCh 不阻塞,这两个协程将退出
    Run(stopCh <-chan struct{})
    HasSynced() bool // 这个实际上是从store中继承的,标记这个controller已经
    LastSyncResourceVersion() string
}
controller 中的方法,仅有一个 Run() 和 New();这意味着,controller 只是一个抽象的概念,作为 Reflector, Delta FIFO 整合的工作流
而 controller 则是 SharedInformer 了。
Queue
这里的 queue 可以理解为是一个具有 Pop() 功能的 Indexer ;而 Pop() 的功能则是 controller 中的一部分;也就是说 queue 是一个扩展的 Store , Store 是不具备弹出功能的。
type Queue interface {
    Store
    // Pop会阻塞等待,直到有内容弹出,删除对应的值并处理计数器
    Pop(PopProcessFunc) (interface{}, error)
    // AddIfNotPresent puts the given accumulator into the Queue (in
    // association with the accumulator's key) if and only if that key
    // is not already associated with a non-empty accumulator.
    AddIfNotPresent(interface{}) error
    // HasSynced returns true if the first batch of keys have all been
    // popped. The first batch of keys are those of the first Replace
    // operation if that happened before any Add, Update, or Delete;
    // otherwise the first batch is empty.
    HasSynced() bool
    Close() // 关闭queue
}
而弹出的操作是通过 controller 中的 processLoop() 进行的,最终走到Delta FIFO中进行处理。
通过忙等待去读取要弹出的数据,然后在弹出前 通过PopProcessFunc 进行处理
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }
            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item) // 进行处理
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item) // 如果失败,再重新加入到队列中
            err = e.Err 
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}
Informer
通过对 Reflector, Store, Queue, ListerWatcher、ProcessFunc, 等的概念,发现由 controller 所包装的起的功能并不能完成通过对API的动作监听,并通过动作来处理本地缓存的一个能力;这个情况下诞生了 informer 严格意义上来讲是 sharedInformer
func newInformer(
 lw ListerWatcher,
 objType runtime.Object,
 resyncPeriod time.Duration,
 h ResourceEventHandler,
 clientState Store,
) Controller {
    // This will hold incoming changes. Note how we pass clientState in as a
    // KeyLister, that way resync operations will result in the correct set
    // of update/delete deltas.
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          clientState,
        EmitDeltaTypeReplaced: true,
    })
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    lw,
        ObjectType:       objType,
        FullResyncPeriod: resyncPeriod,
        RetryOnError:     false,
        Process: func(obj interface{}) error {
            // from oldest to newest
            for _, d := range obj.(Deltas) {
                switch d.Type {
                case Sync, Replaced, Added, Updated:
                    if old, exists, err := clientState.Get(d.Object); err == nil && exists {
                        if err := clientState.Update(d.Object); err != nil {
                            return err
                        }
                        h.OnUpdate(old, d.Object)
                    } else {
                        if err := clientState.Add(d.Object); err != nil {
                            return err
                        }
                        h.OnAdd(d.Object)
                    }
                case Deleted:
                    if err := clientState.Delete(d.Object); err != nil {
                        return err
                    }
                    h.OnDelete(d.Object)
                }
            }
            return nil
        },
    }
    return New(cfg)
}
newInformer是位于 tools/cache/controller.go 下,可以看出,这里面并没有informer的概念,这里通过注释可以看到,newInformer实际上是一个提供了存储和事件通知的informer。他关联的 queue 则是 Delta FIFO,并包含了 ProcessFunc, Store 等 controller的概念。最终对外的方法为 NewInformer()
func NewInformer(
 lw ListerWatcher,
 objType runtime.Object,
 resyncPeriod time.Duration,
 h ResourceEventHandler,
) (Store, Controller) {
    // This will hold the client state, as we know it.
    clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
    return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}
type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}
可以看到 NewInformer() 就是一个带有 Store功能的controller,通过这些可以假定出,Informer 就是controller ,将queue中相关操作分发给不同事件处理的功能
shareInformer 为客户端提供了与apiserver一致的数据对象本地缓存,并支持多事件处理程序的informer,而 shareIndexInformer  则是对shareInformer 的扩展
type SharedInformer interface {
    // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
    // period. Events to a single handler are delivered sequentially, but there is no coordination
    // between different handlers.
    AddEventHandler(handler ResourceEventHandler)
    // AddEventHandlerWithResyncPeriod adds an event handler to the
    // shared informer with the requested resync period; zero means
    // this handler does not care about resyncs. The resync operation
    // consists of delivering to the handler an update notification
    // for every object in the informer's local cache; it does not add
    // any interactions with the authoritative storage. Some
    // informers do no resyncs at all, not even for handlers added
    // with a non-zero resyncPeriod. For an informer that does
    // resyncs, and for each handler that requests resyncs, that
    // informer develops a nominal resync period that is no shorter
    // than the requested period but may be longer. The actual time
    // between any two resyncs may be longer than the nominal period
    // because the implementation takes time to do work and there may
    // be competing load and scheduling noise.
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    // GetStore returns the informer's local cache as a Store.
    GetStore() Store
    // GetController is deprecated, it does nothing useful
    GetController() Controller
    // Run starts and runs the shared informer, returning after it stops.
    // The informer will be stopped when stopCh is closed.
    Run(stopCh <-chan struct{})
    // HasSynced returns true if the shared informer's store has been
    // informed by at least one full LIST of the authoritative state
    // of the informer's object collection. This is unrelated to "resync".
    HasSynced() bool
    // LastSyncResourceVersion is the resource version observed when last synced with the underlying
    // store. The value returned is not synchronized with access to the underlying store and is not
    // thread-safe.
    LastSyncResourceVersion() string
}
SharedIndexInformer 是对SharedInformer的实现,可以从结构中看出,SharedIndexInformer 大致具有如下功能:
- 索引本地缓存
- controller,通过list watch拉取API并推入 Deltal FIFO
- 事件的处理
type sharedIndexInformer struct {
    indexer    Indexer // 具有索引的本地缓存
    controller Controller // controller
    processor             *sharedProcessor // 事件处理函数集合
    cacheMutationDetector MutationDetector
    listerWatcher ListerWatcher
    objectType runtime.Object
    resyncCheckPeriod time.Duration
    defaultEventHandlerResyncPeriod time.Duration
    clock clock.Clock
    started, stopped bool
    startedLock      sync.Mutex
    blockDeltas sync.Mutex
}
而在 tools/cache/share_informer.go 可以看到 shareIndexInformer 的运行过程
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          s.indexer,
        EmitDeltaTypeReplaced: true,
    })
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,
        Process: s.HandleDeltas, // process 弹出时操作的流程
    }
    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()
    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run) // 启动事件处理函数
    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh) // 启动controller,controller会启动Reflector和fifo的Pop()
}
而在操作Delta FIFO中可以看到,做具体操作时,会将动作分发至对应的事件处理函数中,这个是informer初始化时对事件操作的函数
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Replaced, Added, Updated:
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                isSync := false
                switch {
                case d.Type == Sync:
                    isSync = true
                case d.Type == Replaced:
                    if accessor, err := meta.Accessor(d.Object); err == nil {
                        if oldAccessor, err := meta.Accessor(old); err == nil {
                            isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                        }
                    }
                }
                // 事件的分发
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                // 事件的分发
                s.processor.distribute(addNotification{newObj: d.Object}, false)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}
事件处理函数 processor
启动informer时也会启动注册进来的事件处理函数;processor 就是这个事件处理函数。
run() 函数会启动两个 listener,j监听事件处理业务函数 listener.run 和 事件的处理
wg.StartWithChannel(processorStopCh, s.processor.run)
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run) 
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}
可以看出,就是拿到的事件,根据注册的到informer的事件函数进行处理
func (p *processorListener) run() {
    stopCh := make(chan struct{})
    wait.Until(func() {
        for next := range p.nextCh { // 消费事件
            switch notification := next.(type) {
            case updateNotification:
                p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.OnAdd(notification.newObj)
            case deleteNotification:
                p.handler.OnDelete(notification.oldObj)
            default:
                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
        }
        // the only way to get here is if the p.nextCh is empty and closed
        close(stopCh)
    }, 1*time.Second, stopCh)
}
informer中的事件的设计
了解了informer如何处理事件,就需要学习下,informer的事件系统设计 prossorListener
事件的添加
当在handleDelta时,会分发具体的事件
// 事件的分发
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
此时,事件泵 Pop() 会根据接收到的事件进行处理
// run() 时会启动一个事件泵
p.wg.Start(listener.pop)
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) 
    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification: // 这里实际上是一个阻塞的等待
            // 单向channel 可能不会走到这步骤
            var ok bool
            // deltahandle 中 distribute 会将事件添加到addCh待处理事件中
            // 处理完事件会再次拿到一个事件
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        // 处理 分发过来的事件 addCh
        case notificationToAdd, ok := <-p.addCh: // distribute分发的事件
            if !ok {
                return
            }
            // 这里代表第一次,没有任何事件时,或者上面步骤完成读取
            if notification == nil { // 就会走这里
                notification = notificationToAdd 
                nextCh = p.nextCh 
            } else { 
                // notification否则代表没有处理完,将数据再次添加到待处理中
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
该消息事件的流程图为
通过一个简单实例来学习client-go中的消息通知机制
package main
import (
    "fmt"
    "time"
    "k8s.io/utils/buffer"
)
var nextCh1 = make(chan interface{})
var addCh = make(chan interface{})
var stopper = make(chan struct{})
var notification interface{}
var pendding = *buffer.NewRingGrowing(2)
func main() {
    // pop
    go func() {
        var nextCh chan<- interface{}
        var notification interface{}
        //var n int
        for {
            fmt.Println("busy wait")
            fmt.Println("entry select", notification)
            select {
            // 初始时,一个未初始化的channel,nil,形成一个阻塞(单channel下是死锁)
            case nextCh <- notification:
                fmt.Println("entry nextCh", notification)
                var ok bool
                // 读不到数据代表已处理完,置空锁
                notification, ok = pendding.ReadOne()
                if !ok {
                    fmt.Println("unactive nextch")
                    nextCh = nil
                }
            // 事件的分发,监听,初始时也是一个阻塞
            case notificationToAdd, ok := <-addCh:
                fmt.Println(notificationToAdd, notification)
                if !ok {
                    return
                }
                // 线程安全
                // 当消息为空时,没有被处理
                // 锁为空,就分发数据
                if notification == nil {
                    fmt.Println("frist notification nil")
                    notification = notificationToAdd
                    nextCh = nextCh1 // 这步骤等于初始化了局部的nextCh,会触发上面的流程
                } else {
                    // 在第三次时,会走到这里,数据进入环
                    fmt.Println("into ring", notificationToAdd)
                    pendding.WriteOne(notificationToAdd)
                }
            }
        }
    }()
    // producer
    go func() {
        i := 0
        for {
            i++
            if i%5 == 0 {
                addCh <- fmt.Sprintf("thread 2 inner -- %d", i)
                time.Sleep(time.Millisecond * 9000)
            } else {
                addCh <- fmt.Sprintf("thread 2 outer -- %d", i)
                time.Sleep(time.Millisecond * 500)
            }
        }
    }()
    // subsriber
    go func() {
        for {
            for next := range nextCh1 {
                time.Sleep(time.Millisecond * 300)
                fmt.Println("consumer", next)
            }
        }
    }()
    <-stopper
}
总结,这里的机制类似于线程安全,进入临界区的一些算法,临界区就是 nextCh,notification 就是保证了至少有一个进程可以进入临界区(要么分发事件,要么生产事件);nextCh 和 nextCh1 一个是局部管道一个是全局的,管道未初始化代表了死锁(阻塞);当有消息要处理时,会将局部管道 nextCh 赋值给 全局 nextCh1 此时相当于解除了分发的步骤(对管道赋值,触发分发操作);ringbuffer 实际上是提供了一个对 notification 加锁的操作,在没有处理的消息时,需要保障 notification 为空,同时也关闭了流程 nextCh 的写入。这里主要是考虑对golang中channel的用法
转载请注明:xuhss » 浅析kubernetes中client-go Informer

