When troubleshooting a problem in a cluster, there are generally two approaches: inspect each component's logs to pin down the issue, or fetch events from the apiserver and use them to work out what action a given component took.

The event mechanism makes troubleshooting much easier. An event is an API object that records that some component performed some action at some time. Each component acts as a client, sending requests to the apiserver to create or update event objects; by default the apiserver retains events for only one hour.
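For instance, events can be fetched through the apiserver like any other API object. The following is a minimal client-go sketch (the kubeconfig location and the "default" namespace are assumptions for the example):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: a kubeconfig at the default location (~/.kube/config).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// List events in the "default" namespace; this is the same data that
	// `kubectl get events` shows.
	events, err := clientset.CoreV1().Events("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, e := range events.Items {
		fmt.Printf("%s\t%s\t%s\t%s\n", e.Type, e.Reason, e.InvolvedObject.Name, e.Message)
	}
}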

Below, the kubelet (version 1.18.6) serves as the example event producer while we walk through how an event is generated.

This article is based on kubernetes 1.18.6; see the accompanying source code reading repository.

corev1 Event type definition

InvolvedObject: the object this event is about

Reason: the reason this action was taken

Message: a more detailed description of the action

Source: the creator of this event, consisting of Host (the creator's hostname) and Component (the creator's component name)

FirstTimestamp: when the event first occurred

LastTimestamp: when the event last occurred

Count: the total number of occurrences

Type: the event type; only two values, Normal and Warning

EventTime: when the event was first observed (added for compatibility with the new events.k8s.io API)

Series: the occurrence count and last occurrence time of the related event series (added for compatibility with the new events.k8s.io API)

Action: the action that was taken (added for compatibility with the new events.k8s.io API)

Related: a secondary object related to this event (added for compatibility with the new events.k8s.io API)

ReportingController: the name of the controller that emitted this event (added for compatibility with the new events.k8s.io API)

ReportingInstance: the instance of the controller that emitted this event (added for compatibility with the new events.k8s.io API)
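To make these fields concrete, here is a hypothetical event object as the kubelet might emit it (the names and values are invented for illustration; compare with makeEvent below):

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// A made-up "image already present" event.
var example = &v1.Event{
	ObjectMeta: metav1.ObjectMeta{
		Name:      "nginx.16a4fb7d1b4cb2f1", // <involved object name>.<timestamp in hex>
		Namespace: "default",
	},
	InvolvedObject: v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "nginx"},
	Reason:         "Pulled",
	Message:        `Container image "nginx:1.19" already present on machine`,
	Source:         v1.EventSource{Component: "kubelet", Host: "node-1"},
	FirstTimestamp: metav1.Now(),
	LastTimestamp:  metav1.Now(),
	Count:          1,
	Type:           v1.EventTypeNormal,
}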

staging\src\k8s.io\api\core\v1\types.go

// Event is a report of an event somewhere in the cluster.
type Event struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`

	// The object that this event is about.
	InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`

	// This should be a short, machine understandable string that gives the reason
	// for the transition into the object's current status.
	// TODO: provide exact specification for format.
	// +optional
	Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`

	// A human-readable description of the status of this operation.
	// TODO: decide on maximum length.
	// +optional
	Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`

	// The component reporting this event. Should be a short machine understandable string.
	// +optional
	Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`

	// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
	// +optional
	FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`

	// The time at which the most recent occurrence of this event was recorded.
	// +optional
	LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`

	// The number of times this event has occurred.
	// +optional
	Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`

	// Type of this event (Normal, Warning), new types could be added in the future
	// +optional
	Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`

	// Time when this Event was first observed.
	// +optional
	EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`

	// Data about the Event series this event represents or nil if it's a singleton Event.
	// +optional
	Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`

	// What action was taken/failed regarding to the Regarding object.
	// +optional
	Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`

	// Optional secondary object for more complex actions.
	// +optional
	Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`

	// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
	// +optional
	ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`

	// ID of the controller instance, e.g. `kubelet-xyzf`.
	// +optional
	ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}

Event initialization in the kubelet

The kubelet calls makeEventRecorder to initialize the event client machinery, in preparation for emitting events later.

This involves creating the EventBroadcaster and EventRecorder and starting StartLogging and StartRecordingToSink.

The EventRecorder is the producer of event messages; it sends them to the Broadcaster embedded in the EventBroadcaster.

The EventBroadcaster is the core of the system; its embedded Broadcaster delivers each received event message to every watcher.

StartLogging creates a watcher and writes the events it receives to the log.

StartRecordingToSink creates a watcher, aggregates the events it receives, and submits them to the apiserver.

cmd\kubelet\app\server.go

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	if err != nil {
		return err
	}
	// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
	// when the cloud provider is aws, this is replaced with the aws host domain name
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	if err != nil {
		return err
	}
	// Setup event recorder if required.
	makeEventRecorder(kubeDeps, nodeName)
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
	eventBroadcaster := record.NewBroadcaster()
	// legacyscheme.Scheme: this file imports k8s.io/kubernetes/pkg/kubelet/config (and pkg\kubelet\config\common.go imports k8s.io/kubernetes/pkg/apis/core/install, which registers all APIs in the core group into legacyscheme.Scheme)
	// legacyscheme.Scheme is used to look up the metadata of the event's InvolvedObject
	// Recorder is used to produce events
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	// start a goroutine to log events
	eventBroadcaster.StartLogging(klog.V(3).Infof)
	if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
        // send events to the apiserver
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}
}

EventBroadcaster

The Broadcaster delivers received events to every watcher. It starts a goroutine that forwards messages from the incoming channel to each watcher's result channel, with two behaviors when a watcher's result channel is full: either drop the message for that watcher and move on to the next one, or wait for that watcher to accept it, which blocks the next watcher from receiving the message.
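A minimal standalone sketch of this Broadcaster (the queue length of 10 and the metav1.Status payload are arbitrary choices for the demo; record.NewBroadcaster below uses maxQueuedEvents and DropIfChannelFull):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	// Per-watcher queue length of 10; drop the event for a watcher whose queue is full.
	b := watch.NewBroadcaster(10, watch.DropIfChannelFull)
	defer b.Shutdown()

	// A watcher only sees events sent after it is registered.
	w := b.Watch()

	// Action pushes onto the incoming channel; the loop() goroutine distributes it.
	b.Action(watch.Added, &metav1.Status{Message: "hello"})

	evt := <-w.ResultChan()
	fmt.Println(evt.Type, evt.Object.(*metav1.Status).Message) // ADDED hello
}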

Creating an EventBroadcaster

staging\src\k8s.io\client-go\tools\record\event.go

// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: defaultSleepDuration,
	}
}

broadcaster

staging\src\k8s.io\apimachinery\pkg\watch\mux.go

// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
// It is guaranteed that events will be distributed in the order in which they occur,
// but the order in which a single event is distributed among all of the watchers is unspecified.
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
	m := &Broadcaster{
		watchers:            map[int64]*broadcasterWatcher{},
		incoming:            make(chan Event, incomingQueueLength),
		watchQueueLength:    queueLength,
		fullChannelBehavior: fullChannelBehavior,
	}
	m.distributing.Add(1)
	go m.loop()
	return m
}

// closeAll disconnects all watchers (presumably in response to a Shutdown call).
func (m *Broadcaster) closeAll() {
	m.lock.Lock()
	defer m.lock.Unlock()
	for _, w := range m.watchers {
		// close the watcher's result channel
		close(w.result)
	}
	// Delete everything from the map, since presence/absence in the map is used
	// by stopWatching to avoid double-closing the channel.
	m.watchers = map[int64]*broadcasterWatcher{}
}

// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
	// Deliberately not catching crashes here. Yes, bring down the process if there's a
	// bug in watch.Broadcaster.
	// Events whose Type is internal-run-function are handled first by executing the func in event.Object,
	// so such events are never delivered to watchers, which would cause the func to run repeatedly.
	for event := range m.incoming {
		if event.Type == internalRunFunctionMarker {
			event.Object.(functionFakeRuntimeObject)()
			continue
		}
		m.distribute(event)
	}
	// once incoming is drained, close every watcher's result channel and clear the watchers map
	m.closeAll()
	m.distributing.Done()
}

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			// If w.result would block, try w.stopped; if w.stopped has nothing either, take the default branch.
			// That is, when both w.result and w.stopped block (this watcher is backed up), give up sending to it.
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			// If w.result would block, try w.stopped; if w.stopped has nothing, wait until one channel unblocks.
			// That is, this watcher is never skipped; we wait until it is free,
			// but that blocks the next watcher from receiving this message.
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}

EventRecorder

The eventRecorder produces event messages. It uses the Scheme from k8s.io/kubernetes/pkg/api/legacyscheme to determine the group/version of the event's InvolvedObject: when the object's GroupVersionKind is empty, the registered GroupVersionKind is looked up from the scheme.

kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})

Events can be produced in any of the following ways:

Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}): supports a formatted message

Event(object runtime.Object, eventtype, reason, message string): passes the message through as-is

AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}): adds annotations and supports a formatted message
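Usage looks like this (a hypothetical helper; pod stands in for whatever object the event concerns):

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

func recordExamples(recorder record.EventRecorder, pod *v1.Pod) {
	// Formatted message.
	recorder.Eventf(pod, v1.EventTypeNormal, "Pulled", "Successfully pulled image %q", "nginx:1.19")
	// Message passed through as-is.
	recorder.Event(pod, v1.EventTypeWarning, "Failed", "Error: ImagePullBackOff")
	// Extra annotations on the resulting Event object, plus a formatted message.
	recorder.AnnotatedEventf(pod, map[string]string{"example.io/trace": "demo"},
		v1.EventTypeNormal, "Demo", "annotated event %d", 1)
}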

staging\src\k8s.io\client-go\tools\record\event.go

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
	return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
}

type recorderImpl struct {
	scheme *runtime.Scheme
	source v1.EventSource
	*watch.Broadcaster
	clock clock.Clock
}

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	ref, err := ref.GetReference(recorder.scheme, object)
	if err != nil {
		klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
		return
	}

	if !util.ValidateEventType(eventtype) {
		klog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}

	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source

	go func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
	recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}

func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := ref.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
			Namespace:   namespace,
			Annotations: annotations,
		},
		InvolvedObject: *ref,
		Reason:         reason,
		Message:        message,
		FirstTimestamp: t,
		LastTimestamp:  t,
		Count:          1,
		Type:           eventtype,
	}
}

Recording events in the log: StartLogging

This creates a watcher and starts a goroutine that receives events and writes them to the log.

   // start a goroutine to log events
	eventBroadcaster.StartLogging(klog.V(3).Infof)

staging\src\k8s.io\client-go\tools\record\event.go

// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
	// create a new watcher whose events are passed to the given logging function
	return e.StartEventWatcher(
		func(e *v1.Event) {
			logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
		})
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	// create and register a new watcher
	watcher := e.Watch()
	// start a goroutine to process this watcher's events
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				// This is all local, so there's no reason this should
				// ever happen.
				continue
			}
			eventHandler(event)
		}
	}()
	return watcher
}

How a watcher is created

First an event of Type internalRunFunctionMarker, which requests that a watcher be added, is queued. When this event is consumed, the loop() above executes the function carried in its Object, which creates the new watcher and adds it to the Broadcaster's watchers map. The watcher can only consume events received after it was added, not earlier ones.

staging\src\k8s.io\apimachinery\pkg\watch\mux.go

// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events.
func (m *Broadcaster) Watch() Interface {
	var w *broadcasterWatcher
	m.blockQueue(func() {
		m.lock.Lock()
		defer m.lock.Unlock()
		id := m.nextWatcher
		m.nextWatcher++
		w = &broadcasterWatcher{
			result:  make(chan Event, m.watchQueueLength),
			stopped: make(chan struct{}),
			id:      id,
			m:       m,
		}
		m.watchers[id] = w
	})
	return w
}

// Execute f, blocking the incoming queue (and waiting for it to drain first).
// The purpose of this terrible hack is so that watchers added after an event
// won't ever see that event, and will always see any event after they are
// added.
func (b *Broadcaster) blockQueue(f func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	// Send the Event onto incoming and wait.
	// This Event is never delivered to watchers: loop() handles this Type specially by executing the func inside functionFakeRuntimeObject.
	b.incoming <- Event{
		Type: internalRunFunctionMarker,
		Object: functionFakeRuntimeObject(func() {
			defer wg.Done()
			f()
		}),
	}
	// wait for f() to finish
	wg.Wait()
}

Sending events to the apiserver: StartRecordingToSink

To keep repeated events from flooding the apiserver and putting pressure on etcd, events are aggregated client-side before being sent: identical and similar events within a time window are combined.

if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}

The eventCorrelator processes each received event, passing it through three components in turn, the EventAggregator, the eventLogger, and spamFilter.Filter, each of which may modify or drop the event.

EventAggregator

Each event is stored in an LRU cache keyed by its aggregateKey (the concatenation of Source, InvolvedObject excluding FieldPath, Type, and Reason); the value is a struct holding localKeys (the set of unique messages) and lastTimestamp. Events with the same aggregateKey thus land in the same bucket, and an event counts as unique if its message is not yet in localKeys.

If localKeys holds 10 or more messages and lastTimestamp falls within the last 10 minutes, the event's message is prefixed with "(combined from similar events): "; otherwise the event is passed on to the eventLogger unmodified.

If the event was modified, the eventLogger's LRU cache key is the aggregateKey; otherwise it is the getEventKey return value, the concatenation of the event's Source, InvolvedObject (including FieldPath), Type, Reason, and Message fields.

The EventAggregator never reduces the number of events; it only rewrites an event's message once more than 9 similar events have been seen.
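A simplified sketch of the two key functions, with the field lists as described above (see events_cache.go for the exact code):

import (
	"strings"

	v1 "k8s.io/api/core/v1"
)

// getEventKey: the full key, including FieldPath and Message.
func getEventKey(event *v1.Event) string {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		event.InvolvedObject.FieldPath,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
		event.Type,
		event.Reason,
		event.Message,
	}, "")
}

// The default keyFunc (EventAggregatorByReasonFunc): the aggregateKey drops
// FieldPath and Message, and the Message alone becomes the localKey.
func eventAggregatorByReasonFunc(event *v1.Event) (string, string) {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
		event.Type,
		event.Reason,
	}, ""), event.Message
}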

eventLogger

Its LRU cache key is the aggregateKey or the getEventKey return value; the value holds the count, firstTimestamp, name, and resourceVersion of the event observed so far.

It checks how often the event has been seen via the LRU cache: if it has been seen before, the event's count is incremented, its lastTimestamp is updated, and a patch for updating the event is generated; otherwise a new item is added to the cache.

The eventLogger does not reduce the number of events either; when the event already exists in the cache it only updates the event's count and lastTimestamp.

Filter

A key built from the Source and InvolvedObject is used to look up a rate limiter in an LRU cache, creating one if none exists. If the rate limiter's limit has been reached the event is ignored and dropped; otherwise recordEvent runs.
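The spam key and the default limits (simplified from events_cache.go; imports as in the previous sketch) mean each source+object pair gets a burst of 25 events and thereafter roughly one event per 5 minutes:

const (
	// burst of 25 tokens, refilled at 1 token per 300 seconds
	defaultSpamBurst = 25
	defaultSpamQPS   = 1. / 300.
)

// getSpamKey builds the rate-limiter cache key from Source and InvolvedObject.
func getSpamKey(event *v1.Event) string {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
	}, "")
}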

recordEvent

recordEvent sends the processed event to the apiserver. If count is greater than 1 a PATCH request is issued (falling back to a CREATE request if the server returns 404); otherwise a CREATE request is issued.

If the request to the apiserver fails it is retried, up to 12 times in total. The first retry waits a random duration (up to the default 10 seconds, so that clients don't all retry in sync); each later retry waits the full 10 seconds.
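The retry loop lives in recordToSink in event.go (a simplified excerpt; maxTriesPerEvent is 12 and sleepDuration defaults to 10 seconds):

	tries := 0
	for {
		// recordEvent returns true once the event was written or should not be retried.
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
			break
		}
		tries++
		if tries >= maxTriesPerEvent {
			klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			break
		}
		// Randomize the first sleep so that clients don't all retry in sync
		// after the apiserver comes back.
		if tries == 1 {
			time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
		} else {
			time.Sleep(sleepDuration)
		}
	}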

staging\src\k8s.io\client-go\tools\record\events_cache.go

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
// eventObserve records an event, or updates an existing one if key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
	var (
		patch []byte
		err   error
	)
	eventCopy := *newEvent
	event := &eventCopy

	e.Lock()
	defer e.Unlock()

	// Check if there is an existing event we should update
	lastObservation := e.lastEventObservationFromCache(key)

	// If we found a result, prepare a patch
	if lastObservation.count > 0 {
		// update the event based on the last observation so patch will work as desired
		event.Name = lastObservation.name
		event.ResourceVersion = lastObservation.resourceVersion
		event.FirstTimestamp = lastObservation.firstTimestamp
		event.Count = int32(lastObservation.count) + 1

		eventCopy2 := *event
		eventCopy2.Count = 0
		eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
		eventCopy2.Message = ""

		newData, _ := json.Marshal(event)
		oldData, _ := json.Marshal(eventCopy2)
		patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
	}

	// record our new observation
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
	return event, patch, err
}

// Filter controls that a given source+object are not exceeding the allowed rate.
// rate-limits events that share the same Source and InvolvedObject
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
	var record spamRecord

	// controls our cached information about this event (source+object)
	eventKey := getSpamKey(event)

	// do we have a record of similar events in our cache?
	f.Lock()
	defer f.Unlock()
	value, found := f.cache.Get(eventKey)
	if found {
		record = value.(spamRecord)
	}

	// verify we have a rate limiter for this record
	if record.rateLimiter == nil {
		record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
	}

	// ensure we have available rate
	filter := !record.rateLimiter.TryAccept()

	// update the cache
	f.cache.Add(eventKey, record)

	return filter
}

// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
//   the full key for normal events, and to the result of
//   EventAggregatorMessageFunc for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	// eventKey is the full cache key for this event
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
	// the default keyFunc is EventAggregatorByReasonFunc, whose aggregateKey joins
	// Source, InvolvedObject (without FieldPath), Type, and Reason; the localKey is the Message
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
	// find a similar record, its lastTimestamp will be the zero value, so we
	// create a new one in that case.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	// the record has expired, so start a fresh one
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
	// insert localKey; thanks to the set semantics, a duplicate key is not stored twice
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	// if localKeys (the number of distinct messages) is still below maxEvents, expired record or not, return newEvent and eventKey unchanged
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	// the record has not expired and localKeys holds at least maxEvents distinct messages, so aggregate: rewrite this latest event
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		// the default messageFunc is EventAggregatorByReasonMessageFunc,
		// which prefixes the message with "(combined from similar events): "
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}

The whole flow

Here incoming and result are channels: incoming belongs to the Broadcaster, result to each watcher.

  1. The EventRecorder produces an event and sends it to the incoming channel of the Broadcaster inside the EventBroadcaster.
  2. StartLogging and StartRecordingToSink both call StartEventWatcher.
  3. StartEventWatcher queues an add-watcher event; when the Broadcaster consumes it, a watcher is added and a goroutine is started to consume that watcher's events.
  4. If started by StartLogging, the watcher writes the events to the log.
  5. If started by StartRecordingToSink, the watcher runs the events through the eventCorrelator, which contains the aggregator, logger, and spamFilter components.
  6. The aggregator combines similar events once their number within a time window exceeds the threshold, rewriting the event; below the threshold the event is left untouched. It then hands the event and the aggregate key to the logger.
  7. The logger checks its cache for a similar event; if one exists, it updates the event's count and last occurrence time.
  8. The spamFilter checks whether the rate limiter's limit has been reached; if so, the event is ignored and dropped; otherwise a request is sent to the apiserver.
  9. If the event's count is greater than 1, a PATCH request updates the event; otherwise a CREATE request is issued.

Recent developments

There is a recent PR, Migrate kubelet to use v1 Event API, whose goal is to migrate the event API used by the kubelet to events.k8s.io/v1; it should be merged in the 1.22 release. The related issue is https://github.com/kubernetes/enhancements/issues/383.

Why introduce this API?

Two reasons: deduplication in the existing mechanism is not very effective (duplicate events remain visible whenever fewer than 10 similar events occur within 10 minutes and the rate limiter's limit has not been hit), which puts pressure on the apiserver; and the existing API makes it inconvenient to find the information you need (it is unfriendly to application developers, e.g. when looking up the events across a pod's whole lifecycle).

events.k8s.io/v1 and corev1 are mutually compatible: an event created through corev1 can also be read through events.k8s.io/v1, except that fields newly added in events.k8s.io/v1 (such as eventTime) will not be set.
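For example (a minimal sketch assuming a clientset built as usual), the same namespace can be read through either group:

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func listBothGroups(ctx context.Context, clientset kubernetes.Interface) error {
	// Events created through the corev1 API...
	if _, err := clientset.CoreV1().Events("default").List(ctx, metav1.ListOptions{}); err != nil {
		return err
	}
	// ...are also visible through events.k8s.io/v1; fields that exist only in
	// the new group (such as eventTime) simply come back unset.
	_, err := clientset.EventsV1().Events("default").List(ctx, metav1.ListOptions{})
	return err
}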

Field mapping between events.k8s.io/v1 and the corev1 Event:

events.k8s.io/v1                        corev1 Event
Note string                             Message string
Regarding corev1.ObjectReference        InvolvedObject corev1.ObjectReference
DeprecatedSource corev1.EventSource     Source EventSource
DeprecatedFirstTimestamp metav1.Time    FirstTimestamp metav1.Time
DeprecatedLastTimestamp metav1.Time     LastTimestamp metav1.Time
DeprecatedCount int32                   Count int32

The new aggregation algorithm in 1.21

  1. A series field is added to the API, recording how many times a similar event has occurred and when it last occurred.
  2. A map cache holds similar events. The key is the 5-tuple of action, reason, reportingController, regarding, and related: events whose 5-tuples match are considered similar (see the sketch after this list). The value is the latest event of the series.
  3. Every 6 minutes the cache is traversed: if an item's event has a series whose last occurrence was more than 6 minutes ago, a request is sent to the apiserver and the item is removed from the cache; if the event has no series and its eventTime is more than 6 minutes old, it is removed from the cache.
  4. Every 30 minutes the cache is traversed and, for every event with a series, a request is sent to the apiserver, mainly to keep the event from being removed by etcd (events are retained for 1 hour by default).
  5. A goroutine adds a watcher to the broadcaster and processes the events the watcher receives. If an event is not in the cache, it is the first occurrence: store it in the cache (without a series), send a request to the apiserver, and replace the cached event with the one the apiserver returns. If it is in the cache without a series, it is the second occurrence: add a series to the cached event (count 1, last occurrence now), send a request to the apiserver, and replace the cached event with the returned one. If it is in the cache with a series, update the cached event's series (increment the count, set the last occurrence to now).
  6. Whether the submission to the apiserver is a PATCH or a CREATE depends on whether the submitted event has a series: a series means the event already exists, so PATCH is used; otherwise CREATE.
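The 5-tuple cache key from step 2 looks roughly like this in event_broadcaster.go (a sketch; corev1 and eventsv1 imports as elsewhere in that file):

type eventKey struct {
	action              string
	reason              string
	reportingController string
	regarding           corev1.ObjectReference
	related             corev1.ObjectReference
}

// getKey builds the similarity key for an event.
func getKey(event *eventsv1.Event) eventKey {
	key := eventKey{
		action:              event.Action,
		reason:              event.Reason,
		reportingController: event.ReportingController,
		regarding:           event.Regarding,
	}
	if event.Related != nil {
		key.related = *event.Related
	}
	return key
}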

An example: Migrate kubelet to use v1 Event API

    stopCh := make(chan struct{})
	eventBroadcaster := events.NewEventBroadcasterAdapter(kubeDeps.KubeClient)
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(componentKubelet)
	if kubeDeps.EventClient != nil {
		klog.V(4).InfoS("Sending events to api server")
		eventBroadcaster.StartRecordingToSink(stopCh)
	} else {
		klog.InfoS("No api server defined - no events will be sent to API server")
	}

The eventBroadcasterAdapterImpl returned by NewEventBroadcasterAdapter contains both a corev1 event broadcaster and an events v1 broadcaster.

If the API supports the new events.k8s.io/v1, StartRecordingToSink runs only through the new broadcaster; otherwise the corev1 event StartRecordingToSink is used.

staging\src\k8s.io\client-go\tools\events\event_broadcaster.go

// NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify
// migration of individual components to the new Event API.
// contains both the new and the legacy EventBroadcaster
func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter {
	eventClient := &eventBroadcasterAdapterImpl{}
    // check whether the API serves the new events group
	if _, err := client.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err == nil {
        // the new path
		eventClient.eventsv1Client = client.EventsV1()
		eventClient.eventsv1Broadcaster = NewBroadcaster(&EventSinkImpl{Interface: eventClient.eventsv1Client})
	}
	// Even though there can soon exist cases when coreBroadcaster won't really be needed,
	// we create it unconditionally because its overhead is minor and will simplify using usage
	// patterns of this library in all components.
    // the legacy path
	eventClient.coreClient = client.CoreV1()
	eventClient.coreBroadcaster = record.NewBroadcaster()
	return eventClient
}

// NewBroadcaster Creates a new event broadcaster.
func NewBroadcaster(sink EventSink) EventBroadcaster {
	return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{})
}

// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		eventCache:    eventCache,
		sleepDuration: sleepDuration,
		sink:          sink,
	}
}

// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
    // every 30 minutes, re-send cached events that have a series, to refresh their TTL
	go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
    // every 6 minutes, flush series whose last occurrence is over 6 minutes old and evict singleton events older than 6 minutes
	go wait.Until(e.finishSeries, finishTime, stopCh)
    // start a goroutine that adds a watcher and handles the events it receives
	e.startRecordingEvents(stopCh)
}

// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
	// TODO: Investigate whether lock contention won't be a problem
	e.mu.Lock()
	defer e.mu.Unlock()
	for isomorphicKey, event := range e.eventCache {
		if event.Series != nil {
			if recordedEvent, retry := recordEvent(e.sink, event); !retry {
				if recordedEvent != nil {
					e.eventCache[isomorphicKey] = recordedEvent
				}
			}
		}
	}
}

// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
	// TODO: Investigate whether lock contention won't be a problem
	e.mu.Lock()
	defer e.mu.Unlock()
	for isomorphicKey, event := range e.eventCache {
		eventSerie := event.Series
		if eventSerie != nil {
			if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
				if _, retry := recordEvent(e.sink, event); !retry {
					delete(e.eventCache, isomorphicKey)
				}
			}
		} else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {
			delete(e.eventCache, isomorphicKey)
		}
	}
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
	watcher := e.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for {
			watchEvent, ok := <-watcher.ResultChan()
			if !ok {
				return
			}
			eventHandler(watchEvent.Object)
		}
	}()
	return watcher.Stop
}

func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {
	eventHandler := func(obj runtime.Object) {
		event, ok := obj.(*eventsv1.Event)
		if !ok {
			klog.Errorf("unexpected type, expected eventsv1.Event")
			return
		}
		e.recordToSink(event, clock.RealClock{})
	}
	stopWatcher := e.StartEventWatcher(eventHandler)
	go func() {
		<-stopCh
		stopWatcher()
	}()
}

func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
	// Make a copy before modification, because there could be multiple listeners.
	eventCopy := event.DeepCopy()
	go func() {
		evToRecord := func() *eventsv1.Event {
			e.mu.Lock()
			defer e.mu.Unlock()
			eventKey := getKey(eventCopy)
			isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
			if isIsomorphic {
				if isomorphicEvent.Series != nil {
					isomorphicEvent.Series.Count++
					isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()}
					return nil
				}
				isomorphicEvent.Series = &eventsv1.EventSeries{
					Count:            1,
					LastObservedTime: metav1.MicroTime{Time: clock.Now()},
				}
				return isomorphicEvent
			}
			e.eventCache[eventKey] = eventCopy
			return eventCopy
		}()
		if evToRecord != nil {
			recordedEvent := e.attemptRecording(evToRecord)
			if recordedEvent != nil {
				recordedEventKey := getKey(recordedEvent)
				e.mu.Lock()
				defer e.mu.Unlock()
				e.eventCache[recordedEventKey] = recordedEvent
			}
		}
	}()
}

The producer: recorder

If the apiserver supports events.k8s.io/v1 the new EventRecorder is used; otherwise the corev1 EventRecorder is used.

The new recorder

It supports only one way of producing events: Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})

Unlike the old interface, an extra runtime.Object must be supplied; it is set as Related.
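A hypothetical call (pod as the regarding object, node as the related one; related may be nil):

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/events"
)

func recordNewStyle(recorder events.EventRecorder, pod *corev1.Pod, node *corev1.Node) {
	// action and reason are first-class fields instead of being folded into the message
	recorder.Eventf(pod, node, corev1.EventTypeNormal, "Scheduled", "Binding",
		"Successfully assigned %s to %s", pod.Name, node.Name)
}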

staging\src\k8s.io\client-go\tools\events\event_broadcaster.go

func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
	if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
		return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
	}
	return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
}

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
	hostname, _ := os.Hostname()
	reportingInstance := reportingController + "-" + hostname
	return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
}

staging\src\k8s.io\client-go\tools\events\event_recorder.go

func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
	timestamp := metav1.MicroTime{time.Now()}
	message := fmt.Sprintf(note, args...)
	refRegarding, err := reference.GetReference(recorder.scheme, regarding)
	if err != nil {
		klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
		return
	}
	refRelated, err := reference.GetReference(recorder.scheme, related)
	if err != nil {
		klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
	}
	if !util.ValidateEventType(eventtype) {
		klog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}
	event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
	go func() {
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}

func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *eventsv1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := refRegarding.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &eventsv1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
			Namespace: namespace,
		},
		EventTime:           timestamp,
		Series:              nil,
		ReportingController: reportingController,
		ReportingInstance:   reportingInstance,
		Action:              action,
		Reason:              reason,
		Regarding:           *refRegarding,
		Related:             refRelated,
		Note:                message,
		Type:                eventtype,
	}
}

The legacy recorder interface

staging\src\k8s.io\client-go\tools\record\event.go

// NewEventRecorderAdapter returns an adapter implementing the new
// "k8s.io/client-go/tools/events".EventRecorder interface.
func NewEventRecorderAdapter(recorder EventRecorder) *EventRecorderAdapter {
	return &EventRecorderAdapter{
		recorder: recorder,
	}
}

// Eventf is a wrapper around v1 Eventf
func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
	a.recorder.Eventf(regarding, eventtype, reason, note, args...)
}

For the rationale behind the new API's design, see the design document: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/instrumentation/events-redesign.md

The design document for the legacy corev1 event aggregation: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/api-machinery/event_compression.md