背景
主要功能
K8s里面所有数据的增、删、改、查、WATCH都通过apiserver进行。为了减轻对etcd的访问压力, k8s里面抽象了一个名为Cacher的struct, 所有etcd事件的分发和访问都通过该对象进行; 该对象把etcd的client包装成一个storage, 在多层store之间共享
功能拆解
- store: 提供对数据的操作接口,比如增删改查和watch
- watcher: 监听store里面数据的变化, 同时做一些处理
注意, watcher和store是一个通用的数据操作抽象, 比如对etcd的操作有一套store和watcher, 上面的缓存也有一套store和watcher, rest那层还有一套store和watcher
模式理解
- store: 提供统一接口,屏蔽对底层 数据存储 的操作
- watcher: watcher用于从store获取事件变化,以便进行一些自定义操作的处理
- client: client如果要进行新功能开发, 只要起一个新的watcher关注自己关心的数据变化, 完成自定义逻辑处理就可以。如果再说大一点, 可能就是 数据存储层 、 框架(store) 、 业务逻辑 三层的解耦了
详细的调用图
代码实现
关键数据结构
- Cacher 负责与后端etcd store进行交互, 分发事件给所有的watcher
type Cacher struct { // 当前incoming队列中的长度性能指标 incomingHWM HighWateMark // incoming 事件管道, 回被分发给所有的watchers incoming chan watchCacheEvent sync.RWMutex // 当前cache ready状态必须为ok才可以被访问 ready *ready // 后端存储数据接口 storage Store // 对象类型 objectType reflect.Type watchCache watchCache reflector *Reflector versioner Versioner triggerFunc TriggerPublisherFunc watcherIdx int watchers indexedWatchers dispatchTimeoutBudge *timeBudget stopLock sync.RWMutex stopped bool stopCh chan struct{} stopWg sync.WaitGroup}复制代码
- cacheWatcher 接收上面Cacher发送的事件, 同时分发给rest websocket接口
// cacheWatcher 实现了watch接口type cacheWatcher struct { sync.Mutex input chan *watchCacheEvent result chan Event done chan struct{} filter filterWithAttrsFunc stopped bool forget func(bool) versioner Versioner}复制代码
- Reflector watch后端数据变化, 同时把事件发送到watchCache(watchCache是以store的身份传递到Reflector中)
// Reflector 反射type Reflector struct { name string expectedType reflect.Type store Store listerWatcher ListerWatcher}复制代码
关键方法实现
func NewCacherFromConfig(config Config) *Cacher { // 首先生成一个watchCache watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix stopCh := make(chan struct{}) cacher := &Cacher{ ready: newReady(), storage: config.Storage, objectType: reflect.TypeOf(config.Type), watchCache: watchCache, // watchCache 会被党委store传递到后端的reflector中, 用于reflector获取数据后将etcd中的数据转换成event事件 reflector: cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0), versioner: config.Versioner, } // watchCache会设置SetOnEvent方法为cacher的processEvent, 在watchCache的所有事件类型处理的时候都会被改方法进行处理 watchCache.SetOnEvent(cacher.processEvent) go cacher.dispatchEvents() cacher.stopWg.Add(1) go func() { defer cacher.stopWg.Done() wait.Until( func() { if !cacher.isStopped() { cacher.startCaching(stopCh) } }, time.Second, stopCh, ) }() return cacher}复制代码
- processEvent 将事件存放到自己的incoming队列中
func (c *Cacher) processEvent(event *watchCacheEvent) { if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) { fmt.Println("cacher %v: %v objects queued in comming channel", c.objectType.String(), curLen) } c.incoming <- *event}复制代码
- 事件传递给前端watcher
func (c *Cacher) dispatchEvents() { for { select { case event, ok := <-c.incoming: if !ok { return } c.dispatchEvent(&event) case <-c.stopCh: return } }}func (c *Cacher) dispatchEvent(event *watchCacheEvent) { triggerValues, supported := c.triggerValues(event) c.Lock() defer c.Unlock() for _, watcher := range c.watchers.allWatchers { watcher.add(event, d.dispatchTimeoutBudge) } if supported { for _, triggerValue := range triggerValues { for _, watcher := range c.watchers.valueWatchers[triggerValue] { watcher.add(event, d.dispatchTimeoutBudge) } } } else { for _, watchers := range c.watchers.valueWatchers { for _, watcher := range watchers { watcher.add(event, c.dispatchTimeoutBudge) } } }}复制代码
完整代码
package cacher

import (
	"context"
	"fmt"
	"net/http"
	"reflect"
	"sync"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/conversion"
	"k8s.io/apimachinery/pkg/runtime"
)

// HighWateMark holds the largest value observed so far; it is used as a
// performance indicator.
type HighWateMark int64

// Update atomically raises the mark to current when current is greater than
// the stored value. It reports whether the mark was raised.
func (hwm *HighWateMark) Update(current int64) bool {
	for {
		old := atomic.LoadInt64((*int64)(hwm))
		if current <= old {
			return false
		}
		if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
			return true
		}
	}
}

