k8s Cache Watcher ListWatch: the internal implementation of event watch
Published: 2019-06-26


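Background

Main function

In K8s, every create, delete, update, read, and WATCH of data goes through the apiserver. To reduce the access pressure on etcd, k8s abstracts a struct called Cacher, and all distribution of and access to etcd events goes through this object. It wraps the etcd client as a storage that is shared across the multiple store layers.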
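To make the "reduce pressure on etcd" point concrete, here is a minimal sketch of a cache placed in front of a backend. All names (`backend`, `cacher`, `Sync`) are hypothetical and are not the Kubernetes API. The real Cacher also keeps its cache current through a watch, which this sketch leaves out.

```go
package main

import (
	"fmt"
	"sync"
)

// backend stands in for etcd and counts how often it is read (hypothetical type).
type backend struct {
	mu    sync.Mutex
	data  map[string]string
	reads int
}

func (b *backend) Get(key string) (string, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.reads++
	v, ok := b.data[key]
	return v, ok
}

// List returns a copy of all data and counts as one backend read.
func (b *backend) List() map[string]string {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.reads++
	out := make(map[string]string, len(b.data))
	for k, v := range b.data {
		out[k] = v
	}
	return out
}

// cacher wraps the backend; once synced, reads are served from memory.
type cacher struct {
	storage *backend
	mu      sync.RWMutex
	cache   map[string]string
	ready   bool
}

// Sync loads a snapshot from the backend and marks the cache ready.
func (c *cacher) Sync() {
	snapshot := c.storage.List()
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cache = snapshot
	c.ready = true
}

// Get falls through to the backend only while the cache is not ready.
func (c *cacher) Get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if !c.ready {
		return c.storage.Get(key)
	}
	v, ok := c.cache[key]
	return v, ok
}

func main() {
	b := &backend{data: map[string]string{"pods/a": "v1"}}
	c := &cacher{storage: b}
	c.Get("pods/a") // not ready yet: one backend read
	c.Sync()        // initial list: one backend read
	for i := 0; i < 100; i++ {
		c.Get("pods/a") // served from the cache
	}
	fmt.Println("backend reads:", b.reads) // prints "backend reads: 2"
}
```

However many reads arrive after the initial sync, the backend read count stays at two in this sketch.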

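Functional breakdown

  • store: provides the operations on data, such as create, delete, update, read, and watch
  • watcher: listens for changes to the data in a store and does some processing on them

Note that watcher and store are a general-purpose abstraction for operating on data: operations on etcd have their own store and watcher, the cache layer above has its own store and watcher, and the REST layer has yet another store and watcher.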

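Understanding the pattern

  • store: provides a uniform interface and hides the operations on the underlying data storage
  • watcher: obtains change events from a store so that custom processing can be done on them
  • client: to develop a new feature, a client only needs to start a new watcher that follows the data changes it cares about and implements its custom logic there. Put more broadly, this decouples the data storage framework layer (store) from the business logic layer above it.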
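The store / watcher / client split can be sketched in a few lines. This is an illustration under assumptions: `Event`, `Store`, and `memStore` are hypothetical names, not Kubernetes types, and the real watchers filter and buffer events in more elaborate ways.

```go
package main

import (
	"fmt"
	"sync"
)

// Event describes one change to a key (hypothetical type).
type Event struct {
	Type  string // "ADDED", "MODIFIED" or "DELETED"
	Key   string
	Value string
}

// Store hides the underlying storage behind one interface.
type Store interface {
	Put(key, value string)
	Delete(key string)
	Get(key string) (string, bool)
	Watch(buffer int) <-chan Event
}

// memStore is an in-memory Store that fans every change out to its watchers.
type memStore struct {
	mu       sync.Mutex
	data     map[string]string
	watchers []chan Event
}

func newMemStore() *memStore {
	return &memStore{data: map[string]string{}}
}

func (s *memStore) Put(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	eventType := "ADDED"
	if _, ok := s.data[key]; ok {
		eventType = "MODIFIED"
	}
	s.data[key] = value
	s.notify(Event{Type: eventType, Key: key, Value: value})
}

func (s *memStore) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if old, ok := s.data[key]; ok {
		delete(s.data, key)
		s.notify(Event{Type: "DELETED", Key: key, Value: old})
	}
}

func (s *memStore) Get(key string) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.data[key]
	return v, ok
}

// Watch registers a new watcher and returns its event channel.
func (s *memStore) Watch(buffer int) <-chan Event {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan Event, buffer)
	s.watchers = append(s.watchers, ch)
	return ch
}

// notify must be called with s.mu held; it blocks if a watcher's buffer is full.
func (s *memStore) notify(e Event) {
	for _, ch := range s.watchers {
		ch <- e
	}
}

func main() {
	var store Store = newMemStore()
	events := store.Watch(8) // the "client" starts a watcher for the data it cares about
	store.Put("pods/a", "v1")
	store.Put("pods/a", "v2")
	store.Delete("pods/a")
	for i := 0; i < 3; i++ {
		e := <-events
		fmt.Println(e.Type, e.Key, e.Value)
	}
}
```

The client never touches `memStore` directly. It only depends on the `Store` interface and the event channel, so the storage underneath can change without affecting it.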

Detailed call graph

Code implementation

Key data structures

  • Cacher: interacts with the backend etcd store and distributes events to all watchers
type Cacher struct {
	// High-water mark of the incoming queue length (a performance metric)
	incomingHWM HighWateMark
	// incoming event channel; its events are dispatched to all watchers
	incoming chan watchCacheEvent

	sync.RWMutex

	// The cache may only be accessed once ready is ok
	ready *ready
	// Backend storage interface
	storage Store
	// Object type
	objectType reflect.Type

	watchCache watchCache
	reflector  *Reflector
	versioner  Versioner

	triggerFunc TriggerPublisherFunc
	watcherIdx  int
	watchers    indexedWatchers

	dispatchTimeoutBudge *timeBudget

	stopLock sync.RWMutex
	stopped  bool
	stopCh   chan struct{}
	stopWg   sync.WaitGroup
}
  • cacheWatcher: receives the events sent by the Cacher above and forwards them to the REST websocket interface
// cacheWatcher implements the watch interface
type cacheWatcher struct {
	sync.Mutex
	input     chan *watchCacheEvent
	result    chan Event
	done      chan struct{}
	filter    filterWithAttrsFunc
	stopped   bool
	forget    func(bool)
	versioner Versioner
}
  • Reflector: watches backend data changes and sends the events to watchCache (watchCache is passed into the Reflector in the role of a store)
// Reflector mirrors backend data into a store
type Reflector struct {
	name          string
	expectedType  reflect.Type
	store         Store
	listerWatcher ListerWatcher
}
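How the three structures connect may be easier to see in a stripped-down form. In the sketch below the reflector only knows a narrow `Store` interface, the `watchCache` implements that interface, and every write is turned into an event through an `onEvent` callback that feeds a channel playing the role of `Cacher.incoming`. The types are simplified stand-ins written for this illustration (the `change` type and `apply` method are hypothetical), not the real Kubernetes definitions.

```go
package main

import "fmt"

// Store is the narrow interface the reflector writes to.
type Store interface {
	Add(key, value string)
	Delete(key string)
}

// watchCache is a Store that also turns every write into an event callback.
type watchCache struct {
	items   map[string]string
	onEvent func(eventType, key string)
}

func (w *watchCache) Add(key, value string) {
	eventType := "ADDED"
	if _, ok := w.items[key]; ok {
		eventType = "MODIFIED"
	}
	w.items[key] = value
	w.emit(eventType, key)
}

func (w *watchCache) Delete(key string) {
	if _, ok := w.items[key]; !ok {
		return
	}
	delete(w.items, key)
	w.emit("DELETED", key)
}

func (w *watchCache) emit(eventType, key string) {
	if w.onEvent != nil {
		w.onEvent(eventType, key)
	}
}

// change is one update observed on the backend (hypothetical type).
type change struct {
	key, value string
	deleted    bool
}

// reflector only knows the Store interface, not that it is a watchCache.
type reflector struct{ store Store }

// apply replays backend changes into the store.
func (r *reflector) apply(changes []change) {
	for _, c := range changes {
		if c.deleted {
			r.store.Delete(c.key)
		} else {
			r.store.Add(c.key, c.value)
		}
	}
}

func main() {
	incoming := make(chan string, 8) // plays the role of Cacher.incoming
	wc := &watchCache{items: map[string]string{}}
	wc.onEvent = func(eventType, key string) { incoming <- eventType + " " + key }

	r := &reflector{store: wc}
	r.apply([]change{
		{key: "pods/a", value: "v1"},
		{key: "pods/a", value: "v2"},
		{key: "pods/a", deleted: true},
	})
	close(incoming)
	for e := range incoming {
		fmt.Println(e)
	}
}
```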

Key method implementations

  • NewCacherFromConfig builds the Cacher, wires watchCache to it, and starts event dispatch and caching
func NewCacherFromConfig(config Config) *Cacher {
	// First create a watchCache
	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
	reflectorName := "storage/cacher.go:" + config.ResourcePrefix

	stopCh := make(chan struct{})
	cacher := &Cacher{
		ready:      newReady(),
		storage:    config.Storage,
		objectType: reflect.TypeOf(config.Type),
		watchCache: watchCache,
		// watchCache is passed to the backend reflector as its store; after the
		// reflector fetches data, watchCache converts the etcd data into events
		reflector: cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
		versioner: config.Versioner,
		watchers: indexedWatchers{
			allWatchers:   make(map[int]*cacheWatcher),
			valueWatchers: make(map[string]watchersMap),
		},
		// Without these two, dispatchEvents would block forever on nil channels
		incoming: make(chan watchCacheEvent, 100),
		stopCh:   stopCh,
	}
	// watchCache registers cacher.processEvent via SetOnEvent; every event type
	// handled by watchCache is passed through this method
	watchCache.SetOnEvent(cacher.processEvent)
	go cacher.dispatchEvents()

	cacher.stopWg.Add(1)
	go func() {
		defer cacher.stopWg.Done()
		wait.Until(
			func() {
				if !cacher.isStopped() {
					cacher.startCaching(stopCh)
				}
			}, time.Second, stopCh,
		)
	}()
	return cacher
}
  • processEvent puts the event into the Cacher's own incoming queue
func (c *Cacher) processEvent(event *watchCacheEvent) {
	// Log whenever the queue length reaches a new high-water mark
	if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
		fmt.Printf("cacher %v: %v objects queued in incoming channel\n", c.objectType.String(), curLen)
	}
	c.incoming <- *event
}
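The high-water mark used by processEvent is a small lock-free pattern that is worth looking at on its own. The sketch below reuses the `Update` method from this post (keeping the post's spelling `HighWateMark`); the `enqueue` helper and the `main` driver are hypothetical additions for illustration only.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// HighWateMark records the largest value seen so far (spelled as in the post).
type HighWateMark int64

// Update stores current if it exceeds the recorded maximum and reports
// whether a new maximum was set. The CAS loop retries on concurrent updates.
func (hwm *HighWateMark) Update(current int64) bool {
	for {
		old := atomic.LoadInt64((*int64)(hwm))
		if current <= old {
			return false
		}
		if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
			return true
		}
	}
}

// enqueue mirrors processEvent: record the queue length, then send.
// It is a hypothetical helper, not part of the original code.
func enqueue(ch chan int, hwm *HighWateMark, v int) bool {
	grew := hwm.Update(int64(len(ch)))
	ch <- v
	return grew
}

func main() {
	ch := make(chan int, 4)
	var hwm HighWateMark
	for i := 0; i < 4; i++ {
		if enqueue(ch, &hwm, i) {
			fmt.Printf("new high-water mark: %d queued\n", int64(hwm))
		}
	}
}
```

Because the mark only ever moves upward, the log line in processEvent fires only when the backlog reaches a length not seen before, so it does not flood the log under steady load.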
  • Events are passed on to the front-end watchers
func (c *Cacher) dispatchEvents() {
	for {
		select {
		case event, ok := <-c.incoming:
			if !ok {
				return
			}
			c.dispatchEvent(&event)
		case <-c.stopCh:
			return
		}
	}
}

func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	triggerValues, supported := c.triggerValues(event)

	c.Lock()
	defer c.Unlock()
	// Watchers without an index always receive the event
	for _, watcher := range c.watchers.allWatchers {
		watcher.add(event, c.dispatchTimeoutBudge)
	}
	if supported {
		// Only the watchers indexed under a matching trigger value receive it
		for _, triggerValue := range triggerValues {
			for _, watcher := range c.watchers.valueWatchers[triggerValue] {
				watcher.add(event, c.dispatchTimeoutBudge)
			}
		}
	} else {
		// No trigger values available: fall back to every indexed watcher
		for _, watchers := range c.watchers.valueWatchers {
			for _, watcher := range watchers {
				watcher.add(event, c.dispatchTimeoutBudge)
			}
		}
	}
}
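The routing rule in dispatchEvent (unindexed watchers always, indexed watchers only on a matching trigger value, everyone as a fallback) can be tried out with a simplified model. The `event` and `watcher` types below are hypothetical stand-ins; the real watchers receive events over channels with a timeout budget rather than by appending to a slice.

```go
package main

import "fmt"

// event carries the trigger values computed for it, for example a node name.
type event struct {
	name     string
	triggers []string
}

// watcher just records the names of the events it received.
type watcher struct {
	inbox []string
}

func (w *watcher) add(e event) { w.inbox = append(w.inbox, e.name) }

type indexedWatchers struct {
	allWatchers   map[int]*watcher
	valueWatchers map[string]map[int]*watcher
}

// dispatch delivers e to every unindexed watcher, and then either to the
// indexed watchers whose value matches a trigger (supported == true) or,
// as a fallback, to all indexed watchers.
func (iw *indexedWatchers) dispatch(e event, supported bool) {
	for _, w := range iw.allWatchers {
		w.add(e)
	}
	if supported {
		for _, v := range e.triggers {
			for _, w := range iw.valueWatchers[v] {
				w.add(e)
			}
		}
		return
	}
	for _, ws := range iw.valueWatchers {
		for _, w := range ws {
			w.add(e)
		}
	}
}

func main() {
	all, n1, n2 := &watcher{}, &watcher{}, &watcher{}
	iw := &indexedWatchers{
		allWatchers: map[int]*watcher{0: all},
		valueWatchers: map[string]map[int]*watcher{
			"node-1": {1: n1},
			"node-2": {2: n2},
		},
	}
	iw.dispatch(event{name: "pod-a", triggers: []string{"node-1"}}, true)
	iw.dispatch(event{name: "pod-b"}, false)

	fmt.Println("all:   ", all.inbox)
	fmt.Println("node-1:", n1.inbox)
	fmt.Println("node-2:", n2.inbox)
}
```

In this run the watcher indexed under node-2 never sees pod-a. Avoiding that kind of unnecessary delivery is the reason the index exists.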

Full code

package cacher

import (
	"context"
	"fmt"
	"net/http"
	"reflect"
	"sync"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/conversion"
	"k8s.io/apimachinery/pkg/runtime"
)

// Store, Object, Event, Interface, Versioner, Set, ResponseMeta, watchCache,
// watchCacheEvent, storeElement, filterWithAttrsFunction and the cacheWatcher
// methods add and process are not defined in this listing.

// HighWateMark tracks the highest value seen (a performance metric)
type HighWateMark int64

// Update atomically raises the mark; it returns true if a new maximum was set
func (hwm *HighWateMark) Update(current int64) bool {
	for {
		old := atomic.LoadInt64((*int64)(hwm))
		if current <= old {
			return false
		}
		if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
			return true
		}
	}
}

// ready is a condition-variable gate
type ready struct {
	ok bool
	c  *sync.Cond
}

// newReady creates a gate that is initially not ready
func newReady() *ready {
	return &ready{c: sync.NewCond(&sync.Mutex{})}
}

// wait blocks until the state is ok
func (r *ready) wait() {
	r.c.L.Lock()
	for !r.ok {
		r.c.Wait()
	}
	r.c.L.Unlock()
}

// check returns the current state
func (r *ready) check() bool {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	return r.ok
}

// set changes the state and wakes up all waiters
func (r *ready) set(ok bool) {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	r.ok = ok
	r.c.Broadcast()
}

// TypeMeta is the metadata of an API request
type TypeMeta struct {
	Kind       string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
	APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,2,opt,name=apiVersion"`
}

// ListOption holds the request parameters
type ListOption struct {
	TypeMeta `json:",inline"`

	LabelSelector string
	FieldSelector string
	// Whether to include uninitialized resources
	IncludeUninitialized bool
	// Stream add, update, and remove notifications for resources over websocket
	Watch           bool
	ResourceVersion string
	TimeoutSecond   *int64
	Limit           int64
}

// ListerWatcher is the abstract list-and-watch interface
type ListerWatcher interface {
	List(option ListOption) (Object, error)
	Watch(option ListOption) (Interface, error)
}

// Reflector mirrors backend data into a store
type Reflector struct {
	name          string
	expectedType  reflect.Type
	store         Store
	listerWatcher ListerWatcher
}

// ListAndWatch fetches the latest version and watches for data changes
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	return nil
}

// MatchValue is an index name and the value to match against it
type MatchValue struct {
	IndexName string
	Value     string
}

// TriggerPublisherFunc returns the match values for an object
type TriggerPublisherFunc func(obj Object) []MatchValue

type filterWithAttrsFunc func(key string, l Set, f Set, uninitialized bool) bool

// cacheWatcher implements the watch interface
type cacheWatcher struct {
	sync.Mutex
	input     chan *watchCacheEvent
	result    chan Event
	done      chan struct{}
	filter    filterWithAttrsFunc
	stopped   bool
	forget    func(bool)
	versioner Versioner
}

func (c *cacheWatcher) Stop() {
	c.forget(true)
	c.stop()
}

func (c *cacheWatcher) stop() {
	c.Lock()
	defer c.Unlock()
	// Close the channels only once
	if !c.stopped {
		c.stopped = true
		close(c.done)
		close(c.input)
	}
}

func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher {
	watcher := &cacheWatcher{
		input:     make(chan *watchCacheEvent, chanSize),
		result:    make(chan Event, chanSize),
		done:      make(chan struct{}),
		filter:    filter,
		stopped:   false,
		forget:    forget,
		versioner: versioner,
	}
	go watcher.process(initEvents, resourceVersion)
	return watcher
}

type watchersMap map[int]*cacheWatcher

// addWatcher and deleteWatcher are called below; they are plain map operations
func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
	wm[number] = w
}

func (wm watchersMap) deleteWatcher(number int) {
	delete(wm, number)
}

func (wm watchersMap) terminateAll() {
	for key, watcher := range wm {
		delete(wm, key)
		watcher.Stop()
	}
}

type indexedWatchers struct {
	allWatchers   watchersMap
	valueWatchers map[string]watchersMap
}

func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
	if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
		fmt.Printf("Terminating all watchers from cacher %v\n", objectType)
	}
	i.allWatchers.terminateAll()
	for index, watchers := range i.valueWatchers {
		watchers.terminateAll()
		delete(i.valueWatchers, index)
	}
}

func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
	if supported {
		i.valueWatchers[value].deleteWatcher(number)
		if len(i.valueWatchers[value]) == 0 {
			delete(i.valueWatchers, value)
		}
	} else {
		i.allWatchers.deleteWatcher(number)
	}
}

func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
	if supported {
		if _, ok := i.valueWatchers[value]; !ok {
			i.valueWatchers[value] = watchersMap{}
		}
		i.valueWatchers[value].addWatcher(w, number)
	} else {
		i.allWatchers.addWatcher(w, number)
	}
}

type timeBudget struct {
	sync.Mutex
	budget time.Duration

	refresh   time.Duration
	maxBudget time.Duration
}

type Labels interface {
	Has(label string) (exists bool)
	Get(label string) (value string)
}

type Selector interface {
	Matches(Labels) bool
	Empty() bool
	String() string
	RequiresExactMatch(field string) (value string, found bool)
	DeepCopySelector() Selector
}

// AttrFunc returns the label set and field set of an object
type AttrFunc func(obj Object) (Set, Set, bool, error)

// SelectionPredicate describes how objects are selected
type SelectionPredicate struct {
	Label                Selector
	Field                Selector
	IncludeUninitialized bool
	GetAttrs             AttrFunc
	IndexFields          []string
	Limit                int64
	Continue             string
}

func (s *SelectionPredicate) MatcherIndex() []MatchValue {
	var result []MatchValue
	for _, field := range s.IndexFields {
		if value, ok := s.Field.RequiresExactMatch(field); ok {
			result = append(result, MatchValue{IndexName: field, Value: value})
		}
	}
	return result
}

type Feature string

type FeatureGate interface {
	Enabled(key Feature) bool
}

type UID string

type Preconditions struct {
	UID *UID
}

type StatusError struct {
	ErrStatus metav1.Status
}

// Error makes *StatusError usable as an error in the type switch below
func (e *StatusError) Error() string {
	return e.ErrStatus.Message
}

type errWatcher struct {
	result chan Event
}

func newErrWatcher(err error) *errWatcher {
	errEvent := Event{Type: Error}
	switch err := err.(type) {
	case Object:
		errEvent.Object = err
	case *StatusError:
		errEvent.Object = &err.ErrStatus
	default:
		errEvent.Object = &metav1.Status{
			Status:  metav1.StatusFailure,
			Message: err.Error(),
			Reason:  metav1.StatusReasonInternalError,
			Code:    http.StatusInternalServerError,
		}
	}
	// Room for a single event: send the error and close the channel
	watcher := &errWatcher{result: make(chan Event, 1)}
	watcher.result <- errEvent
	close(watcher.result)
	return watcher
}

type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)

// Cacher is the cache layer
type Cacher struct {
	// High-water mark of the incoming queue length (a performance metric)
	incomingHWM HighWateMark
	// incoming event channel; its events are dispatched to all watchers
	incoming chan watchCacheEvent

	sync.RWMutex

	// The cache may only be accessed once ready is ok
	ready *ready
	// Backend storage interface
	storage Store
	// Object type
	objectType reflect.Type

	watchCache watchCache
	reflector  *Reflector
	versioner  Versioner

	triggerFunc TriggerPublisherFunc
	watcherIdx  int
	watchers    indexedWatchers

	dispatchTimeoutBudge *timeBudget

	stopLock sync.RWMutex
	stopped  bool
	stopCh   chan struct{}
	stopWg   sync.WaitGroup
}

func (c *Cacher) Versioner() Versioner {
	return c.storage.Versioner()
}

func (c *Cacher) Create(ctx context.Context, key string, out Object, preconditions *Preconditions) error {
	return c.storage.Create(ctx, key, out, preconditions)
}

func (c *Cacher) Delete(ctx context.Context, key string, out Object, preconditions *Preconditions) error {
	return c.storage.Delete(ctx, key, out, preconditions)
}

func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (Interface, error) {
	watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return nil, err
	}

	c.ready.wait()

	c.watchCache.RLock()
	defer c.watchCache.RUnlock()
	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
	if err != nil {
		return newErrWatcher(err), nil
	}

	triggerValue, triggerSupported := "", false
	if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
		triggerValue, triggerSupported = matchValues[0].Value, true
	}

	// A watcher that cannot use the index receives more events, so give it a larger buffer
	chanSize := 10
	if c.triggerFunc != nil && !triggerSupported {
		chanSize = 100
	}

	c.Lock()
	defer c.Unlock()
	forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
	watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner)

	c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
	c.watcherIdx++
	return watcher, nil
}

func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (Interface, error) {
	return c.Watch(ctx, key, resourceVersion, pred)
}

func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr Object, ignoreNotFound bool) error {
	if resourceVersion == "" {
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}

	getRv, err := c.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return err
	}

	if getRv == 0 && !c.ready.check() {
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}

	c.ready.wait()

	objValue, err := conversion.EnforcePtr(objPtr)
	if err != nil {
		return err
	}

	obj, exists, _, err := c.watchCache.WaitUntilFreshAndGet(getRv, key, nil)
	if err != nil {
		return err
	}

	if exists {
		elem, ok := obj.(*storeElement)
		if !ok {
			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		objValue.Set(reflect.ValueOf(elem.Object).Elem())
	} else {
		objValue.Set(reflect.Zero(objValue.Type()))
		if !ignoreNotFound {
			return fmt.Errorf("key not found: %v, resourceVersion: %v", key, getRv)
		}
	}
	return nil
}

func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj Object) error {
	if resourceVersion == "" {
		// The real code also checks pagination and similar details here; omitted for now
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

	listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return err
	}

	if listRV == 0 && !c.ready.check() {
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

	c.ready.wait()

	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}

	listVal, err := conversion.EnforcePtr(listPtr)
	if err != nil || listVal.Kind() != reflect.Slice {
		return fmt.Errorf("need a pointer to slice got %v", listVal.Kind())
	}
	filter := filterWithAttrsFunction(key, pred)

	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
	if err != nil {
		return err
	}

	if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
		// If there are more objects than the slice can hold, allocate a new slice
		listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
	}

	for _, obj := range objs {
		elem, ok := obj.(*storeElement)
		if !ok {
			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
			// Reflection: something to study further later
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
		}
	}

	if c.versioner != nil {
		if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
			return err
		}
	}
	return nil
}

func (c *Cacher) GuaranteedUpdate(
	ctx context.Context, key string, ptrToType Object, ignoreNotFound bool,
	preconditions *Preconditions, tryUpdate UpdateFunc, _ ...Object) error {
	if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
		fmt.Printf("GetByKey returned error: %v\n", err)
	} else if exists {
		// Pass the cached object along as a suggestion for the current state
		currObj := elem.(*storeElement).Object.DeepCopyObject()
		return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
	}
	return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}

func (c *Cacher) Count(pathPrefix string) (int64, error) {
	return c.storage.Count(pathPrefix)
}

func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
	if c.triggerFunc == nil {
		return nil, false
	}

	// Length 0, capacity 2: at most the current and the previous value
	result := make([]string, 0, 2)
	matchValues := c.triggerFunc(event.Object)
	if len(matchValues) > 0 {
		result = append(result, matchValues[0].Value)
	}
	if event.PrevObject == nil {
		return result, len(result) > 0
	}

	prevMatchValues := c.triggerFunc(event.PrevObject)
	if len(prevMatchValues) > 0 {
		if len(result) == 0 || result[0] != prevMatchValues[0].Value {
			result = append(result, prevMatchValues[0].Value)
		}
	}
	return result, len(result) > 0
}

func (c *Cacher) processEvent(event *watchCacheEvent) {
	if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
		fmt.Printf("cacher %v: %v objects queued in incoming channel\n", c.objectType.String(), curLen)
	}
	c.incoming <- *event
}

func (c *Cacher) dispatchEvents() {
	for {
		select {
		case event, ok := <-c.incoming:
			if !ok {
				return
			}
			c.dispatchEvent(&event)
		case <-c.stopCh:
			return
		}
	}
}

func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	triggerValues, supported := c.triggerValues(event)

	c.Lock()
	defer c.Unlock()
	for _, watcher := range c.watchers.allWatchers {
		watcher.add(event, c.dispatchTimeoutBudge)
	}
	if supported {
		for _, triggerValue := range triggerValues {
			for _, watcher := range c.watchers.valueWatchers[triggerValue] {
				watcher.add(event, c.dispatchTimeoutBudge)
			}
		}
	} else {
		for _, watchers := range c.watchers.valueWatchers {
			for _, watcher := range watchers {
				watcher.add(event, c.dispatchTimeoutBudge)
			}
		}
	}
}

func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj Object) error {
	if resourceVersion == "" || (len(pred.Continue) > 0 || pred.Limit > 0) {
		// If resourceVersion is empty, fetch the data directly from storage
		return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
	}

	// Use the versioner to parse the list version
	listRv, err := c.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return err
	}

	// listRv is 0 and the cache has not finished updating: read from the backend directly
	if listRv == 0 && !c.ready.check() {
		return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
	}

	c.ready.wait()

	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}

	listVal, err := conversion.EnforcePtr(listPtr)
	if err != nil || listVal.Kind() != reflect.Slice {
		return fmt.Errorf("need a pointer to slice got %v", listVal.Kind())
	}
	filter := filterWithAttrsFunction(key, pred)

	// Fetch the object for the resource version parsed above
	obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRv, key, nil)
	if err != nil {
		return err
	}

	if exists {
		elem, ok := obj.(*storeElement)
		if !ok {
			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
		}
		if c.versioner != nil {
			if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
				return err
			}
		}
	}
	return nil
}

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
	successfulList := false
	c.watchCache.SetOnReplace(func() {
		successfulList = true
		c.ready.set(true)
	})
	defer func() {
		// ListAndWatch has returned, so the cache is no longer being kept up to date
		if successfulList {
			c.ready.set(false)
		}
	}()

	c.terminateAllWatchers()
	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
		fmt.Printf("unexpected ListAndWatch error: %v\n", err)
	}
}

func (c *Cacher) terminateAllWatchers() {
	c.Lock()
	defer c.Unlock()
	c.watchers.terminateAll(c.objectType)
}

func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
	return func(lock bool) {
		if lock {
			c.Lock()
			defer c.Unlock()
		} else {
			fmt.Printf("Forcing watcher close due to unresponsiveness: %v\n", c.objectType.String())
		}
		c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
	}
}
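The `ready` gate is what makes Watch, Get, and List block until the first successful list has populated the cache. It can be exercised in isolation. The sketch below copies the `ready` type from this post and adds a small hypothetical `main` driver that is not part of the original code.

```go
package main

import (
	"fmt"
	"sync"
)

// ready is a condition-variable gate, as in the post.
type ready struct {
	ok bool
	c  *sync.Cond
}

func newReady() *ready {
	return &ready{c: sync.NewCond(&sync.Mutex{})}
}

// wait blocks until set(true) has been called.
func (r *ready) wait() {
	r.c.L.Lock()
	for !r.ok {
		r.c.Wait()
	}
	r.c.L.Unlock()
}

// check reports the current state without blocking.
func (r *ready) check() bool {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	return r.ok
}

// set changes the state and wakes every waiter so it can re-check.
func (r *ready) set(ok bool) {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	r.ok = ok
	r.c.Broadcast()
}

func main() {
	r := newReady()
	served := make(chan struct{})

	// A request that must be served from the cache waits for the gate.
	go func() {
		r.wait()
		fmt.Println("cache is ready, serving from cache")
		close(served)
	}()

	fmt.Println("ready before initial list:", r.check())
	r.set(true) // what the SetOnReplace callback does after a successful list
	<-served
}
```

`wait` re-checks `r.ok` in a loop after every wake-up, so a `set(false)` followed by a later `set(true)` (as happens when startCaching restarts) is handled correctly.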

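Reflections

I have actually read through a lot of this, layer by layer, but fully explaining how every layer and every function converts things would really be too much. The core comes down to two things, Storage and Watcher: the core of Storage is the upper-layer implementation of data change notification, and Watcher follows data changes and passes them on. Next I may pause reading the apiserver side and look at the controller and client-go parts first. That's it for now. Good Night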

Reposted from: https://juejin.im/post/5b798a596fb9a01a031aeea0
