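The event generation mechanism in Kubernetes

When troubleshooting a problem in a cluster, there are generally two approaches. One is to read the logs of each component to pin down the problem; the other is to fetch events from the apiserver and work out from them which component took which action.

The event mechanism makes troubleshooting easier. An event is an API object that records what action a component took at a given time. Each component acts as a client and sends requests to the apiserver to create or update events. By default the apiserver retains events for only one hour.

The rest of this article uses the kubelet in version 1.18.6 as an example to walk through how an event is generated.

This article is based on Kubernetes 1.18.6; see the source code reading repository.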

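InvolvedObject: the object this event is about

Reason: the reason the action was taken

Message: a more detailed description of the action

Source: the creator of the event, consisting of Host (the creator's hostname) and Component (the creator's component name)

FirstTimestamp: the time the event first occurred

LastTimestamp: the time the event most recently occurred

Count: the total number of occurrences

Type: the event type; only Warning and Normal exist

EventTime: the time the event was first observed (added for compatibility with the new events.k8s.io API)

Series: the number of occurrences of related events and the time of the last one (added for compatibility with the new events.k8s.io API)

Action: the action that was taken (added for compatibility with the new events.k8s.io API)

Related: another object related to this event (added for compatibility with the new events.k8s.io API)

ReportingController: the name of the controller that created the event (added for compatibility with the new events.k8s.io API)

ReportingInstance: the instance of the controller that created the event (added for compatibility with the new events.k8s.io API)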

staging\src\k8s.io\api\core\v1\types.go

go

// Event is a report of an event somewhere in the cluster.
type Event struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`

	// The object that this event is about.
	InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`

	// This should be a short, machine understandable string that gives the reason
	// for the transition into the object's current status.
	// TODO: provide exact specification for format.
	// +optional
	Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`

	// A human-readable description of the status of this operation.
	// TODO: decide on maximum length.
	// +optional
	Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`

	// The component reporting this event. Should be a short machine understandable string.
	// +optional
	Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`

	// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
	// +optional
	FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`

	// The time at which the most recent occurrence of this event was recorded.
	// +optional
	LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`

	// The number of times this event has occurred.
	// +optional
	Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`

	// Type of this event (Normal, Warning), new types could be added in the future
	// +optional
	Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`

	// Time when this Event was first observed.
	// +optional
	EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`

	// Data about the Event series this event represents or nil if it's a singleton Event.
	// +optional
	Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`

	// What action was taken/failed regarding to the Regarding object.
	// +optional
	Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`

	// Optional secondary object for more complex actions.
	// +optional
	Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`

	// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
	// +optional
	ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`

	// ID of the controller instance, e.g. `kubelet-xyzf`.
	// +optional
	ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}

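The kubelet calls makeEventRecorder to initialize the event client in preparation for emitting events later.

This involves creating an EventBroadcaster and an EventRecorder, and calling StartLogging and StartRecordingToSink.

The EventRecorder is the producer of event messages; it sends them to the Broadcaster inside the EventBroadcaster.

The EventBroadcaster is the core of the system; its Broadcaster forwards each event it receives to every watcher.

StartLogging creates a watcher and writes the events it receives to the log.

StartRecordingToSink creates a watcher, aggregates the events it receives, and submits them to the apiserver.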
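
The division of labour described above can be sketched with plain channels. This is a minimal stdlib-only model, not client-go code: the names `event`, `broadcaster`, `watch`, `run`, and `pipeline` are hypothetical and chosen only for illustration, and the real Broadcaster adds locking, stop channels, and a full-queue policy that are left out here.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a stand-in for v1.Event with only the fields this sketch needs.
type event struct {
	Reason  string
	Message string
}

// broadcaster fans every event from incoming out to all registered watchers.
type broadcaster struct {
	incoming chan event
	watchers []chan event
	done     sync.WaitGroup
}

func newBroadcaster() *broadcaster {
	return &broadcaster{incoming: make(chan event, 8)}
}

// watch registers a handler (the role StartLogging and StartRecordingToSink
// play) and starts a goroutine that feeds it. Call it before run.
func (b *broadcaster) watch(handle func(event)) {
	ch := make(chan event, 8)
	b.watchers = append(b.watchers, ch)
	b.done.Add(1)
	go func() {
		defer b.done.Done()
		for ev := range ch {
			handle(ev)
		}
	}()
}

// run distributes events until incoming is closed, then closes the watchers.
func (b *broadcaster) run() {
	for ev := range b.incoming {
		for _, ch := range b.watchers {
			ch <- ev
		}
	}
	for _, ch := range b.watchers {
		close(ch)
	}
}

// pipeline wires one producer to a logging watcher and a sink watcher and
// returns what each of them saw.
func pipeline(events []event) (logged, sunk []string) {
	b := newBroadcaster()
	b.watch(func(ev event) { logged = append(logged, ev.Reason+": "+ev.Message) })
	b.watch(func(ev event) { sunk = append(sunk, ev.Reason) })
	go b.run()
	for _, ev := range events { // the recorder side
		b.incoming <- ev
	}
	close(b.incoming)
	b.done.Wait()
	return logged, sunk
}

func main() {
	logged, sunk := pipeline([]event{
		{Reason: "Pulled", Message: "image ready"},
		{Reason: "Started", Message: "container started"},
	})
	fmt.Println(logged)
	fmt.Println(sunk)
}
```

Both watchers see every event in the order it was produced, which is the property the logging and sink paths rely on.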

cmd\kubelet\app\server.go

go

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	if err != nil {
		return err
	}
	// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
	// When the cloud provider is AWS, this is replaced by the AWS host domain name
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	if err != nil {
		return err
	}
	// Setup event recorder if required.
	makeEventRecorder(kubeDeps, nodeName)

go

// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
	eventBroadcaster := record.NewBroadcaster()
	// legacyscheme.Scheme is populated through imports: this file imports k8s.io/kubernetes/pkg/kubelet/config,
	// and pkg\kubelet\config\common.go imports k8s.io/kubernetes/pkg/apis/core/install, which registers every API in the core group with legacyscheme.Scheme.
	// legacyscheme.Scheme is used to look up the metadata of the event's InvolvedObject.
	// The Recorder produces events.
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	// Start a goroutine that logs events
	eventBroadcaster.StartLogging(klog.V(3).Infof)
	if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
		// Send events to the apiserver
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}
}

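The Broadcaster forwards each event it receives to every watcher. It starts a goroutine that takes messages from incoming and sends them to each watcher's result channel. Two behaviors are supported when a watcher's result channel is full: either give up on that watcher and move on to the next one, or wait until that watcher can receive, which delays delivery of the message to the watchers after it.

Creating the EventBroadcaster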

staging\src\k8s.io\client-go\tools\record\event.go

go

// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: defaultSleepDuration,
	}
}

broadcaster

staging\src\k8s.io\apimachinery\pkg\watch\mux.go

go

// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
// It is guaranteed that events will be distributed in the order in which they occur,
// but the order in which a single event is distributed among all of the watchers is unspecified.
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
	m := &Broadcaster{
		watchers:            map[int64]*broadcasterWatcher{},
		incoming:            make(chan Event, incomingQueueLength),
		watchQueueLength:    queueLength,
		fullChannelBehavior: fullChannelBehavior,
	}
	m.distributing.Add(1)
	go m.loop()
	return m
}

// closeAll disconnects all watchers (presumably in response to a Shutdown call).
func (m *Broadcaster) closeAll() {
	m.lock.Lock()
	defer m.lock.Unlock()
	for _, w := range m.watchers {
		// Close the watcher's receive channel
		close(w.result)
	}
	// Delete everything from the map, since presence/absence in the map is used
	// by stopWatching to avoid double-closing the channel.
	m.watchers = map[int64]*broadcasterWatcher{}
}

// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
	// Deliberately not catching crashes here. Yes, bring down the process if there's a
	// bug in watch.Broadcaster.
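	// Events whose Type is internal-do-function are handled first: the func stored in event.Object is run here,
	// and the event is never handed to the watchers, which would otherwise run it again.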
	for event := range m.incoming {
		if event.Type == internalRunFunctionMarker {
			event.Object.(functionFakeRuntimeObject)()
			continue
		}
		m.distribute(event)
	}
	// Once all messages are processed, close the watchers' receive channels and reset the watcher list
	m.closeAll()
	m.distributing.Done()
}

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			// If sending on w.result would block and w.stopped has not been closed, the default case runs.
			// In other words, when this watcher cannot accept the event right now, the event is dropped for this watcher.
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			// If sending on w.result would block and w.stopped has not been closed, wait until one of the two is ready.
			// The watcher is never skipped; the loop waits until it has room,
			// but this delays delivery of the event to the next watcher.
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}
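
The difference between the two full-channel behaviors is easy to see in isolation. The sketch below is an illustration under simplifying assumptions, not the apimachinery code: `deliver`, `demoDrop`, and `demoWait` are hypothetical names, events are plain strings, and the `w.stopped` case is left out.

```go
package main

import "fmt"

// deliver tries to hand ev to a watcher queue. With drop=true it mirrors the
// DropIfChannelFull branch (give up when the queue is full); with drop=false
// it mirrors the blocking branch (wait until the watcher has room).
// It reports whether the event was queued.
func deliver(queue chan string, ev string, drop bool) bool {
	if drop {
		select {
		case queue <- ev:
			return true
		default: // queue full: skip this watcher
			return false
		}
	}
	queue <- ev // blocks until there is room
	return true
}

// demoDrop sends n events to a queue of the given capacity that nobody
// reads and returns how many were queued.
func demoDrop(capacity, n int) int {
	queue := make(chan string, capacity)
	queued := 0
	for i := 0; i < n; i++ {
		if deliver(queue, fmt.Sprintf("event-%d", i), true) {
			queued++
		}
	}
	return queued
}

// demoWait sends n events in blocking mode while a reader drains the queue
// and returns how many the reader received.
func demoWait(capacity, n int) int {
	queue := make(chan string, capacity)
	received := make(chan int)
	go func() {
		count := 0
		for range queue {
			count++
		}
		received <- count
	}()
	for i := 0; i < n; i++ {
		deliver(queue, fmt.Sprintf("event-%d", i), false)
	}
	close(queue)
	return <-received
}

func main() {
	fmt.Println(demoDrop(2, 5)) // only the first 2 fit; the rest are dropped
	fmt.Println(demoWait(2, 5)) // all 5 arrive, at the cost of blocking the sender
}
```

Dropping keeps a slow watcher from stalling the others at the price of lost events, which is the trade-off record.NewBroadcaster makes by passing watch.DropIfChannelFull.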

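The eventRecorder produces event messages. It uses the Scheme from k8s.io/kubernetes/pkg/api/legacyscheme to determine the group/version information of the event's InvolvedObject: when the object's GroupVersionKind is empty, the registered GroupVersionKind is looked up in the scheme.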

go

kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})

Events can be produced with the following methods:

With a custom message format: Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

Passing the message through as is: Event(object runtime.Object, eventtype, reason, message string)

With annotations and a custom message format: AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})

staging\src\k8s.io\client-go\tools\record\event.go

go

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
	return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
}

type recorderImpl struct {
	scheme *runtime.Scheme
	source v1.EventSource
	*watch.Broadcaster
	clock clock.Clock
}

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	ref, err := ref.GetReference(recorder.scheme, object)
	if err != nil {
		klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
		return
	}

	if !util.ValidateEventType(eventtype) {
		klog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}

	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source

	go func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
	recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}

func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := ref.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
			Namespace:   namespace,
			Annotations: annotations,
		},
		InvolvedObject: *ref,
		Reason:         reason,
		Message:        message,
		FirstTimestamp: t,
		LastTimestamp:  t,
		Count:          1,
		Type:           eventtype,
	}
}
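
Two details of makeEvent are worth seeing on their own: the event name is the involved object's name plus the creation time in hexadecimal nanoseconds, and an empty namespace falls back to the default namespace. The sketch below reproduces just those two rules with the standard library; `eventName` and `namespaceOrDefault` are hypothetical helper names, and the literal "default" is the value of metav1.NamespaceDefault.

```go
package main

import (
	"fmt"
	"time"
)

// eventName mirrors the "%v.%x" format used by makeEvent: the involved
// object's name, a dot, and the UnixNano timestamp in hexadecimal.
func eventName(objectName string, unixNano int64) string {
	return fmt.Sprintf("%v.%x", objectName, unixNano)
}

// namespaceOrDefault mirrors the namespace fallback in makeEvent.
func namespaceOrDefault(namespace string) string {
	if namespace == "" {
		return "default" // value of metav1.NamespaceDefault
	}
	return namespace
}

func main() {
	fmt.Println(eventName("nginx", 255)) // nginx.ff
	fmt.Println(eventName("nginx", time.Now().UnixNano()))
	fmt.Println(namespaceOrDefault(""))
}
```

Because the name embeds a nanosecond timestamp, two events about the same object get distinct names unless they are created in the same nanosecond.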

Create a watcher and start a goroutine that receives events and writes them to the log.

go

	// Start a goroutine that logs events
	eventBroadcaster.StartLogging(klog.V(3).Infof)

staging\src\k8s.io\client-go\tools\record\event.go

go

// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
	// Create a new watcher that reads events and logs them
	return e.StartEventWatcher(
		func(e *v1.Event) {
			logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
		})
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	// Create and register a new watcher
	watcher := e.Watch()
	// Start a goroutine that handles this watcher's events
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				// This is all local, so there's no reason this should
				// ever happen.
				continue
			}
			eventHandler(event)
		}
	}()
	return watcher
}

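How a watcher is created

First, an event whose Type is internalRunFunctionMarker is created to add the watcher, and the caller waits for that event to be consumed. The loop() shown above consumes it and runs the function stored in its Object, which creates the new watcher and adds it to the Broadcaster's watchers map. The watcher receives only events that arrive after it was added, not events received earlier.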

staging\src\k8s.io\apimachinery\pkg\watch\mux.go

go

// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events.
func (m *Broadcaster) Watch() Interface {
	var w *broadcasterWatcher
	m.blockQueue(func() {
		m.lock.Lock()
		defer m.lock.Unlock()
		id := m.nextWatcher
		m.nextWatcher++
		w = &broadcasterWatcher{
			result:  make(chan Event, m.watchQueueLength),
			stopped: make(chan struct{}),
			id:      id,
			m:       m,
		}
		m.watchers[id] = w
	})
	return w
}

// Execute f, blocking the incoming queue (and waiting for it to drain first).
// The purpose of this terrible hack is so that watchers added after an event
// won't ever see that event, and will always see any event after they are
// added.
func (b *Broadcaster) blockQueue(f func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	// Block until the Event has been sent to incoming.
	// This Event is never consumed by a watcher; loop() handles this Type separately by running the func wrapped in functionFakeRuntimeObject.
	b.incoming <- Event{
		Type: internalRunFunctionMarker,
		Object: functionFakeRuntimeObject(func() {
			defer wg.Done()
			f()
		}),
	}
	// Wait for f() to finish
	wg.Wait()
}
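
The trick in blockQueue, sending a function through the same queue as the events so that it runs at a well-defined point in the stream, can be reproduced in a few lines. This is a simplified model under stated assumptions: `item`, `mux`, `watch`, `send`, and `shutdown` are hypothetical names, and a watcher is modelled as a slice of the payloads it has seen instead of a channel.

```go
package main

import (
	"fmt"
	"sync"
)

// item is either a normal event (fn == nil) or a function to run in the loop.
type item struct {
	payload string
	fn      func()
}

type mux struct {
	incoming chan item
	watchers []*[]string // each watcher records the payloads it has seen
	stopped  sync.WaitGroup
}

func newMux() *mux {
	m := &mux{incoming: make(chan item, 16)}
	m.stopped.Add(1)
	go m.loop()
	return m
}

// loop is the only goroutine that touches m.watchers.
func (m *mux) loop() {
	defer m.stopped.Done()
	for it := range m.incoming {
		if it.fn != nil { // internal marker: run it, never distribute it
			it.fn()
			continue
		}
		for _, w := range m.watchers {
			*w = append(*w, it.payload)
		}
	}
}

// blockQueue runs f inside loop, after every item queued before it.
func (m *mux) blockQueue(f func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	m.incoming <- item{fn: func() { defer wg.Done(); f() }}
	wg.Wait()
}

// watch registers a new watcher at the current position of the queue.
func (m *mux) watch() *[]string {
	seen := &[]string{}
	m.blockQueue(func() { m.watchers = append(m.watchers, seen) })
	return seen
}

func (m *mux) send(p string) { m.incoming <- item{payload: p} }

func (m *mux) shutdown() { close(m.incoming); m.stopped.Wait() }

// demo adds one watcher before e1 and one after it.
func demo() (first, second []string) {
	m := newMux()
	a := m.watch()
	m.send("e1")
	b := m.watch() // queued behind e1, so b never sees e1
	m.send("e2")
	m.shutdown()
	return *a, *b
}

func main() {
	first, second := demo()
	fmt.Println(first, second) // [e1 e2] [e2]
}
```

Because the registration travels through the queue, a watcher added after an event can never see that event, and it is guaranteed to see everything queued after it.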

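To keep repeated events from flooding the apiserver and putting pressure on etcd, events are aggregated on the client side before they are sent: identical or similar events within a time window are merged.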

go

if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}

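The eventCorrelator processes each received event, passing it in turn to EventAggregator, EventLogger, and spamFilter.Filter, which may modify or drop it.

The eventCorrelator consists of three components: EventAggregator, EventLogger, and spamFilter.Filter.

EventAggregator

Each event is stored in an LRU cache under its aggregateKey (the concatenation of Source, InvolvedObject without FieldPath, Type, and Reason). The value is a struct holding localKeys (the set of all unique messages) and lastTimestamp. Events with the same aggregateKey therefore share one bucket, and whether an event is unique is decided by whether localKeys already contains the same message.

If localKeys holds 10 or more messages and lastTimestamp falls within the last 10 minutes, the event's message is prefixed with (combined from similar events): ; otherwise the event is passed unchanged to the EventLogger.

If the event was modified, the key used for the EventLogger's LRU cache is the aggregateKey; otherwise it is the value returned by getEventKey, which concatenates the event's Source, InvolvedObject (including FieldPath), Type, Reason, and Message.

The EventAggregator does not reduce the number of events; it only rewrites the message once more than 9 similar events have been seen.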
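
The aggregation rule can be modelled in a few lines. The sketch below is an illustration only: `aggregator`, `record`, and `aggregate` are hypothetical names, a plain map stands in for the LRU cache, time is an integer number of seconds, and the per-event key is simplified to the aggregate key plus the message. The thresholds (10 messages, 600 seconds) are the ones described above.

```go
package main

import "fmt"

const (
	maxEvents            = 10  // distinct messages before aggregation starts
	maxIntervalInSeconds = 600 // a record older than this is reset
)

// record holds the distinct messages seen for one aggregate key.
type record struct {
	localKeys     map[string]struct{}
	lastTimestamp int64
}

// aggregator is a simplified model; a map stands in for the LRU cache.
type aggregator struct {
	cache map[string]record
}

func newAggregator() *aggregator {
	return &aggregator{cache: map[string]record{}}
}

// aggregate returns the message to report and the cache key that the next
// stage (the logger) should use.
func (a *aggregator) aggregate(aggregateKey, message string, now int64) (string, string) {
	rec, found := a.cache[aggregateKey]
	if !found || now-rec.lastTimestamp > maxIntervalInSeconds {
		rec = record{localKeys: map[string]struct{}{}}
	}
	rec.localKeys[message] = struct{}{} // a repeated message is stored once
	rec.lastTimestamp = now
	a.cache[aggregateKey] = rec

	if len(rec.localKeys) < maxEvents {
		// Below the threshold: pass through with the full per-event key.
		return message, aggregateKey + "/" + message
	}
	// Keep the set from growing past maxEvents by removing one arbitrary entry.
	for k := range rec.localKeys {
		delete(rec.localKeys, k)
		break
	}
	return "(combined from similar events): " + message, aggregateKey
}

func main() {
	a := newAggregator()
	for i := 0; i < 11; i++ {
		msg, key := a.aggregate("kubelet/pod-a/Warning/Failed", fmt.Sprintf("attempt %d failed", i), int64(i))
		fmt.Printf("%q key=%q\n", msg, key)
	}
}
```

Once the threshold is reached, every further distinct message maps to the same key, which is what lets the next stage count the events as repeats of a single event.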

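EventLogger:

LRU cache: the key is the aggregateKey or the return value of getEventKey; the value holds the count, firstTimestamp, name, and resourceVersion of the event already observed.

It looks up how many times the event has been seen in the LRU cache. If the event was seen before, its count is incremented, its lastTimestamp is updated, and a patch for updating the event is generated; otherwise a new item is added to the cache.

The EventLogger does not reduce the number of events either; on a cache hit it only updates the event's count and lastTimestamp.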
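
The counting step can be sketched as follows. This is a simplified model, not the client-go implementation: `logger`, `observation`, and `observe` are hypothetical names, a map replaces the LRU cache, and the patch itself is reduced to a boolean that says whether an existing event should be updated.

```go
package main

import "fmt"

// observation is what the logger remembers about an event it has seen.
type observation struct {
	count uint
	name  string
}

// logger is a simplified model; a map stands in for the LRU cache.
type logger struct {
	cache map[string]observation
}

func newLogger() *logger {
	return &logger{cache: map[string]observation{}}
}

// observe returns the name and count to report, and whether the event was
// seen before (in which case an update of the existing event is appropriate).
func (l *logger) observe(key, name string) (string, uint, bool) {
	last, seen := l.cache[key]
	count := uint(1)
	if seen {
		name = last.name // reuse the first event's name so the update targets it
		count = last.count + 1
	}
	l.cache[key] = observation{count: count, name: name}
	return name, count, seen
}

func main() {
	l := newLogger()
	fmt.Println(l.observe("pod-a/Failed", "pod-a.1"))
	fmt.Println(l.observe("pod-a/Failed", "pod-a.2"))
}
```

The second observation keeps the first event's name and reports a count of 2, so the request that follows updates the stored event instead of creating a new one.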

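Filter

It looks up a rate limiter in an LRU cache by key (source and involvedObject), creating one if none exists, and then checks whether the limiter's limit has been reached. If it has, the event is dropped; otherwise recordEvent is executed.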
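
A per-key token bucket captures the idea. The sketch below is a stand-in for the flowcontrol rate limiter used by the real filter, written under simplifying assumptions: `spamFilter`, `bucket`, and `filter` are hypothetical names, time is passed in as seconds, a map replaces the LRU cache, and the qps and burst values in main are illustrative only.

```go
package main

import "fmt"

// bucket is a token bucket: it holds at most burst tokens and refills at
// qps tokens per second.
type bucket struct {
	tokens float64
	last   float64 // time of the last refill, in seconds
}

// spamFilter keeps one bucket per source+object key.
type spamFilter struct {
	qps     float64
	burst   float64
	buckets map[string]*bucket
}

func newSpamFilter(qps float64, burst int) *spamFilter {
	return &spamFilter{qps: qps, burst: float64(burst), buckets: map[string]*bucket{}}
}

// filter reports whether the event identified by key should be dropped at
// time now (in seconds).
func (f *spamFilter) filter(key string, now float64) bool {
	b, ok := f.buckets[key]
	if !ok {
		b = &bucket{tokens: f.burst, last: now}
		f.buckets[key] = b
	}
	// Refill according to the time elapsed, capped at burst.
	b.tokens += (now - b.last) * f.qps
	if b.tokens > f.burst {
		b.tokens = f.burst
	}
	b.last = now
	if b.tokens < 1 {
		return true // limit reached: drop the event
	}
	b.tokens--
	return false
}

func main() {
	f := newSpamFilter(1, 3) // illustrative values: 1 token per second, burst of 3
	for i := 0; i < 5; i++ {
		fmt.Println(f.filter("kubelet/pod-a", 0))
	}
	fmt.Println(f.filter("kubelet/pod-a", 1)) // one token has been refilled
}
```

Unlike the two earlier stages, this is the only stage that actually discards events.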

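recordEvent

This sends the processed event to the apiserver. If count is greater than 1, a patch request is issued (falling back to a create request if it returns 404); otherwise a create request is issued.

If the request to the apiserver fails, it is retried, up to 12 attempts in total. The wait before the first retry is a random duration of up to 10 seconds; later waits use the default of 10 seconds.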
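
The retry policy can be sketched as a small loop. This is a model of the behaviour described above, not the client-go code: `record`, `retryDelay`, and `simulate` are hypothetical names, the wait function is injected so the example runs without sleeping, and the reading of the limit as 12 attempts in total is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

const (
	maxTries     = 12               // attempts per event (assumed to include the first)
	defaultSleep = 10 * time.Second // wait between later attempts
)

// retryDelay returns the wait after the given failed attempt (1-based).
// jitter is a random fraction in [0, 1) that only affects the first wait.
func retryDelay(tries int, sleep time.Duration, jitter float64) time.Duration {
	if tries == 1 {
		return time.Duration(float64(sleep) * jitter)
	}
	return sleep
}

// record calls send until it succeeds or maxTries attempts have been made.
// wait is injected so callers can choose between time.Sleep and a fake.
func record(send func() error, wait func(time.Duration)) (attempts int, ok bool) {
	for tries := 1; ; tries++ {
		if err := send(); err == nil {
			return tries, true
		}
		if tries >= maxTries {
			return tries, false
		}
		wait(retryDelay(tries, defaultSleep, rand.Float64()))
	}
}

// simulate runs record against a sink that fails the first `failures` calls
// and counts the waits instead of sleeping.
func simulate(failures int) (attempts int, ok bool, waits int) {
	calls := 0
	send := func() error {
		calls++
		if calls <= failures {
			return errors.New("apiserver unavailable")
		}
		return nil
	}
	attempts, ok = record(send, func(time.Duration) { waits++ })
	return attempts, ok, waits
}

func main() {
	fmt.Println(simulate(3))   // 4 true 3
	fmt.Println(simulate(100)) // 12 false 11
}
```

Randomizing only the first wait spreads out clients that all fail at the same moment, for example when the apiserver restarts.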

staging\src\k8s.io\client-go\tools\record\events_cache.go

go

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
// eventObserve records an event, or updates an existing one if key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
	var (
		patch []byte
		err   error
	)
	eventCopy := *newEvent
	event := &eventCopy

	e.Lock()
	defer e.Unlock()

	// Check if there is an existing event we should update
	lastObservation := e.lastEventObservationFromCache(key)

	// If we found a result, prepare a patch
	if lastObservation.count > 0 {
		// update the event based on the last observation so patch will work as desired
		event.Name = lastObservation.name
		event.ResourceVersion = lastObservation.resourceVersion
		event.FirstTimestamp = lastObservation.firstTimestamp
		event.Count = int32(lastObservation.count) + 1

		eventCopy2 := *event
		eventCopy2.Count = 0
		eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
		eventCopy2.Message = ""

		newData, _ := json.Marshal(event)
		oldData, _ := json.Marshal(eventCopy2)
		patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
	}

	// record our new observation
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
	return event, patch, err
}

// Filter controls that a given source+object are not exceeding the allowed rate.
// Rate-limits events that share the same source and involvedObject
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
	var record spamRecord

	// controls our cached information about this event (source+object)
	eventKey := getSpamKey(event)

	// do we have a record of similar events in our cache?
	f.Lock()
	defer f.Unlock()
	value, found := f.cache.Get(eventKey)
	if found {
		record = value.(spamRecord)
	}

	// verify we have a rate limiter for this record
	if record.rateLimiter == nil {
		record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
	}

	// ensure we have available rate
	filter := !record.rateLimiter.TryAccept()

	// update the cache
	f.cache.Add(eventKey, record)

	return filter
}

// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
//   the full key for normal events, and to the result of
//   EventAggregatorMessageFunc for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	// eventKey is the full cache key for this event
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
	// The default keyFunc is EventAggregatorByReasonFunc.
	// It returns an aggregateKey that concatenates Source, InvolvedObject (without FieldPath), Type, and Reason; the localKey is the Message.
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
	// find a similar record, its lastTimestamp will be the zero value, so we
	// create a new one in that case.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	// The record has expired, so reset it
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
	// Insert localKey; the set is backed by a map, so an existing key is not stored twice
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	// If the number of distinct messages in localKeys is below maxEvents (whether or not the record was just reset), return newEvent and eventKey unchanged
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	// The record has not expired and localKeys holds at least maxEvents distinct messages, so aggregate: build a modified copy of the latest event
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		// The default messageFunc is EventAggregatorByReasonMessageFunc,
		// which returns "(combined from similar events): " followed by the message
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}

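Here incoming and result are channels: incoming belongs to the Broadcaster and result belongs to each watcher.

  1. The EventRecorder produces an event and sends it to the incoming channel of the Broadcaster inside the EventBroadcaster.
  2. StartLogging and StartRecordingToSink both call StartEventWatcher.
  3. StartEventWatcher sends an event that requests a new watcher. When the Broadcaster receives it, it adds the watcher, and a goroutine is started to consume the events delivered to that watcher.
  4. If the watcher was started by StartLogging, it writes each event to the log.
  5. If the watcher was started by StartRecordingToSink, it calls the eventCorrelator to aggregate events. The eventCorrelator contains the aggregator, logger, and spamFilter components.
  6. The aggregator rewrites events once more than a given number of similar events occur within a time window and leaves them unchanged otherwise, then passes the event and its key (the aggregateKey when the event was aggregated) to the logger.
  7. The logger checks whether the cache already holds a similar event; if so, it updates the event's count and last-seen time.
  8. The spamFilter checks whether the rate limiter's limit has been reached. If it has, the event is dropped; otherwise a request is sent to the apiserver.
  9. If the event's count is greater than 1, a patch request updates the existing event; otherwise a create request is issued.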

kubernetes event process

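A recent PR, Migrate kubelet to use v1 Event API, aims to migrate the event API used by the kubelet to events.k8s.io/v1. The related issue is Redesign Event API.

Why introduce this API?

It addresses two problems. First, deduplication in the existing mechanism is not very effective (duplicate events remain visible whenever fewer than 10 similar events occur within 10 minutes and the rate limit is not reached), which puts pressure on the apiserver. Second, the existing API makes it hard to find the information you need, which is unfriendly to application developers, for example when looking up the events for a pod's whole lifecycle.

events.k8s.io/v1 and corev1 are compatible with each other: an event created through corev1 can also be read through events.k8s.io/v1, except that the fields newly added in events.k8s.io/v1 (such as eventTime) are not set.

Field correspondence between events.k8s.io/v1 and corev1 events: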

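| events.k8s.io/v1 | corev1 event |
| --- | --- |
| Note string | message string |
| Regarding corev1.ObjectReference | involvedObject corev1.ObjectReference |
| DeprecatedSource corev1.EventSource | Source EventSource |
| DeprecatedFirstTimestamp metav1.Time | FirstTimestamp metav1.Time |
| DeprecatedLastTimestamp metav1.Time | LastTimestamp metav1.Time |
| DeprecatedCount int32 | Count int32 |

  1. A series field is added to the API to record how many times similar events have occurred and when the last one occurred.
  2. A map cache holds similar events. The key is action, reason, reportingController, regarding, and related; events that match on this 5-tuple are considered similar. The value is the latest event of the series.
  3. Every 6 minutes the cache is scanned. If an item's event has a series whose last occurrence is more than 6 minutes old, a request is sent to the apiserver and the item is deleted from the cache; if the event has no series and its eventTime is more than 6 minutes old, the item is deleted from the cache.
  4. Every 30 minutes the cache is scanned, and for each event that has a series a request is sent to the apiserver, mainly to keep the event from being deleted from etcd (events are retained for 1 hour by default).
  5. A goroutine is started that adds a watcher to the broadcaster and handles the events the watcher receives. If the event is not in the cache, it is the first of its kind: it is saved to the cache (without a series), a request is sent to the apiserver, and the cached event is replaced with the one the apiserver returns. If it is in the cache without a series, it is the second: a series is added to the cached event (count 1, last occurrence now), a request is sent to the apiserver, and the cached event is replaced with the returned one. If it is in the cache with a series, the cached event's series is updated (count incremented, last occurrence now).
  6. Whether the event is submitted with patch or create depends on whether it carries a series: a series means the event already exists, so patch is used; otherwise create is used.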
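
The first, second, and later occurrences in step 5 can be modelled as a small state machine. This is a sketch under simplifying assumptions, not the client-go code: `recorder`, `cachedEvent`, and `observe` are hypothetical names, object references are plain strings, timestamps and the periodic 6-minute and 30-minute scans are left out, and the request is represented by the strings "create", "patch", or "" for none.

```go
package main

import "fmt"

// eventKey is the 5-tuple described above; events with equal keys are similar.
type eventKey struct {
	action, reason, reportingController, regarding, related string
}

// series counts the repeats of an event.
type series struct {
	count int
}

// cachedEvent is the latest event of a series; series is nil for a singleton.
type cachedEvent struct {
	key    eventKey
	series *series
}

type recorder struct {
	cache map[eventKey]*cachedEvent
}

func newRecorder() *recorder {
	return &recorder{cache: map[eventKey]*cachedEvent{}}
}

// observe records one occurrence and returns the request that would be sent
// to the apiserver: "create", "patch", or "" when nothing is sent.
func (r *recorder) observe(k eventKey) string {
	ev, seen := r.cache[k]
	switch {
	case !seen: // first occurrence: cache it and create it
		r.cache[k] = &cachedEvent{key: k}
		return "create"
	case ev.series == nil: // second occurrence: start a series and patch
		ev.series = &series{count: 1}
		return "patch"
	default: // later occurrences only update the cached series
		ev.series.count++
		return ""
	}
}

func main() {
	r := newRecorder()
	k := eventKey{action: "Pulling", reason: "BackOff", reportingController: "kubelet", regarding: "pod/web"}
	for i := 0; i < 4; i++ {
		fmt.Printf("occurrence %d -> %q\n", i+1, r.observe(k))
	}
	fmt.Println("series count:", r.cache[k].series.count)
}
```

From the third occurrence on, nothing is sent immediately; the accumulated count reaches the apiserver only through the periodic scans described in steps 3 and 4.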

Taking Migrate kubelet to use v1 Event API as an example:

go

	stopCh := make(chan struct{})
	eventBroadcaster := events.NewEventBroadcasterAdapter(kubeDeps.KubeClient)
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(componentKubelet)
	if kubeDeps.EventClient != nil {
		klog.V(4).InfoS("Sending events to api server")
		eventBroadcaster.StartRecordingToSink(stopCh)
	} else {
		klog.InfoS("No api server defined - no events will be sent to API server")
	}

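The eventBroadcasterAdapterImpl returned by NewEventBroadcasterAdapter contains both a corev1 event broadcaster and an events v1 broadcaster.

If the API server supports the new events.k8s.io/v1, only the new broadcaster runs StartRecordingToSink; otherwise the corev1 event broadcaster's StartRecordingToSink is used.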

staging\src\k8s.io\client-go\tools\events\event_broadcaster.go

go

// NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify
// migration of individual components to the new Event API.
// It wraps both the new and the legacy EventBroadcaster.
func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter {
	eventClient := &eventBroadcasterAdapterImpl{}
	// Check whether the API server supports the new events API
	if _, err := client.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err == nil {
		// The new path
		eventClient.eventsv1Client = client.EventsV1()
		eventClient.eventsv1Broadcaster = NewBroadcaster(&EventSinkImpl{Interface: eventClient.eventsv1Client})
	}
	// Even though there can soon exist cases when coreBroadcaster won't really be needed,
	// we create it unconditionally because its overhead is minor and will simplify using usage
	// patterns of this library in all components.
	// The legacy path
	eventClient.coreClient = client.CoreV1()
	eventClient.coreBroadcaster = record.NewBroadcaster()
	return eventClient
}

// NewBroadcaster Creates a new event broadcaster.
func NewBroadcaster(sink EventSink) EventBroadcaster {
	return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{})
}

// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		eventCache:    eventCache,
		sleepDuration: sleepDuration,
		sink:          sink,
	}
}

// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
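	// Every 30 minutes, refresh the cached events that have a series
	go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
	// Every 6 minutes, finish series last observed more than 6 minutes ago and evict singleton events older than 6 minutes
	go wait.Until(e.finishSeries, finishTime, stopCh)
	// Start a goroutine that adds a watcher and handles the events it receives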
	e.startRecordingEvents(stopCh)
}

// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
	// TODO: Investigate whether lock contention won't be a problem
	e.mu.Lock()
	defer e.mu.Unlock()
	for isomorphicKey, event := range e.eventCache {
		if event.Series != nil {
			if recordedEvent, retry := recordEvent(e.sink, event); !retry {
				if recordedEvent != nil {
					e.eventCache[isomorphicKey] = recordedEvent
				}
			}
		}
	}
}

// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
	// TODO: Investigate whether lock contention won't be a problem
	e.mu.Lock()
	defer e.mu.Unlock()
	for isomorphicKey, event := range e.eventCache {
		eventSerie := event.Series
		if eventSerie != nil {
			if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
				if _, retry := recordEvent(e.sink, event); !retry {
					delete(e.eventCache, isomorphicKey)
				}
			}
		} else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {
			delete(e.eventCache, isomorphicKey)
		}
	}
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
	watcher := e.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for {
			watchEvent, ok := <-watcher.ResultChan()
			if !ok {
				return
			}
			eventHandler(watchEvent.Object)
		}
	}()
	return watcher.Stop
}

func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {
	eventHandler := func(obj runtime.Object) {
		event, ok := obj.(*eventsv1.Event)
		if !ok {
			klog.Errorf("unexpected type, expected eventsv1.Event")
			return
		}
		e.recordToSink(event, clock.RealClock{})
	}
	stopWatcher := e.StartEventWatcher(eventHandler)
	go func() {
		<-stopCh
		stopWatcher()
	}()
}

func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
	// Make a copy before modification, because there could be multiple listeners.
	eventCopy := event.DeepCopy()
	go func() {
		evToRecord := func() *eventsv1.Event {
			e.mu.Lock()
			defer e.mu.Unlock()
			eventKey := getKey(eventCopy)
			isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
			if isIsomorphic {
				if isomorphicEvent.Series != nil {
					isomorphicEvent.Series.Count++
					isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()}
					return nil
				}
				isomorphicEvent.Series = &eventsv1.EventSeries{
					Count:            1,
					LastObservedTime: metav1.MicroTime{Time: clock.Now()},
				}
				return isomorphicEvent
			}
			e.eventCache[eventKey] = eventCopy
			return eventCopy
		}()
		if evToRecord != nil {
			recordedEvent := e.attemptRecording(evToRecord)
			if recordedEvent != nil {
				recordedEventKey := getKey(recordedEvent)
				e.mu.Lock()
				defer e.mu.Unlock()
				e.eventCache[recordedEventKey] = recordedEvent
			}
		}
	}()
}

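If the apiserver supports events.k8s.io/v1, the new EventRecorder is used; otherwise the corev1 EventRecorder is used.

Only one method for generating events is supported: Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})

Unlike the old approach, an additional runtime.Object must be supplied, which is set as Related.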

staging\src\k8s.io\client-go\tools\events\event_broadcaster.go

go

func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
	if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
		return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
	}
	return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
}

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
	hostname, _ := os.Hostname()
	reportingInstance := reportingController + "-" + hostname
	return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
}

staging\src\k8s.io\client-go\tools\events\event_recorder.go

go

func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
	timestamp := metav1.MicroTime{time.Now()}
	message := fmt.Sprintf(note, args...)
	refRegarding, err := reference.GetReference(recorder.scheme, regarding)
	if err != nil {
		klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
		return
	}
	refRelated, err := reference.GetReference(recorder.scheme, related)
	if err != nil {
		klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
	}
	if !util.ValidateEventType(eventtype) {
		klog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}
	event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
	go func() {
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}

func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *eventsv1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := refRegarding.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &eventsv1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
			Namespace: namespace,
		},
		EventTime:           timestamp,
		Series:              nil,
		ReportingController: reportingController,
		ReportingInstance:   reportingInstance,
		Action:              action,
		Reason:              reason,
		Regarding:           *refRegarding,
		Related:             refRelated,
		Note:                message,
		Type:                eventtype,
	}
}

staging\src\k8s.io\client-go\tools\record\event.go

go

// NewEventRecorderAdapter returns an adapter implementing the new
// "k8s.io/client-go/tools/events".EventRecorder interface.
func NewEventRecorderAdapter(recorder EventRecorder) *EventRecorderAdapter {
	return &EventRecorderAdapter{
		recorder: recorder,
	}
}

// Eventf is a wrapper around v1 Eventf
func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
	a.recorder.Eventf(regarding, eventtype, reason, note, args...)
}
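
The adapter pattern used here, exposing the new interface while forwarding to an old recorder and discarding the arguments it cannot represent, can be shown with plain Go interfaces. The sketch is a stdlib-only illustration: `legacyRecorder`, `newRecorder`, `captureRecorder`, and `adapter` are hypothetical names, and objects are plain strings instead of runtime.Object.

```go
package main

import "fmt"

// legacyRecorder models the old interface: no related object, no action.
type legacyRecorder interface {
	Eventf(object string, eventtype, reason, messageFmt string, args ...interface{})
}

// newRecorder models the new interface with the extra related and action arguments.
type newRecorder interface {
	Eventf(regarding, related string, eventtype, reason, action, note string, args ...interface{})
}

// captureRecorder is a legacy recorder that stores formatted events in memory.
type captureRecorder struct {
	lines []string
}

func (c *captureRecorder) Eventf(object string, eventtype, reason, messageFmt string, args ...interface{}) {
	prefix := fmt.Sprintf("%s %s %s: ", object, eventtype, reason)
	c.lines = append(c.lines, prefix+fmt.Sprintf(messageFmt, args...))
}

// adapter implements newRecorder on top of a legacyRecorder. The related
// object and the action have no counterpart in the old API and are dropped.
type adapter struct {
	legacy legacyRecorder
}

func (a adapter) Eventf(regarding, _ string, eventtype, reason, _, note string, args ...interface{}) {
	a.legacy.Eventf(regarding, eventtype, reason, note, args...)
}

func main() {
	c := &captureRecorder{}
	var r newRecorder = adapter{legacy: c}
	r.Eventf("pod/web", "node/n1", "Normal", "Scheduled", "Binding", "assigned to %s", "n1")
	fmt.Println(c.lines[0]) // pod/web Normal Scheduled: assigned to n1
}
```

Callers can therefore be written against the new signature today and keep working when the cluster only serves corev1 events, at the cost of losing the related object and the action.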

For why the new API is designed this way, see the design document https://github.com/kubernetes/community/blob/master/contributors/design-proposals/instrumentation/events-redesign.md

The design document for event aggregation in the old corev1 API: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/api-machinery/event_compression.md