// ready is a boolean flag guarded by a condition variable.
type ready struct {
	ok bool
	c  *sync.Cond
}

// newReady returns a ready whose flag is false.
func newReady() *ready {
	return &ready{c: sync.NewCond(&sync.Mutex{})}
}

// wait blocks until the flag is true.
func (r *ready) wait() {
	r.c.L.Lock()
	for !r.ok {
		r.c.Wait()
	}
	r.c.L.Unlock()
}

// check returns the current value of the flag.
func (r *ready) check() bool {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	return r.ok
}

// set stores ok in the flag and wakes every goroutine blocked in wait.
func (r *ready) set(ok bool) {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	r.ok = ok
	r.c.Broadcast()
}

// TypeMeta carries the kind and API version metadata of a request.
type TypeMeta struct {
	Kind       string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
	APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,2,opt,name=apiVersion"`
}

// ListOption holds the parameters of a list or watch request.
type ListOption struct {
	TypeMeta      `json:",inline"`
	LabelSelector string
	FieldSelector string
	// IncludeUninitialized selects whether uninitialized resources are
	// included.
	IncludeUninitialized bool
	// Watch requests add, update and remove event notifications for the
	// resources (delivered over a websocket, per the original note).
	Watch           bool
	ResourceVersion string
	TimeoutSecond   *int64
	Limit           int64
}

// ListerWatcher abstracts a source that can be listed and watched.
type ListerWatcher interface {
	List(option ListOption) (Object, error)
	Watch(option ListOption) (Interface, error)
}

// Reflector pairs a ListerWatcher with the Store that receives its data.
type Reflector struct {
	name          string
	expectedType  reflect.Type
	store         Store
	listerWatcher ListerWatcher
}

// ListAndWatch is meant to fetch the latest version and then watch for data
// changes. In this file it is a stub that always returns nil.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	return nil
}

// MatchValue pairs an index name with the value to match on that index.
type MatchValue struct {
	IndexName string
	Value     string
}

// TriggerPublisherFunc returns the match values of obj.
type TriggerPublisherFunc func(obj Object) []MatchValue

// NOTE(review): the declaration below is commented out in the original, yet
// filterWithAttrsFunc is used both as a field type and as a call taking
// (key, pred). Confirm where the real definitions live before enabling it.
//type filterWithAttrsFunc func(key string, l Set, f Set, uninitializer bool) bool

// cacheWatcher implements the watch interface.
type cacheWatcher struct {
	sync.Mutex
	input     chan *watchCacheEvent
	result    chan Event
	done      chan struct{}
	filter    filterWithAttrsFunc
	stopped   bool
	forget    func(bool)
	versioner Versioner
}

// Stop asks the owner to forget this watcher (taking the owner's lock) and
// then stops the watcher.
func (c *cacheWatcher) Stop() {
	c.forget(true)
	c.stop()
}

// stop marks the watcher stopped and closes its done and input channels. The
// stopped flag makes repeated calls a no-op, so the channels are closed once.
func (c *cacheWatcher) stop() {
	c.Lock()
	defer c.Unlock()
	if c.stopped {
		return
	}
	c.stopped = true
	close(c.done)
	close(c.input)
}

