Event Generation Mechanism in Kubernetes

When troubleshooting an issue in a cluster, there are generally two ways to investigate: check the logs of the various components to identify the problem, or retrieve Events through the API server and analyze what actions a component took based on them.

The event mechanism makes it easier to troubleshoot problems. An Event is an API object that records what action a component took on an object at a certain time. Each component acts as a client, sending requests to the API server to create or update Events. By default, the API server retains Events for one hour.
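
For example, a client can list these Event objects directly through the API server. The following is a minimal sketch using client-go (the kubeconfig location and the "default" namespace are assumptions; kubectl get events shows the same objects):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a kubeconfig at the default location; adjust the path as needed.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// List corev1 Events in the "default" namespace.
	events, err := clientset.CoreV1().Events("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, e := range events.Items {
		fmt.Printf("%s\t%s\t%s\t%s\n", e.LastTimestamp, e.Type, e.Reason, e.Message)
	}
}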

In the following sections, we analyze the process of event generation initiated by the kubelet.

This article is based on Kubernetes version 1.18.6; the annotated source is available at source code reading.

staging\src\k8s.io\api\core\v1\types.go

// Event is a report of an event somewhere in the cluster.
type Event struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`

	// The object that this event is about.
	InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`

	// This should be a short, machine understandable string that gives the reason
	// for the transition into the object's current status.
	// TODO: provide exact specification for format.
	// +optional
	Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`

	// A human-readable description of the status of this operation.
	// TODO: decide on maximum length.
	// +optional
	Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`

	// The component reporting this event. Should be a short machine understandable string.
	// +optional
	Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`

	// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
	// +optional
	FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`

	// The time at which the most recent occurrence of this event was recorded.
	// +optional
	LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`

	// The number of times this event has occurred.
	// +optional
	Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`

	// Type of this event (Normal, Warning), new types could be added in the future
	// +optional
	Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`

	// Time when this Event was first observed.
	// +optional
	EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`

	// Data about the Event series this event represents or nil if it's a singleton Event.
	// +optional
	Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`

	// What action was taken/failed regarding to the Regarding object.
	// +optional
	Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`

	// Optional secondary object for more complex actions.
	// +optional
	Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`

	// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
	// +optional
	ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`

	// ID of the controller instance, e.g. `kubelet-xyzf`.
	// +optional
	ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}

In the kubelet, event handling is initialized by makeEventRecorder, which sets up the client-side machinery the kubelet later uses to create events.

Initialization creates an EventBroadcaster and an EventRecorder, and then calls StartLogging and StartRecordingToSink.

  • The EventRecorder produces event messages and sends them to the Broadcaster inside the EventBroadcaster.
  • The EventBroadcaster is the core component; its internal Broadcaster distributes received event messages to the various watchers.
  • StartLogging creates a watcher and writes the event messages it receives to the log.
  • StartRecordingToSink creates a watcher, aggregates the event messages it receives, and submits them to the API server.

This initialization process sets up the kubelet to handle events effectively.
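
The same wiring pattern can be used by any component built on client-go; the kubelet's version is shown next. A minimal, self-contained sketch under that assumption (the package name, function name, and component name "my-component" are placeholders):

package recorderutil

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog"
)

// newRecorder wires up an EventBroadcaster the same way makeEventRecorder does:
// log every event locally and ship it to the API server through the corev1 events client.
func newRecorder(clientset kubernetes.Interface) record.EventRecorder {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(klog.V(3).Infof)
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientset.CoreV1().Events("")})
	return broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "my-component"})
}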

cmd\kubelet\app\server.go

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	if err != nil {
		return err
	}
	// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
	// When the cloud provider is AWS, this is replaced with the AWS hostname
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	if err != nil {
		return err
	}
	// Setup event recorder if required.
	makeEventRecorder(kubeDeps, nodeName)
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
	eventBroadcaster := record.NewBroadcaster()
	// legacyscheme.Scheme: this file imports k8s.io/kubernetes/pkg/kubelet/config, and pkg\kubelet\config\common.go imports k8s.io/kubernetes/pkg/apis/core/install, which registers all APIs in the core group with legacyscheme.Scheme
	// legacyscheme.Scheme is used to look up the metadata of an event's InvolvedObject
	// Recorder is used to produce events
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	// Start a goroutine that logs events
	eventBroadcaster.StartLogging(klog.V(3).Infof)
	if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
        // Send events to the API server
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}
}

The Broadcaster in the EventBroadcaster is responsible for sending received events to various watchers. It operates by starting a goroutine that sends messages from the “incoming” channel to the “result” channels of different watchers.

There are two handling options for when a watcher’s “result” channel is blocked:

  1. The broadcaster can choose to give up and continue sending the event to the next watcher.
  2. The broadcaster can choose to wait until the blocked watcher is able to receive the event, but this will block the delivery to the next watcher.

Which behavior is used depends on the fullChannelBehavior passed to NewBroadcaster; the event machinery uses DropIfChannelFull, so a single slow watcher cannot block delivery to the others.

NewEventBroadcaster

staging\src\k8s.io\client-go\tools\record\event.go

// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: defaultSleepDuration,
	}
}

broadcaster

staging\src\k8s.io\apimachinery\pkg\watch\mux.go

// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
// It is guaranteed that events will be distributed in the order in which they occur,
// but the order in which a single event is distributed among all of the watchers is unspecified.
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
	m := &Broadcaster{
		watchers:            map[int64]*broadcasterWatcher{},
		incoming:            make(chan Event, incomingQueueLength),
		watchQueueLength:    queueLength,
		fullChannelBehavior: fullChannelBehavior,
	}
	m.distributing.Add(1)
	go m.loop()
	return m
}

// closeAll disconnects all watchers (presumably in response to a Shutdown call).
func (m *Broadcaster) closeAll() {
	m.lock.Lock()
	defer m.lock.Unlock()
	for _, w := range m.watchers {
		// Close the watcher's result channel
		close(w.result)
	}
	// Delete everything from the map, since presence/absence in the map is used
	// by stopWatching to avoid double-closing the channel.
	m.watchers = map[int64]*broadcasterWatcher{}
}

// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
	// Deliberately not catching crashes here. Yes, bring down the process if there's a
	// bug in watch.Broadcaster.
	// Events whose Type is internal-do-function are handled first by executing the func in event.Object
	// Such events are not delivered to watchers, which would otherwise execute the func repeatedly
	for event := range m.incoming {
		if event.Type == internalRunFunctionMarker {
			event.Object.(functionFakeRuntimeObject)()
			continue
		}
		m.distribute(event)
	}
	// Once the incoming channel is drained, close every watcher's result channel and reset the watchers map
	m.closeAll()
	m.distributing.Done()
}

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			// If w.result is full, try w.stopped; if w.stopped has no message either, take the default branch
			// That is, when both w.result and w.stopped are blocked, this watcher is backed up, so delivery to it is skipped
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			// If w.result is full, try w.stopped; if w.stopped has no message either, keep waiting until one of the channels is ready
			// That is, this watcher is never skipped; the broadcaster waits until it becomes free
			// But this blocks delivery of the event to the next watcher
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}

The EventRecorder produces event messages. It uses legacyscheme.Scheme from k8s.io/kubernetes/pkg/api/legacyscheme to resolve the group/version information for the event's InvolvedObject: when the object's GroupVersionKind is empty, the registered GroupVersionKind is looked up from the scheme.

kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})

Here are the supported methods for producing events:

  1. Custom message format with Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}).
  2. Passing a plain message directly with Event(object runtime.Object, eventtype, reason, message string).
  3. Adding annotations and custom message format with AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}).

These methods provide flexibility in generating event messages with different formats and content.
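
A short usage sketch of these methods (the package, helper name, and pod object are hypothetical; the recorder is the one returned by eventBroadcaster.NewRecorder above):

package recorderutil

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

// emitPodEvents is a hypothetical helper showing the three ways to produce an event.
func emitPodEvents(recorder record.EventRecorder, pod *v1.Pod) {
	// Plain message.
	recorder.Event(pod, v1.EventTypeNormal, "Scheduled", "Pod has been scheduled")
	// Formatted message.
	recorder.Eventf(pod, v1.EventTypeWarning, "FailedMount", "Unable to mount volume %q", "data")
	// Formatted message plus annotations copied onto the resulting Event object.
	recorder.AnnotatedEventf(pod, map[string]string{"example.com/trace-id": "abc123"},
		v1.EventTypeNormal, "Pulled", "Pulled image %q", "nginx:1.19")
}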

staging\src\k8s.io\client-go\tools\record\event.go

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
	return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
}

type recorderImpl struct {
	scheme *runtime.Scheme
	source v1.EventSource
	*watch.Broadcaster
	clock clock.Clock
}

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	ref, err := ref.GetReference(recorder.scheme, object)
	if err != nil {
		klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
		return
	}

	if !util.ValidateEventType(eventtype) {
		klog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}

	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source

	go func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
	recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}

func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := ref.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
			Namespace:   namespace,
			Annotations: annotations,
		},
		InvolvedObject: *ref,
		Reason:         reason,
		Message:        message,
		FirstTimestamp: t,
		LastTimestamp:  t,
		Count:          1,
		Type:           eventtype,
	}
}

The StartLogging function creates a watcher and starts a goroutine to receive events and log them. Here’s an overview of how it works:

  1. Create a new event watcher using the NewWatcher function. This watcher will be responsible for receiving events.
  2. Start a goroutine that listens for events from the watcher. The goroutine runs an infinite loop and waits for incoming events.
  3. When an event is received, it is logged using the Kubernetes logger with the event’s details, such as the type, reason, message, and involved object.
  4. The loop continues to listen for more events, ensuring that events are continuously logged as they occur.

In summary, the StartLogging function sets up a mechanism to log events as they are received, allowing administrators and developers to monitor the behavior of the Kubernetes system and diagnose any issues.
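
StartLogging accepts any printf-style function, so the standard library logger works as well as klog. A minimal sketch (package and function names are placeholders):

package recorderutil

import (
	"log"

	"k8s.io/client-go/tools/record"
)

func startLoggingExample(broadcaster record.EventBroadcaster) {
	// Every event received by this broadcaster's internal Broadcaster is now
	// formatted and written through log.Printf.
	stopWatcher := broadcaster.StartLogging(log.Printf)
	defer stopWatcher.Stop()
	// ... produce events via a recorder created from the same broadcaster ...
}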

    // Start a goroutine that logs events
	eventBroadcaster.StartLogging(klog.V(3).Infof)

staging\src\k8s.io\client-go\tools\record\event.go

// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
	// Create a new watcher that reads events and logs them
	return e.StartEventWatcher(
		func(e *v1.Event) {
			logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
		})
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	// Create and register a new watcher
	watcher := e.Watch()
	// Start a goroutine to handle this watcher's events
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				// This is all local, so there's no reason this should
				// ever happen.
				continue
			}
			eventHandler(event)
		}
	}()
	return watcher
}

Process of Creating a Watcher

The process begins by creating an event that triggers the addition of a watcher. This event is of type “internalRunFunctionMarker.” The system waits for this event message to be consumed.

The loop() function described earlier consumes this event and executes the function it carries. That function creates a new watcher and adds it to the list of watchers maintained by the Broadcaster.

A watcher only receives events broadcast after it has been added to the Broadcaster's list; it never sees events that were received before its creation.

staging\src\k8s.io\apimachinery\pkg\watch\mux.go

// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events.
func (m *Broadcaster) Watch() Interface {
	var w *broadcasterWatcher
	m.blockQueue(func() {
		m.lock.Lock()
		defer m.lock.Unlock()
		id := m.nextWatcher
		m.nextWatcher++
		w = &broadcasterWatcher{
			result:  make(chan Event, m.watchQueueLength),
			stopped: make(chan struct{}),
			id:      id,
			m:       m,
		}
		m.watchers[id] = w
	})
	return w
}

// Execute f, blocking the incoming queue (and waiting for it to drain first).
// The purpose of this terrible hack is so that watchers added after an event
// won't ever see that event, and will always see any event after they are
// added.
func (b *Broadcaster) blockQueue(f func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	// Send an Event onto the incoming channel
	// This Event is not delivered to watchers; loop() handles this Type separately by executing the func inside functionFakeRuntimeObject
	b.incoming <- Event{
		Type: internalRunFunctionMarker,
		Object: functionFakeRuntimeObject(func() {
			defer wg.Done()
			f()
		}),
	}
	// Wait for f() to finish
	wg.Wait()
}

To prevent the excessive generation of duplicate events and alleviate the burden on etcd, event aggregation is performed on the client side before sending events to the API server. This aggregation groups events that are similar or identical within a certain time window, reducing the number of events sent to the server.

if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}

The eventCorrelator is responsible for handling incoming events and passing them through three components: EventAggregator, EventLogger, and spamFilter.Filter.

EventAggregator

  • Events are aggregated based on an aggregateKey, which is constructed by concatenating Source, InvolvedObject (excluding FieldPath), Type, and Reason.
  • These aggregated events are stored in an LRU cache along with localKeys (containing unique message sets) and lastTimestamp.
  • If localKeys already contains 10 or more distinct messages and the lastTimestamp is within the last 10 minutes, the event's message is replaced with one prefixed by "(combined from similar events): ". Otherwise, the event is passed to the EventLogger unchanged.
  • If the event was modified, the cache key handed to the EventLogger is the aggregateKey. Otherwise, it is determined by the getEventKey function, which combines the event's Source, InvolvedObject, Type, Reason, and Message.

Note: The EventAggregator does not reduce the number of events but modifies the message when similar events are detected.

EventLogger

  • Uses an LRU cache where keys are either aggregateKey or values returned by the getEventKey function.
  • Looks up the key in the LRU cache to see whether, and how many times, the event has been observed before.
  • If the event is already present, it increments the count and updates the lastTimestamp, generating a patch to update the event.
  • If the event is not found in the cache, a new item is added.

Note: Similar to the EventAggregator, the EventLogger does not reduce the number of events but updates their count and lastTimestamp if they’re already in the cache.

Filter

  • Looks up an LRU cache based on a key composed of Source and InvolvedObject.
  • If no throttle limiter is found in the cache, a new one is created.
  • The throttle limiter checks if the rate limit has been reached. If so, the event is ignored and discarded. Otherwise, recordEvent is executed.

recordEvent

  • Sends the processed event to the API server.
  • If the event's count is greater than 1, it sends a patch request (falling back to a create request if the patch returns 404); otherwise it sends a create request.
  • If the request to the API server fails, the event is attempted up to 12 times in total. The sender sleeps for a random fraction of the sleep duration (10 seconds by default) before the first retry, and for the full duration before later retries.
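
A minimal sketch of that retry behaviour (the names sendWithRetries and trySendEvent are hypothetical stand-ins; the real logic lives in recordToSink/recordEvent in staging/src/k8s.io/client-go/tools/record/event.go):

package recorderutil

import (
	"math/rand"
	"time"
)

// sendWithRetries retries a send callback that reports whether it should be retried.
func sendWithRetries(trySendEvent func() (retry bool), maxTries int, sleepDuration time.Duration) {
	for tries := 1; ; tries++ {
		if !trySendEvent() {
			return // delivered, or failed with a non-retryable error
		}
		if tries >= maxTries {
			return // give up after maxTries attempts
		}
		delay := sleepDuration
		if tries == 1 {
			// Randomize the first sleep so restarting clients do not all hit
			// the API server in lockstep.
			delay = time.Duration(float64(sleepDuration) * rand.Float64())
		}
		time.Sleep(delay)
	}
}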

staging\src\k8s.io\client-go\tools\record\events_cache.go

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

// eventObserve records an event, or updates an existing one if key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
	var (
		patch []byte
		err   error
	)
	eventCopy := *newEvent
	event := &eventCopy

	e.Lock()
	defer e.Unlock()

	// Check if there is an existing event we should update
	lastObservation := e.lastEventObservationFromCache(key)

	// If we found a result, prepare a patch
	if lastObservation.count > 0 {
		// update the event based on the last observation so patch will work as desired
		event.Name = lastObservation.name
		event.ResourceVersion = lastObservation.resourceVersion
		event.FirstTimestamp = lastObservation.firstTimestamp
		event.Count = int32(lastObservation.count) + 1

		eventCopy2 := *event
		eventCopy2.Count = 0
		eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
		eventCopy2.Message = ""

		newData, _ := json.Marshal(event)
		oldData, _ := json.Marshal(eventCopy2)
		patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
	}

	// record our new observation
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
	return event, patch, err
}

// Filter controls that a given source+object are not exceeding the allowed rate.
// Rate-limit events that share the same source and involvedObject
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
	var record spamRecord

	// controls our cached information about this event (source+object)
	eventKey := getSpamKey(event)

	// do we have a record of similar events in our cache?
	f.Lock()
	defer f.Unlock()
	value, found := f.cache.Get(eventKey)
	if found {
		record = value.(spamRecord)
	}

	// verify we have a rate limiter for this record
	if record.rateLimiter == nil {
		record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
	}

	// ensure we have available rate
	filter := !record.rateLimiter.TryAccept()

	// update the cache
	f.cache.Add(eventKey, record)

	return filter
}

// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
//   the full key for normal events, and to the result of
//   EventAggregatorMessageFunc for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	// eventKey is the full cache key for this event
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
	// The default keyFunc is EventAggregatorByReasonFunc
	// EventAggregatorByReasonFunc builds aggregateKey by concatenating Source, InvolvedObject (excluding FieldPath), Type, and Reason; localKey is the Message.
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
	// find a similar record, its lastTimestamp will be the zero value, so we
	// create a new one in that case.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	// The record has expired, so start with a fresh one
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
	// Insert the localKey; the underlying set (map) will not store a duplicate key
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	// If the number of distinct messages in localKeys is below maxEvents (expired or not), return newEvent and eventKey unchanged
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	// The record has not expired and the number of distinct messages has reached maxEvents, so aggregate by rewriting the most recently received event
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		// The default messageFunc is EventAggregatorByReasonMessageFunc
		// EventAggregatorByReasonMessageFunc returns "(combined from similar events): " followed by the message
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}

To summarize the corev1 event flow:

  1. An event is generated by the EventRecorder and sent to the incoming channel of the Broadcaster inside the EventBroadcaster.
  2. Both StartLogging and StartRecordingToSink call StartEventWatcher.
  3. StartEventWatcher sends an event to add a new watcher. When the Broadcaster receives this message, it adds a new watcher and starts a goroutine to consume this event.
  4. If initiated by StartLogging, the watcher records the event in the logs.
  5. If initiated by StartRecordingToSink, the watcher passes the event through the eventCorrelator before submitting it to the API server. The eventCorrelator consists of three components: aggregator, logger, and spamFilter.
  6. The aggregator aggregates similar events that occur within a certain time window once they exceed a threshold count, modifying the event when aggregation occurs. It then passes the event and the aggregateKey to the logger for further processing.
  7. The logger checks if a similar event is present in the cache. If found, it updates the count and the last timestamp of the event.
  8. The spamFilter checks whether the event has exceeded the rate limiter's limit. If so, the event is discarded; if not, a request is sent to the API server.
  9. If the event’s count is greater than 1, a patch request is made to update the event. Otherwise, a create request is sent to the API server.

This process ensures that events are aggregated and logged efficiently while avoiding excessive event generation and API server load.

[Figure: kubernetes event process]

There is an open pull request, Migrate kubelet to use v1 Event API, whose goal is to migrate the event API used by the kubelet to events.k8s.io/v1. The change was expected to land in 1.22 but, as of 1.28, has still not been merged; it is tracked in the related issue here.

Why Introduce This API?

The introduction of this API aims to address the inefficiency in the existing mechanism for event deduplication (repeated events could still be observed). Additionally, it resolves issues related to high load on the API server. The current API also makes it less convenient to find specific information, which can be challenging for application developers, especially when trying to track events throughout the entire lifecycle of a pod.

Events in events.k8s.io/v1 are compatible with those in corev1. This means that you can create events using corev1, and you can retrieve events through events.k8s.io/v1. However, new fields introduced in events.k8s.io/v1, such as eventTime, will not be set when using corev1.

Correspondence Between events.k8s.io/v1 and corev1 Events

events.k8s.io/v1                          corev1 Event
Note string                               Message string
Regarding corev1.ObjectReference          InvolvedObject corev1.ObjectReference
DeprecatedSource corev1.EventSource       Source EventSource
DeprecatedFirstTimestamp metav1.Time      FirstTimestamp metav1.Time
DeprecatedLastTimestamp metav1.Time       LastTimestamp metav1.Time
DeprecatedCount int32                     Count int32

In version 1.21, a new aggregation algorithm was introduced for events. Here’s how it works:

  1. A series field is added to the API to represent the number of times similar events have occurred and the timestamp of the last occurrence.
  2. There is a map cache that stores similar events. The key for caching is a 5-tuple consisting of action, reason, reportingController, regarding, and related. If these five attributes are the same, the events are considered similar. The value in the cache represents the latest event in the series.
  3. Every 6 minutes, the cache is traversed. If an item in the cache has a series and the last occurrence was more than 6 minutes ago, a request is sent to the API server, and the item is removed from the cache. If an event doesn’t have a series and its eventTime is more than 6 minutes ago, it is removed from the cache.
  4. Every 30 minutes, the cache is traversed again. If an event has a series, a request is sent to the API server to prevent the event from being deleted from etcd. By default, events are retained for 1 hour.
  5. A goroutine is started to add watchers to the broadcaster and handle events received by the watcher. If an event is not in the cache, it is the first event of its kind. It is saved in the cache (without a series), a request is sent to the API server, and the cache is updated with the response. If the event is already in the cache and does not have a series, it is the second event of its kind. The cache entry is updated to include a series (count=1, last occurrence time=current), a request is sent to the API server, and the cache is updated with the response. If the event is already in the cache and has a series, the cache entry’s series is updated (count incremented by 1, last occurrence time=current).
  6. The method used to submit to the API server (patch or create) depends on whether the event to be submitted includes a series. If it includes a series, it means the event already exists, and a patch request is used. Otherwise, a create request is used.

For a specific example, you can refer to the changes in Migrate kubelet to use v1 Event API.
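
The 5-tuple cache key from step 2 can be sketched as follows (a simplified version; the real key construction is getKey in staging/src/k8s.io/client-go/tools/events/event_broadcaster.go, and the field set shown here is taken from the description above):

package recorderutil

import (
	corev1 "k8s.io/api/core/v1"
	eventsv1 "k8s.io/api/events/v1"
)

// eventKey identifies "similar" events: two events with the same action, reason,
// reporting controller, regarding object, and related object fall into one series.
type eventKey struct {
	action              string
	reason              string
	reportingController string
	regarding           corev1.ObjectReference
	related             corev1.ObjectReference
}

// getKey builds the cache key for an events.k8s.io/v1 Event.
func getKey(event *eventsv1.Event) eventKey {
	key := eventKey{
		action:              event.Action,
		reason:              event.Reason,
		reportingController: event.ReportingController,
		regarding:           event.Regarding,
	}
	if event.Related != nil {
		key.related = *event.Related
	}
	return key
}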

    stopCh := make(chan struct{})
	eventBroadcaster := events.NewEventBroadcasterAdapter(kubeDeps.KubeClient)
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(componentKubelet)
	if kubeDeps.EventClient != nil {
		klog.V(4).InfoS("Sending events to api server")
		eventBroadcaster.StartRecordingToSink(stopCh)
	} else {
		klog.InfoS("No api server defined - no events will be sent to API server")
	}

NewEventBroadcasterAdapter returns an eventBroadcasterAdapterImpl that contains both the corev1 event broadcaster and the events.k8s.io/v1 event broadcaster.

If the API server supports events.k8s.io/v1, StartRecordingToSink is executed on the new broadcaster; otherwise the corev1 event broadcaster's StartRecordingToSink is used. This allows events to be handled according to the API version supported by the cluster.

staging\src\k8s.io\client-go\tools\events\event_broadcaster.go

// NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify
// migration of individual components to the new Event API.
// Contains both the new and the legacy EventBroadcaster
func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter {
	eventClient := &eventBroadcasterAdapterImpl{}
    // Check whether the API server supports the new events API
	if _, err := client.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err == nil {
        // New events.k8s.io/v1 path
		eventClient.eventsv1Client = client.EventsV1()
		eventClient.eventsv1Broadcaster = NewBroadcaster(&EventSinkImpl{Interface: eventClient.eventsv1Client})
	}
	// Even though there can soon exist cases when coreBroadcaster won't really be needed,
	// we create it unconditionally because its overhead is minor and will simplify using usage
	// patterns of this library in all components.
    // Legacy corev1 path
	eventClient.coreClient = client.CoreV1()
	eventClient.coreBroadcaster = record.NewBroadcaster()
	return eventClient
}

// NewBroadcaster Creates a new event broadcaster.
func NewBroadcaster(sink EventSink) EventBroadcaster {
	return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{})
}

// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		eventCache:    eventCache,
		sleepDuration: sleepDuration,
		sink:          sink,
	}
}

// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
    // Every 30 minutes, refresh cached events that have a series
	go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
    // Every 6 minutes, finish series whose last occurrence is older than 6 minutes and evict singleton events older than 6 minutes
	go wait.Until(e.finishSeries, finishTime, stopCh)
    // Start a goroutine that adds a watcher and handles the events it receives
	e.startRecordingEvents(stopCh)
}

// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
	// TODO: Investigate whether lock contention won't be a problem
	e.mu.Lock()
	defer e.mu.Unlock()
	for isomorphicKey, event := range e.eventCache {
		if event.Series != nil {
			if recordedEvent, retry := recordEvent(e.sink, event); !retry {
				if recordedEvent != nil {
					e.eventCache[isomorphicKey] = recordedEvent
				}
			}
		}
	}
}

// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
	// TODO: Investigate whether lock contention won't be a problem
	e.mu.Lock()
	defer e.mu.Unlock()
	for isomorphicKey, event := range e.eventCache {
		eventSerie := event.Series
		if eventSerie != nil {
			if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
				if _, retry := recordEvent(e.sink, event); !retry {
					delete(e.eventCache, isomorphicKey)
				}
			}
		} else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {
			delete(e.eventCache, isomorphicKey)
		}
	}
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
	watcher := e.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for {
			watchEvent, ok := <-watcher.ResultChan()
			if !ok {
				return
			}
			eventHandler(watchEvent.Object)
		}
	}()
	return watcher.Stop
}

func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {
	eventHandler := func(obj runtime.Object) {
		event, ok := obj.(*eventsv1.Event)
		if !ok {
			klog.Errorf("unexpected type, expected eventsv1.Event")
			return
		}
		e.recordToSink(event, clock.RealClock{})
	}
	stopWatcher := e.StartEventWatcher(eventHandler)
	go func() {
		<-stopCh
		stopWatcher()
	}()
}

func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
	// Make a copy before modification, because there could be multiple listeners.
	eventCopy := event.DeepCopy()
	go func() {
		evToRecord := func() *eventsv1.Event {
			e.mu.Lock()
			defer e.mu.Unlock()
			eventKey := getKey(eventCopy)
			isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
			if isIsomorphic {
				if isomorphicEvent.Series != nil {
					isomorphicEvent.Series.Count++
					isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()}
					return nil
				}
				isomorphicEvent.Series = &eventsv1.EventSeries{
					Count:            1,
					LastObservedTime: metav1.MicroTime{Time: clock.Now()},
				}
				return isomorphicEvent
			}
			e.eventCache[eventKey] = eventCopy
			return eventCopy
		}()
		if evToRecord != nil {
			recordedEvent := e.attemptRecording(evToRecord)
			if recordedEvent != nil {
				recordedEventKey := getKey(recordedEvent)
				e.mu.Lock()
				defer e.mu.Unlock()
				e.eventCache[recordedEventKey] = recordedEvent
			}
		}
	}()
}

If the API server supports events.k8s.io/v1, then the new EventRecorder is used. Otherwise, the corev1 EventRecorder is used.

The new recorder supports only one method for generating events:

Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})

The key difference from the old method is that it requires providing an additional runtime.Object to set as Related.
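
A hypothetical usage sketch of the new recorder (the package, function name, and the pod/node objects are placeholders for whatever the caller is reporting about):

package recorderutil

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/events"
)

// reportBinding emits one event with both a primary (regarding) and a secondary
// (related) object, plus the action string that the old API did not require.
func reportBinding(recorder events.EventRecorder, pod, node runtime.Object) {
	recorder.Eventf(pod, node, v1.EventTypeNormal, "Scheduled", "Binding",
		"Successfully assigned pod to node %s", "node-1")
}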

staging\src\k8s.io\client-go\tools\events\event_broadcaster.go

func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
	if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
		return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
	}
	return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
}

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
	hostname, _ := os.Hostname()
	reportingInstance := reportingController + "-" + hostname
	return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
}

staging\src\k8s.io\client-go\tools\events\event_recorder.go

func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
	timestamp := metav1.MicroTime{time.Now()}
	message := fmt.Sprintf(note, args...)
	refRegarding, err := reference.GetReference(recorder.scheme, regarding)
	if err != nil {
		klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
		return
	}
	refRelated, err := reference.GetReference(recorder.scheme, related)
	if err != nil {
		klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
	}
	if !util.ValidateEventType(eventtype) {
		klog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}
	event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
	go func() {
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}

func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *eventsv1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := refRegarding.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &eventsv1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
			Namespace: namespace,
		},
		EventTime:           timestamp,
		Series:              nil,
		ReportingController: reportingController,
		ReportingInstance:   reportingInstance,
		Action:              action,
		Reason:              reason,
		Regarding:           *refRegarding,
		Related:             refRelated,
		Note:                message,
		Type:                eventtype,
	}
}

staging\src\k8s.io\client-go\tools\record\event.go

// NewEventRecorderAdapter returns an adapter implementing the new
// "k8s.io/client-go/tools/events".EventRecorder interface.
func NewEventRecorderAdapter(recorder EventRecorder) *EventRecorderAdapter {
	return &EventRecorderAdapter{
		recorder: recorder,
	}
}

// Eventf is a wrapper around v1 Eventf
func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
	a.recorder.Eventf(regarding, eventtype, reason, note, args...)
}

The new API design and changes were made to address certain limitations and issues with the old corev1 event aggregation. The details and rationale behind these changes are documented in the Kubernetes design proposal titled “Events Redesign,” which can be found at https://github.com/kubernetes/community/blob/master/contributors/design-proposals/instrumentation/events-redesign.md.

Similarly, the design document for the old corev1 event aggregation, which outlines its limitations, can be found at https://github.com/kubernetes/community/blob/master/contributors/design-proposals/api-machinery/event_compression.md.

These documents provide a comprehensive explanation of the motivations, goals, and details behind the redesign of Kubernetes events and the introduction of the new API.