// newCacheWatcher creates a cacheWatcher whose input and result channels are
// buffered to chanSize and starts its process goroutine with initEvents and
// resourceVersion.
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher {
	watcher := &cacheWatcher{
		input:     make(chan *watchCacheEvent, chanSize),
		result:    make(chan Event, chanSize),
		done:      make(chan struct{}),
		filter:    filter,
		stopped:   false,
		forget:    forget,
		versioner: versioner,
	}
	go watcher.process(initEvents, resourceVersion)
	return watcher
}

// watchersMap maps a watcher index to its cacheWatcher.
type watchersMap map[int]*cacheWatcher

// terminateAll removes every watcher from the map and stops it.
func (wm watchersMap) terminateAll() {
	for key, watcher := range wm {
		delete(wm, key)
		watcher.Stop()
	}
}

// indexedWatchers holds the watchers that receive every event (allWatchers)
// and the watchers registered under a specific trigger value (valueWatchers).
type indexedWatchers struct {
	allWatchers   watchersMap
	valueWatchers map[string]watchersMap
}

// terminateAll stops and removes every watcher in both collections, logging
// first when there is anything to terminate.
func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
	if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
		// Printf, not Println: the message contains a format verb.
		fmt.Printf("Terminating all watchers from cacher %v\n", objectType)
	}
	i.allWatchers.terminateAll()
	for index, watchers := range i.valueWatchers {
		watchers.terminateAll()
		delete(i.valueWatchers, index)
	}
}

// deleteWatcher removes watcher number. When supported is true the watcher is
// removed from the map stored under value, and that map is dropped once it is
// empty; otherwise the watcher is removed from allWatchers.
func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
	if !supported {
		i.allWatchers.deleteWatcher(number)
		return
	}
	i.valueWatchers[value].deleteWatcher(number)
	if len(i.valueWatchers[value]) == 0 {
		delete(i.valueWatchers, value)
	}
}

// addWatcher registers w under number. When supported is true it is stored
// in the map for value (created on first use); otherwise in allWatchers.
func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
	if !supported {
		i.allWatchers.addWatcher(w, number)
		return
	}
	if _, ok := i.valueWatchers[value]; !ok {
		i.valueWatchers[value] = watchersMap{}
	}
	i.valueWatchers[value].addWatcher(w, number)
}

// timeBudget holds a budget, a refresh duration and a maximum budget behind
// a mutex. NOTE(review): no method in this file uses it; semantics unverified.
type timeBudget struct {
	sync.Mutex
	budget    time.Duration
	refresh   time.Duration
	maxBudget time.Duration
}

// Labels gives read access to a set of labels.
// NOTE(review): Hash looks like a misspelling of Has; it is kept because
// renaming an interface method would break implementers.
type Labels interface {
	Hash(label string) (exists bool)
	Get(label string) (value string)
}

// Selector matches label sets.
type Selector interface {
	Matchs(Labels) bool
	Empty() bool
	String() string
	RequiresExactMatch(field string) (value string, found bool)
	DeepCopySelector() Selector
}

// AttrFunc returns the label set and field set of obj, plus a bool and an
// error. NOTE(review): the bool presumably reports "uninitialized" — confirm.
type AttrFunc func(obj Object) (Set, Set, bool, error)

// SelectionPredicate describes which objects a request selects.
// NOTE(review): Fielld and InedxFields look like misspellings of Field and
// IndexFields; they are kept because they are exported field names.
type SelectionPredicate struct {
	Label                Selector
	Fielld               Selector
	IncludeUninitialized bool
	GetAttrs             AttrFunc
	InedxFields          []string
	Limit                int64
	Continue             string
}

// MatcherIndex returns one MatchValue for every index field that the field
// selector requires an exact match on.
func (s *SelectionPredicate) MatcherIndex() []MatchValue {
	var result []MatchValue
	for _, field := range s.InedxFields {
		if value, ok := s.Fielld.RequiresExactMatch(field); ok {
			result = append(result, MatchValue{IndexName: field, Value: value})
		}
	}
	return result
}

// Feature names a feature.
type Feature string

// FeatureGate reports whether a feature is enabled.
type FeatureGate interface {
	Enabled(key Feature) bool
}

// UID is an object identifier.
type UID string

// Preconditions holds the conditions an operation is given; here only an
// optional UID.
type Preconditions struct {
	UID *UID
}

// StatusError is an error that carries a metav1.Status.
type StatusError struct {
	ErrStatus metav1.Status
}

// Error returns the message of the wrapped status. It makes *StatusError
// satisfy error, which the type switch in newErrWatcher needs.
func (e *StatusError) Error() string {
	return e.ErrStatus.Message
}

// errWatcher is a watcher that only ever carries a single error event.
type errWatcher struct {
	result chan Event
}

// newErrWatcher wraps err in an Error event and returns a watcher whose
// result channel holds exactly that event and is then closed.
func newErrWatcher(err error) *errWatcher {
	errEvent := Event{Type: Error}
	switch err := err.(type) {
	case Object:
		errEvent.Object = err
	case *StatusError:
		errEvent.Object = &err.ErrStatus
	default:
		errEvent.Object = &metav1.Status{
			Status:  metav1.StatusFailure,
			Message: err.Error(),
			Reason:  metav1.StatusReasonInternalError,
			Code:    http.StatusInternalServerError,
		}
	}

	// A buffer of one lets the event be queued without a receiver; closing
	// the channel ends the stream after that single event.
	watcher := &errWatcher{result: make(chan Event, 1)}
	watcher.result <- errEvent
	close(watcher.result)
	return watcher
}

// UpdateFunc computes the updated object from the current one.
type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)

// Cacher talks to the backing store and distributes incoming events to all
// registered watchers.
type Cacher struct {
	// incomingHWM is the high-water mark of the incoming queue length, kept
	// as a performance indicator.
	incomingHWM HighWateMark
	// incoming is the event channel whose contents are dispatched to all
	// watchers.
	incoming chan watchCacheEvent

	sync.RWMutex

	// ready must be in the ok state before the cache may be accessed.
	ready *ready
	// storage is the backing storage interface.
	storage Store
	// objectType is the type of the cached objects.
	objectType reflect.Type

	watchCache  watchCache
	reflector   *Reflector
	versioner   Versioner
	triggerFunc TriggerPublisherFunc
	watcherIdx  int
	watchers    indexedWatchers

	dispatchTimeoutBudge *timeBudget

	stopLock sync.RWMutex
	stopped  bool
	stopCh   chan struct{}
	stopWg   sync.WaitGroup
}

// Versioner returns the versioner of the backing storage.
func (c *Cacher) Versioner() Versioner {
	return c.storage.Versioner()
}

// Create forwards the call to the backing storage.
func (c *Cacher) Create(ctx context.Context, key string, out Object, preconditions *Preconditions) {
	c.storage.Create(ctx, key, out, preconditions)
}

// Delete forwards the call to the backing storage and returns its error.
func (c *Cacher) Delete(ctx context.Context, key string, out Object, preconditions *Preconditions) error {
	return c.storage.Delete(ctx, key, out, preconditions)
}

// Watch registers a new cacheWatcher for key, seeded with the events the
// watch cache holds since resourceVersion. If those events cannot be
// retrieved, the error is delivered through an errWatcher instead.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (Interface, error) {
	watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return nil, err
	}

	c.ready.wait()

	// The read lock is held until return because the events are fetched
	// with the "ThreadUnsafe" accessor.
	c.watchCache.RLock()
	defer c.watchCache.RUnlock()
	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
	if err != nil {
		return newErrWatcher(err), nil
	}

	triggerValue, triggerSupported := "", false
	if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
		triggerValue, triggerSupported = matchValues[0].Value, true
	}

	// Use a larger buffer when a trigger function exists but this watcher
	// cannot be indexed by a trigger value.
	chanSize := 10
	if c.triggerFunc != nil && !triggerSupported {
		chanSize = 100
	}

	c.Lock()
	defer c.Unlock()
	forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
	watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunc(key, pred), forget, c.versioner)
	c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
	c.watcherIdx++
	return watcher, nil
}

// WatchList behaves exactly like Watch.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (Interface, error) {
	return c.Watch(ctx, key, resourceVersion, pred)
}

// Get reads the object stored under key into objPtr. It goes straight to the
// backing storage when resourceVersion is empty, or when it parses to 0 and
// the cache is not ready yet; otherwise it waits for the cache and reads
// from it. A missing key zeroes objPtr and, unless ignoreNotFound is set,
// returns an error.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr Object, ignoreNotFound bool) error {
	if resourceVersion == "" {
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}

	getRv, err := c.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return err
	}
	if getRv == 0 && !c.ready.check() {
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}

	c.ready.wait()

	objValue, err := conversion.EnforcePtr(objPtr)
	if err != nil {
		return err
	}

	obj, exists, _, err := c.watchCache.WaitUntilFreshAndGet(getRv, key, nil)
	if err != nil {
		return err
	}

	if exists {
		elem, ok := obj.(*storeElement)
		if !ok {
			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		objValue.Set(reflect.ValueOf(elem.Object).Elem())
		return nil
	}

	objValue.Set(reflect.Zero(objValue.Type()))
	if !ignoreNotFound {
		return fmt.Errorf("key: %v resourversion: %v", key, getRv)
	}
	return nil
}

// List fills listObj with the cached objects accepted by the filter built
// from key and pred. It goes straight to the backing storage when
// resourceVersion is empty, or when it parses to 0 and the cache is not
// ready yet.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj Object) error {
	if resourceVersion == "" {
		// The original notes that the real code also checks paging
		// information here; that part is omitted.
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

	listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return err
	}
	if listRV == 0 && !c.ready.check() {
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

	c.ready.wait()

	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}
	listVal, err := conversion.EnforcePtr(listPtr)
	if err != nil || listVal.Kind() != reflect.Slice {
		return fmt.Errorf("need a pointer to slice got %v", listVal.Kind())
	}
	filter := filterWithAttrsFunc(key, pred)

	// NOTE(review): the original called WaitUntilFreshAndGet(listRV) here,
	// which cannot share a signature with the calls in Get and GetToList.
	// WaitUntilFreshAndList is assumed to be the list counterpart — confirm
	// against watchCache.
	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
	if err != nil {
		return err
	}

	if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Fielld.Empty() {
		// Nothing will be filtered out by the selectors and the slice is
		// too small, so allocate one with enough capacity up front.
		listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
	}
	for _, obj := range objs {
		elem, ok := obj.(*storeElement)
		if !ok {
			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		// Labels come before fields, as in GetToList.
		if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
		}
	}

	if c.versioner != nil {
		if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
			return err
		}
	}
	return nil
}

// GuaranteedUpdate forwards to the backing storage. When the watch cache
// already holds key, a deep copy of the cached object is passed along as an
// extra argument; a lookup error is only logged.
func (c *Cacher) GuaranteedUpdate(
	ctx context.Context, key string, ptrToType Object, ignoreNotFound bool,
	preconditions *Preconditions, tryUpdate UpdateFunc, _ ...Object) error {
	if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
		fmt.Printf("GetByKey returned error: %v", err)
	} else if exists {
		currObj := elem.(*storeElement).Object.DeepCopyObject()
		return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
	}
	return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}

// Count forwards to the backing storage.
func (c *Cacher) Count(pathPrefix string) (int64, error) {
	return c.storage.Count(pathPrefix)
}

// triggerValues returns the trigger values of event: the first match value
// of the current object and, when it differs, the first match value of the
// previous object. The bool is false when there is no trigger function or
// no value was found.
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
	if c.triggerFunc == nil {
		return nil, false
	}

	// Length 0, capacity 2: at most two values are appended. A non-zero
	// length would seed the result with empty strings.
	result := make([]string, 0, 2)
	if matchValues := c.triggerFunc(event.Object); len(matchValues) > 0 {
		result = append(result, matchValues[0].Value)
	}
	if event.PrevObject == nil {
		return result, len(result) > 0
	}

	if prevMatchValues := c.triggerFunc(event.PrevObject); len(prevMatchValues) > 0 {
		if len(result) == 0 || result[0] != prevMatchValues[0].Value {
			result = append(result, prevMatchValues[0].Value)
		}
	}
	return result, len(result) > 0
}

// processEvent queues event on the cacher's incoming channel. Before sending
// it samples the current queue length and logs when that length sets a new
// high-water mark.
func (c *Cacher) processEvent(event *watchCacheEvent) {
	if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
		// Printf, not Println: the message contains format verbs.
		fmt.Printf("cacher %v: %v objects queued in comming channel\n", c.objectType.String(), curLen)
	}
	c.incoming <- *event
}

// dispatchEvents forwards every event received on c.incoming to
// dispatchEvent. It returns when the incoming channel is closed or when
// c.stopCh becomes readable.
func (c *Cacher) dispatchEvents() {
	for {
		select {
		case event, ok := <-c.incoming:
			if !ok {
				return
			}
			c.dispatchEvent(&event)
		case <-c.stopCh:
			return
		}
	}
}

// dispatchEvent delivers event to the registered watchers while holding the
// cacher's write lock. Watchers in allWatchers always receive the event.
// Value watchers receive it only for the event's trigger values when trigger
// values are supported; otherwise every value watcher receives it.
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	triggerValues, supported := c.triggerValues(event)

	c.Lock()
	defer c.Unlock()

	for _, watcher := range c.watchers.allWatchers {
		watcher.add(event, c.dispatchTimeoutBudge)
	}

	if !supported {
		for _, watchers := range c.watchers.valueWatchers {
			for _, watcher := range watchers {
				watcher.add(event, c.dispatchTimeoutBudge)
			}
		}
		return
	}

	for _, triggerValue := range triggerValues {
		for _, watcher := range c.watchers.valueWatchers[triggerValue] {
			watcher.add(event, c.dispatchTimeoutBudge)
		}
	}
}

// GetToList reads the single object stored under key and, if the filter
// accepts it, appends it to listObj. It goes straight to the backing storage
// when resourceVersion is empty, when paging is requested (Continue or
// Limit), or when the version parses to 0 and the cache is not ready yet.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj Object) error {
	if resourceVersion == "" || len(pred.Continue) > 0 || pred.Limit > 0 {
		return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
	}

	// Parse the requested list version through the versioner.
	listRv, err := c.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return err
	}
	// Version 0 with a cache that is not ready yet: read from the backend.
	if listRv == 0 && !c.ready.check() {
		return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
	}

	c.ready.wait()

	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}
	listVal, err := conversion.EnforcePtr(listPtr)
	if err != nil || listVal.Kind() != reflect.Slice {
		return fmt.Errorf("need a pointer to slice got %v", listVal.Kind())
	}
	filter := filterWithAttrsFunc(key, pred)

	// Fetch the object for the parsed resource version from the watch cache.
	obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRv, key, nil)
	if err != nil {
		return err
	}
	if !exists {
		return nil
	}

	elem, ok := obj.(*storeElement)
	if !ok {
		return fmt.Errorf("non *storeElement returned from storage: %v", obj)
	}
	if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
		listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
	}
	if c.versioner != nil {
		if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
			return err
		}
	}
	return nil
}

// startCaching terminates all existing watchers and runs the reflector's
// ListAndWatch. The cache is marked ready once the watch cache reports a
// replace, and marked not ready again on return if that happened.
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
	successfulList := false
	c.watchCache.SetOnReplace(func() {
		successfulList = true
		c.ready.set(true)
	})
	defer func() {
		if successfulList {
			c.ready.set(false)
		}
	}()

	c.terminateAllWatchers()

	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
		// There is no caller to return the error to, so log it. The
		// original built it with fmt.Errorf and discarded the result.
		fmt.Printf("unexpected listAndWatch error: %v\n", err)
	}
}

// terminateAllWatchers stops every registered watcher under the write lock.
func (c *Cacher) terminateAllWatchers() {
	c.Lock()
	defer c.Unlock()
	c.watchers.terminateAll(c.objectType)
}

// forgetWatcher returns a function that removes the watcher at index from c.
// Called with lock=true it takes the cacher's write lock first; with
// lock=false it takes no lock and logs that the watcher is being force-closed.
func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
	return func(lock bool) {
		if lock {
			c.Lock()
			defer c.Unlock()
		} else {
			// The original built this message with fmt.Errorf and
			// discarded the result; it is meant as a log line.
			fmt.Printf("Forcing watcher close due to unresponsiveness: %v\n", c.objectType.String())
		}
		c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
	}
}
感悟
其实看了很多, 一层层, 但要完全说明每一层、每一个函数都是怎么转换的, 真的太多了。其实核心就两个: Storage和Watcher。Storage的核心是数据变化通知的上层实现, Watcher用于关注数据变化并传递响应。接下来, 可能会暂停看apiserver端, 先看一下controller和client-go两部分, 先这样吧, Good Night