The event generation mechanism in Kubernetes
When troubleshooting a problem in a cluster, there are generally two approaches: one is to read the logs of each component to pin down the problem; the other is to fetch events from the apiserver and use them to work out what action a given component performed.
The event mechanism makes troubleshooting much easier. An event is an API object that records that some component performed some action at some point in time. Each component acts as a client, sending requests to the apiserver to create or update events; by default the apiserver only keeps events for one hour.
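As a quick illustration of the second approach, events can be listed through the apiserver with client-go. This is a minimal sketch, assuming a kubeconfig at a placeholder path; it is not part of the kubelet code analyzed below:
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from a local kubeconfig (the path is only an example).
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)
	// List the events the apiserver currently keeps (default retention: 1 hour).
	events, err := clientset.CoreV1().Events("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, e := range events.Items {
		fmt.Printf("%s\t%s\t%s\t%s\n", e.LastTimestamp, e.Type, e.Reason, e.Message)
	}
}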
Below, taking kubelet 1.18.6 as the event producer, we analyze how an event is generated.
This article is based on Kubernetes 1.18.6; please visit the source code reading repository.
1 corev1 Event type definition
InvolvedObject: the object this event is about
Reason: the reason this action was taken
Message: a more detailed description of the action
Source: the creator of this event, containing Host (the creator's hostname) and Component (the creator's component name)
FirstTimestamp: the time the event first occurred
LastTimestamp: the time the event last occurred
Count: the total number of occurrences
Type: the event type; there are only two, Warning and Normal
EventTime: the time the event was first observed – added for compatibility with the new events.k8s.io API
Series: the number of occurrences and the last occurrence time of related events – added for compatibility with the new events.k8s.io API
Action: the action that was taken – added for compatibility with the new events.k8s.io API
Related: another object related to this event – added for compatibility with the new events.k8s.io API
ReportingController: the name of the controller that created this event – added for compatibility with the new events.k8s.io API
ReportingInstance: the instance of the controller that created this event – added for compatibility with the new events.k8s.io API
staging\src\k8s.io\api\core\v1\types.go
// Event is a report of an event somewhere in the cluster.
type Event struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
// The object that this event is about.
InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
// This should be a short, machine understandable string that gives the reason
// for the transition into the object's current status.
// TODO: provide exact specification for format.
// +optional
Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
// A human-readable description of the status of this operation.
// TODO: decide on maximum length.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
// The component reporting this event. Should be a short machine understandable string.
// +optional
Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
// +optional
FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
// The time at which the most recent occurrence of this event was recorded.
// +optional
LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
// The number of times this event has occurred.
// +optional
Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
// Type of this event (Normal, Warning), new types could be added in the future
// +optional
Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
// Time when this Event was first observed.
// +optional
EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
// Data about the Event series this event represents or nil if it's a singleton Event.
// +optional
Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
// What action was taken/failed regarding to the Regarding object.
// +optional
Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
// Optional secondary object for more complex actions.
// +optional
Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
// +optional
ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
// ID of the controller instance, e.g. `kubelet-xyzf`.
// +optional
ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
2 Event initialization in kubelet
In kubelet, makeEventRecorder initializes the event client side, preparing for the events to be emitted later.
It creates the EventBroadcaster and the EventRecorder, and starts StartLogging and StartRecordingToSink.
The EventRecorder is the producer of event messages; it sends them to the Broadcaster inside the EventBroadcaster.
The EventBroadcaster is the core of the machinery: its Broadcaster distributes every received event message to all watchers.
StartLogging creates a watcher and writes the received events to the log.
StartRecordingToSink creates a watcher, aggregates the received events, and submits them to the apiserver.
cmd\kubelet\app\server.go
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
if err != nil {
return err
}
// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
// when the cloud provider is AWS this is replaced with the AWS host domain name
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
if err != nil {
return err
}
// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
if kubeDeps.Recorder != nil {
return
}
eventBroadcaster := record.NewBroadcaster()
// legacyscheme.Scheme: this file imports k8s.io/kubernetes/pkg/kubelet/config -- (pkg\kubelet\config\common.go imports k8s.io/kubernetes/pkg/apis/core/install, which registers all APIs under the core group into legacyscheme.Scheme)
// legacyscheme.Scheme is used to look up the metadata of the event's InvolvedObject
// Recorder is used to produce events
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
// start a goroutine that logs the events
eventBroadcaster.StartLogging(klog.V(3).Infof)
if kubeDeps.EventClient != nil {
klog.V(4).Infof("Sending events to api server.")
// send events to the apiserver
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
} else {
klog.Warning("No api server defined - no events will be sent to API server.")
}
}
3 EventBroadcaster
The Broadcaster is responsible for delivering received events to every watcher. It starts a goroutine that takes messages from the incoming channel and sends them to each watcher's result channel. Two behaviours are supported when a watcher's result channel is full: either give up on that watcher and move on to the next one, or wait for that watcher to receive the message, which blocks delivery to the next watcher.
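The two behaviours correspond to the FullChannelBehavior constants in k8s.io/apimachinery/pkg/watch (paraphrased from mux.go; the event broadcaster passes DropIfChannelFull, as shown below):
// How a Broadcaster reacts when a watcher's result channel is full.
type FullChannelBehavior int

const (
	WaitIfChannelFull FullChannelBehavior = iota // wait for the slow watcher, blocking delivery to the remaining watchers
	DropIfChannelFull                            // silently drop the event for that watcher and move on
)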
Creating an EventBroadcaster
staging\src\k8s.io\client-go\tools\record\event.go
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
}
}
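For reference, the two arguments above are package-level constants defined near the top of the same file (values from the 1.18 tree):
const maxTriesPerEvent = 12                   // how many times StartRecordingToSink retries a single event
const maxQueuedEvents = 1000                  // per-watcher result channel capacity
const defaultSleepDuration = 10 * time.Second // delay between retries when sending to the apiserver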
Broadcaster
staging\src\k8s.io\apimachinery\pkg\watch\mux.go
// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
// It is guaranteed that events will be distributed in the order in which they occur,
// but the order in which a single event is distributed among all of the watchers is unspecified.
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, incomingQueueLength),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1)
go m.loop()
return m
}
// closeAll disconnects all watchers (presumably in response to a Shutdown call).
func (m *Broadcaster) closeAll() {
m.lock.Lock()
defer m.lock.Unlock()
for _, w := range m.watchers {
// close the watcher's message receiving channel
close(w.result)
}
// Delete everything from the map, since presence/absence in the map is used
// by stopWatching to avoid double-closing the channel.
m.watchers = map[int64]*broadcasterWatcher{}
}
// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
// Deliberately not catching crashes here. Yes, bring down the process if there's a
// bug in watch.Broadcaster.
// handle events whose Type is internal-do-function first: execute the func carried in event.Object
// this keeps the event from being consumed by watchers, which would cause the func to run repeatedly
for event := range m.incoming {
if event.Type == internalRunFunctionMarker {
event.Object.(functionFakeRuntimeObject)()
continue
}
m.distribute(event)
}
// after all messages are processed, close each watcher's receiving channel and reset the watchers list
m.closeAll()
m.distributing.Done()
}
// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
m.lock.Lock()
defer m.lock.Unlock()
if m.fullChannelBehavior == DropIfChannelFull {
for _, w := range m.watchers {
select {
// if w.result is blocked, try w.stopped below; if w.stopped has no message either, fall into default
// i.e. when both w.result and w.stopped are blocked -- this watcher's queue is stuck -- give up sending to this watcher
case w.result <- event:
case <-w.stopped:
default: // Don't block if the event can't be queued.
}
}
} else {
for _, w := range m.watchers {
// if w.result is blocked, try w.stopped; if w.stopped has no message either, keep waiting until one of the channels is ready
// i.e. even when both w.result and w.stopped are blocked, this watcher is not skipped; we wait until it is free
// but it blocks the next watcher from receiving this message
select {
case w.result <- event:
case <-w.stopped:
}
}
}
}
4 EventRecorder
The eventRecorder produces event messages. It uses the Scheme from k8s.io/kubernetes/pkg/api/legacyscheme to determine the group/version of the event's InvolvedObject – when the object's GroupVersionKind is empty, the registered GroupVersionKind is looked up from the scheme.
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
Events can be produced through any of the following methods (a short usage sketch follows the list):
Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) – supports a formatted message
Event(object runtime.Object, eventtype, reason, message string) – passes the raw message directly
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) – adds annotations and supports a formatted message
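A hypothetical usage sketch of the three methods (pod, nodeName and err come from the caller; the reasons and messages are made up):
// imports assumed: v1 "k8s.io/api/core/v1", "k8s.io/client-go/tools/record"
func emitExamples(recorder record.EventRecorder, pod *v1.Pod, nodeName string, err error) {
	// raw message
	recorder.Event(pod, v1.EventTypeNormal, "Pulling", "Pulling image \"nginx:1.19\"")
	// formatted message
	recorder.Eventf(pod, v1.EventTypeWarning, "FailedMount", "Unable to attach or mount volumes: %v", err)
	// formatted message plus extra annotations on the generated Event object
	recorder.AnnotatedEventf(pod, map[string]string{"example.com/trace-id": "abc123"},
		v1.EventTypeNormal, "Scheduled", "Successfully assigned %s/%s to %s", pod.Namespace, pod.Name, nodeName)
}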
staging\src\k8s.io\client-go\tools\record\event.go
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
}
type recorderImpl struct {
scheme *runtime.Scheme
source v1.EventSource
*watch.Broadcaster
clock clock.Clock
}
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
return
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
return
}
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source
go func() {
// NOTE: events should be a non-blocking operation
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
}
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}
func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
t := metav1.Time{Time: recorder.clock.Now()}
namespace := ref.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
}
return &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
Namespace: namespace,
Annotations: annotations,
},
InvolvedObject: *ref,
Reason: reason,
Message: message,
FirstTimestamp: t,
LastTimestamp: t,
Count: 1,
Type: eventtype,
}
}
5 Recording events to the log – StartLogging
A watcher is created and a goroutine is started to receive events and write them to the log.
// start a goroutine that logs the events
eventBroadcaster.StartLogging(klog.V(3).Infof)
staging\src\k8s.io\client-go\tools\record\event.go
// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
// create a new watcher and read events to log them
return e.StartEventWatcher(
func(e *v1.Event) {
logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
})
}
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
// create and register a new watcher
watcher := e.Watch()
// start a goroutine to handle this watcher's events
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() {
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
}
eventHandler(event)
}
}()
return watcher
}
The watcher creation process
First an "add watcher" event is created with Type set to internalRunFunctionMarker, and we wait for it to be consumed – loop() above consumes this event and executes the object inside it, which creates the new watcher and adds it to the Broadcaster's watchers list. The watcher can only consume events received after it was added, not events received before.
staging\src\k8s.io\apimachinery\pkg\watch\mux.go
// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events.
func (m *Broadcaster) Watch() Interface {
var w *broadcasterWatcher
m.blockQueue(func() {
m.lock.Lock()
defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
w = &broadcasterWatcher{
result: make(chan Event, m.watchQueueLength),
stopped: make(chan struct{}),
id: id,
m: m,
}
m.watchers[id] = w
})
return w
}
// Execute f, blocking the incoming queue (and waiting for it to drain first).
// The purpose of this terrible hack is so that watchers added after an event
// won't ever see that event, and will always see any event after they are
// added.
func (b *Broadcaster) blockQueue(f func()) {
var wg sync.WaitGroup
wg.Add(1)
// send the Event into incoming
// this Event is never consumed by watchers; loop() handles this Type specially -- it executes the func inside functionFakeRuntimeObject
b.incoming <- Event{
Type: internalRunFunctionMarker,
Object: functionFakeRuntimeObject(func() {
defer wg.Done()
f()
}),
}
// wait until f() has finished
wg.Wait()
}
6 Sending events to the apiserver – StartRecordingToSink
To keep repeated events from being sent over and over and putting pressure on etcd, events are aggregated on the client side before being sent to the apiserver: identical or similar events within a time window are combined.
if kubeDeps.EventClient != nil {
klog.V(4).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
} else {
klog.Warning("No api server defined - no events will be sent to API server.")
}
The eventCorrelator processes each received event, passing it in turn to the EventAggregator, the EventLogger, and spamFilter.Filter, which may modify or drop it.
The eventCorrelator consists of three components: EventAggregator, EventLogger, and spamFilter.Filter.
EventAggregator:
Each event is stored in an LRU cache keyed by aggregateKey (a concatenation of Source, InvolvedObject (without FieldPath), Type, and Reason); the value is a struct containing localKeys (the set of unique messages seen so far) and lastTimestamp. In other words, events with the same aggregateKey share one bucket, and an event counts as unique if its message is not already in localKeys.
If localKeys already contains 10 or more messages and lastTimestamp is within the last 10 minutes, the event's message is prefixed with "(combined from similar events): "; otherwise the event is passed to the EventLogger unchanged.
If the event was modified, the key used in the EventLogger's LRU cache is the aggregateKey; otherwise it is the return value of getEventKey – a concatenation of the event's Source, involvedObject (including FieldPath), Type, reason, and message fields.
The EventAggregator does not reduce the number of events; it only rewrites the event's message once more than 9 similar events have been seen.
EventLogger:
LRU cache: the key is the aggregateKey or the getEventKey return value; the value records the count, firstTimestamp, name, and resourceVersion of the event observed so far.
It looks up how often the event has already been seen in the LRU cache: if it has been seen before, the event's count is incremented, lastTimestamp is updated, and a patch for updating the event is generated; otherwise a new item is added to the cache.
The EventLogger does not reduce the number of events either; when the event already exists in the cache it only modifies the event's count and lastTimestamp.
Filter:
A rate limiter is looked up in an LRU cache by key (source plus involvedObject) and created if absent. If the rate limit has been reached, the event is ignored and dropped; otherwise recordEvent is executed.
recordEvent:
Sends the processed event to the apiserver. If count is greater than 1, a patch request is issued (falling back to a create request if it returns 404); otherwise a create request is issued.
If the request to the apiserver fails it is retried, at most 12 times; the first retry waits a random interval (roughly 1 to 10 seconds), after which the default wait is 10 seconds.
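A simplified sketch of that delivery logic, following the behaviour described above; deliver and trySend are hypothetical helper names, the real code is recordToSink/recordEvent in staging\src\k8s.io\client-go\tools\record\event.go:
// imports assumed: "math/rand", "time", v1 "k8s.io/api/core/v1", "k8s.io/apimachinery/pkg/api/errors"
// deliver tries to persist one correlated event, retrying at most 12 times.
// updateExisting is true when the correlator produced a patch (Count > 1).
func deliver(sink EventSink, event *v1.Event, patch []byte, updateExisting bool, sleepDuration time.Duration) {
	for tries := 0; tries < 12; tries++ {
		if trySend(sink, event, patch, updateExisting) {
			return // accepted by the apiserver
		}
		delay := sleepDuration // default 10s
		if tries == 0 {
			// randomize the first retry so restarting clients don't hit the apiserver in lockstep
			delay = time.Duration(float64(sleepDuration) * rand.Float64())
		}
		time.Sleep(delay)
	}
}

// trySend patches the existing event when Count > 1, falling back to a create
// when the patch target is already gone (404); otherwise it creates the event.
func trySend(sink EventSink, event *v1.Event, patch []byte, updateExisting bool) bool {
	var err error
	if updateExisting {
		_, err = sink.Patch(event, patch)
	}
	if !updateExisting || (err != nil && errors.IsNotFound(err)) {
		_, err = sink.Create(event)
	}
	return err == nil
}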
staging\src\k8s.io\client-go\tools\record\events_cache.go
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
if newEvent == nil {
return nil, fmt.Errorf("event is nil")
}
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
if c.filterFunc(observedEvent) {
return &EventCorrelateResult{Skip: true}, nil
}
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
// eventObserve records an event, or updates an existing one if key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
var (
patch []byte
err error
)
eventCopy := *newEvent
event := &eventCopy
e.Lock()
defer e.Unlock()
// Check if there is an existing event we should update
lastObservation := e.lastEventObservationFromCache(key)
// If we found a result, prepare a patch
if lastObservation.count > 0 {
// update the event based on the last observation so patch will work as desired
event.Name = lastObservation.name
event.ResourceVersion = lastObservation.resourceVersion
event.FirstTimestamp = lastObservation.firstTimestamp
event.Count = int32(lastObservation.count) + 1
eventCopy2 := *event
eventCopy2.Count = 0
eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
eventCopy2.Message = ""
newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
}
// record our new observation
e.cache.Add(
key,
eventLog{
count: uint(event.Count),
firstTimestamp: event.FirstTimestamp,
name: event.Name,
resourceVersion: event.ResourceVersion,
},
)
return event, patch, err
}
// Filter controls that a given source+object are not exceeding the allowed rate.
// rate-limit events that share the same source and involvedObject
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
var record spamRecord
// controls our cached information about this event (source+object)
eventKey := getSpamKey(event)
// do we have a record of similar events in our cache?
f.Lock()
defer f.Unlock()
value, found := f.cache.Get(eventKey)
if found {
record = value.(spamRecord)
}
// verify we have a rate limiter for this record
if record.rateLimiter == nil {
record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
}
// ensure we have available rate
filter := !record.rateLimiter.TryAccept()
// update the cache
f.cache.Add(eventKey, record)
return filter
}
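For reference, the limiter is created with these defaults from the same file (1.18 tree): each source+involvedObject key may emit a burst of 25 events, after which roughly one more event is allowed every 5 minutes:
const (
	defaultSpamBurst = 25        // events a single source+involvedObject may emit before throttling kicks in
	defaultSpamQPS   = 1. / 300. // refill rate in tokens per second, i.e. one event per ~5 minutes
)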
// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
// the full key for normal events, and to the result of
// EventAggregatorMessageFunc for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
now := metav1.NewTime(e.clock.Now())
var record aggregateRecord
// eventKey is the full cache key for this event
eventKey := getEventKey(newEvent)
// aggregateKey is for the aggregate event, if one is needed.
// the default keyFunc is EventAggregatorByReasonFunc
// EventAggregatorByReasonFunc returns an aggregateKey built from Source, InvolvedObject (without FieldPath), Type and Reason; the localKey is the Message
aggregateKey, localKey := e.keyFunc(newEvent)
// Do we have a record of similar events in our cache?
e.Lock()
defer e.Unlock()
value, found := e.cache.Get(aggregateKey)
if found {
record = value.(aggregateRecord)
}
// Is the previous record too old? If so, make a fresh one. Note: if we didn't
// find a similar record, its lastTimestamp will be the zero value, so we
// create a new one in that case.
maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
interval := now.Time.Sub(record.lastTimestamp.Time)
// the record has expired, so start with a fresh one
if interval > maxInterval {
record = aggregateRecord{localKeys: sets.NewString()}
}
// Write the new event into the aggregation record and put it on the cache
// insert the localKey; thanks to the set semantics, a key that already exists is not duplicated
record.localKeys.Insert(localKey)
record.lastTimestamp = now
e.cache.Add(aggregateKey, record)
// If we are not yet over the threshold for unique events, don't correlate them
// if the length of localKeys (number of distinct messages) is below maxEvents -- whether the record expired or not -- return newEvent and eventKey unchanged
if uint(record.localKeys.Len()) < e.maxEvents {
return newEvent, eventKey
}
// do not grow our local key set any larger than max
record.localKeys.PopAny()
// create a new aggregate event, and return the aggregateKey as the cache key
// (so that it can be overwritten.)
// the record has not expired and localKeys (number of distinct messages) has reached maxEvents, so aggregate -- rewrite the most recently received event
eventCopy := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
Namespace: newEvent.Namespace,
},
Count: 1,
FirstTimestamp: now,
InvolvedObject: newEvent.InvolvedObject,
LastTimestamp: now,
// the default messageFunc is EventAggregatorByReasonMessageFunc
// EventAggregatorByReasonMessageFunc returns "(combined from similar events): " followed by the original message
Message: e.messageFunc(newEvent),
Type: newEvent.Type,
Reason: newEvent.Reason,
Source: newEvent.Source,
}
return eventCopy, aggregateKey
}
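The thresholds mentioned earlier (10 similar events within 10 minutes) are likewise defaults defined in events_cache.go (1.18 tree):
const (
	defaultAggregateMaxEvents         = 10  // maxEvents: distinct messages per aggregateKey before aggregation starts
	defaultAggregateIntervalInSeconds = 600 // maxIntervalInSeconds: the 10-minute aggregation window
)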
7 The overall flow
Here incoming and result are channels: incoming belongs to the Broadcaster, result belongs to each watcher.
- The EventRecorder produces an event and sends it to the incoming channel of the Broadcaster inside the EventBroadcaster.
- StartLogging and StartRecordingToSink both call StartEventWatcher.
- StartEventWatcher sends an "add watcher" event; once the Broadcaster processes it, a new watcher is registered and a goroutine is started to consume that watcher's events.
- If started by StartLogging, the watcher writes the events to the log.
- If started by StartRecordingToSink, the watcher hands the events to the eventCorrelator for aggregation; the eventCorrelator contains the aggregator, logger, and spamfilter components.
- The aggregator aggregates similar events once their number exceeds a threshold within a time window, rewriting the event; below the threshold the event is left untouched. It then passes the event and the aggregate key to the logger.
- The logger checks whether a similar event exists in its cache; if so, it updates the event's count and last-seen timestamp.
- The spamfilter checks whether the rate limit has been reached; if it has, the event is ignored and dropped, otherwise a request is sent to the apiserver.
- If the event's count is greater than 1, a patch request updates the existing event; otherwise a create request is issued.
8 Recent developments
There is a recent PR, "Migrate kubelet to use v1 Event API", whose goal is to migrate the event API used by kubelet to events.k8s.io/v1; the related issue is "Redesign Event API".
Why introduce this API?
It addresses two problems: deduplication in the existing mechanism is not very effective (duplicate events are still visible whenever fewer than 10 similar events occur within 10 minutes and the rate limiter is not hit), which puts pressure on the apiserver; and the existing API makes it awkward to find the information you need (it is unfriendly to application developers, for example when looking up the events of a pod's whole lifecycle).
events.k8s.io/v1 and corev1 are compatible with each other: an event created through corev1 can also be retrieved through events.k8s.io/v1, except that the fields newly added in events.k8s.io/v1 (such as eventTime) will not be set.
The field mapping between events.k8s.io/v1 and the corev1 Event:
events.k8s.io/v1 | corev1 Event
---|---
Note string | Message string
Regarding corev1.ObjectReference | InvolvedObject ObjectReference
DeprecatedSource corev1.EventSource | Source EventSource
DeprecatedFirstTimestamp metav1.Time | FirstTimestamp metav1.Time
DeprecatedLastTimestamp metav1.Time | LastTimestamp metav1.Time
DeprecatedCount int32 | Count int32
8.1 The new aggregation algorithm in 1.21
- A series field is added to the API to record how many times a similar event has occurred and when it was last observed.
- A map cache stores similar events. The key is made up of action, reason, reportingController, regarding, and related – events with the same 5-tuple are considered similar (a sketch of this key follows the list) – and the value is the latest event of that series.
- Every 6 minutes the cache is walked: if an item's event has a series and was last observed more than 6 minutes ago, a request is sent to the apiserver and the item is removed from the cache; if the event has no series and its eventTime is more than 6 minutes old, it is removed from the cache.
- Every 30 minutes the cache is walked: for every event that has a series, a request is sent to the apiserver, mainly to keep the event from being removed by etcd – by default events are retained for 1 hour.
- A goroutine is started that adds a watcher to the broadcaster and handles the events the watcher receives. If an event is not in the cache, it is the first occurrence: it is stored in the cache (without a series), a request is sent to the apiserver, and the cached event is replaced with what the apiserver returns. If it is in the cache without a series, it is the second occurrence: a series is added to the cached event (count 1, last observed now), a request is sent to the apiserver, and the cached event is replaced with what the apiserver returns. If it is in the cache with a series, the series of the cached event is updated (count incremented, last observed now).
- Whether the submission to the apiserver is a patch or a create depends on whether the event being submitted already contains a series: if it does, the event already exists and patch is used, otherwise create is used.
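A sketch of the cache key described in the list above; this mirrors, approximately, the eventKey/getKey helpers in staging\src\k8s.io\client-go\tools\events:
// imports assumed: corev1 "k8s.io/api/core/v1", eventsv1 "k8s.io/api/events/v1"
// Two events belong to the same series ("isomorphic") when this 5-tuple matches;
// the eventCache map is keyed by it.
type eventKey struct {
	action              string
	reason              string
	reportingController string
	regarding           corev1.ObjectReference
	related             corev1.ObjectReference // zero value when Event.Related is nil
}

func getKey(event *eventsv1.Event) eventKey {
	key := eventKey{
		action:              event.Action,
		reason:              event.Reason,
		reportingController: event.ReportingController,
		regarding:           event.Regarding,
	}
	if event.Related != nil {
		key.related = *event.Related
	}
	return key
}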
Taking the PR "Migrate kubelet to use v1 Event API" as an example:
stopCh := make(chan struct{})
eventBroadcaster := events.NewEventBroadcasterAdapter(kubeDeps.KubeClient)
kubeDeps.Recorder = eventBroadcaster.NewRecorder(componentKubelet)
if kubeDeps.EventClient != nil {
klog.V(4).InfoS("Sending events to api server")
eventBroadcaster.StartRecordingToSink(stopCh)
} else {
klog.InfoS("No api server defined - no events will be sent to API server")
}
The eventBroadcasterAdapterImpl returned by NewEventBroadcasterAdapter holds both a corev1 event broadcaster and an events.k8s.io/v1 broadcaster.
If the API server supports the new events.k8s.io/v1, StartRecordingToSink is run only with the new broadcaster; otherwise the corev1 event StartRecordingToSink is used.
staging\src\k8s.io\client-go\tools\events\event_broadcaster.go
// NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify
// migration of individual components to the new Event API.
// wraps both the new and the legacy EventBroadcaster
func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter {
eventClient := &eventBroadcasterAdapterImpl{}
// check whether the API server supports the new events API
if _, err := client.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err == nil {
// the new path
eventClient.eventsv1Client = client.EventsV1()
eventClient.eventsv1Broadcaster = NewBroadcaster(&EventSinkImpl{Interface: eventClient.eventsv1Client})
}
// Even though there can soon exist cases when coreBroadcaster won't really be needed,
// we create it unconditionally because its overhead is minor and will simplify using usage
// patterns of this library in all components.
// the legacy path
eventClient.coreClient = client.CoreV1()
eventClient.coreBroadcaster = record.NewBroadcaster()
return eventClient
}
// NewBroadcaster Creates a new event broadcaster.
func NewBroadcaster(sink EventSink) EventBroadcaster {
return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{})
}
// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
eventCache: eventCache,
sleepDuration: sleepDuration,
sink: sink,
}
}
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
// every 30 minutes, refresh cached events that have a series
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
// every 6 minutes, flush events -- cached series last observed more than 6 minutes ago, or singleton events created more than 6 minutes ago
go wait.Until(e.finishSeries, finishTime, stopCh)
// start a goroutine that adds a watcher and handles the received events
e.startRecordingEvents(stopCh)
}
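finishTime and refreshTime are constants in the same file (values from the 1.21-era tree):
const (
	finishTime  = 6 * time.Minute  // a series idle for longer than this is flushed and evicted from the cache
	refreshTime = 30 * time.Minute // interval for re-sending live series so the apiserver's event TTL does not expire them
)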
// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
if event.Series != nil {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
if recordedEvent != nil {
e.eventCache[isomorphicKey] = recordedEvent
}
}
}
}
}
// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
eventSerie := event.Series
if eventSerie != nil {
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
if _, retry := recordEvent(e.sink, event); !retry {
delete(e.eventCache, isomorphicKey)
}
}
} else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {
delete(e.eventCache, isomorphicKey)
}
}
}
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
watcher := e.Watch()
go func() {
defer utilruntime.HandleCrash()
for {
watchEvent, ok := <-watcher.ResultChan()
if !ok {
return
}
eventHandler(watchEvent.Object)
}
}()
return watcher.Stop
}
func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {
eventHandler := func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
klog.Errorf("unexpected type, expected eventsv1.Event")
return
}
e.recordToSink(event, clock.RealClock{})
}
stopWatcher := e.StartEventWatcher(eventHandler)
go func() {
<-stopCh
stopWatcher()
}()
}
func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
// Make a copy before modification, because there could be multiple listeners.
eventCopy := event.DeepCopy()
go func() {
evToRecord := func() *eventsv1.Event {
e.mu.Lock()
defer e.mu.Unlock()
eventKey := getKey(eventCopy)
isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
if isIsomorphic {
if isomorphicEvent.Series != nil {
isomorphicEvent.Series.Count++
isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()}
return nil
}
isomorphicEvent.Series = &eventsv1.EventSeries{
Count: 1,
LastObservedTime: metav1.MicroTime{Time: clock.Now()},
}
return isomorphicEvent
}
e.eventCache[eventKey] = eventCopy
return eventCopy
}()
if evToRecord != nil {
recordedEvent := e.attemptRecording(evToRecord)
if recordedEvent != nil {
recordedEventKey := getKey(recordedEvent)
e.mu.Lock()
defer e.mu.Unlock()
e.eventCache[recordedEventKey] = recordedEvent
}
}
}()
}
8.2 The producer: recorder
If the apiserver supports events.k8s.io/v1, the new EventRecorder is used; otherwise the corev1 EventRecorder is used.
8.2.1 The new recorder
It supports only one way of generating an event – Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
Compared with the legacy interface, an extra runtime.Object has to be provided; it is set as Related.
staging\src\k8s.io\client-go\tools\events\event_broadcaster.go
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
}
return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
hostname, _ := os.Hostname()
reportingInstance := reportingController + "-" + hostname
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
}
staging\src\k8s.io\client-go\tools\events\event_recorder.go
func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
timestamp := metav1.MicroTime{time.Now()}
message := fmt.Sprintf(note, args...)
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
return
}
refRelated, err := reference.GetReference(recorder.scheme, related)
if err != nil {
klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
return
}
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
go func() {
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
}
func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *eventsv1.Event {
t := metav1.Time{Time: recorder.clock.Now()}
namespace := refRegarding.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
}
return &eventsv1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
Namespace: namespace,
},
EventTime: timestamp,
Series: nil,
ReportingController: reportingController,
ReportingInstance: reportingInstance,
Action: action,
Reason: reason,
Regarding: *refRegarding,
Related: refRelated,
Note: message,
Type: eventtype,
}
}
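For comparison with the legacy Eventf, a hypothetical call through the new interface (pod, node and err are assumed to come from the caller; the reason and action strings are made up):
// the second object becomes Related; reason and action are now separate fields
recorder.Eventf(pod, node, v1.EventTypeWarning, "FailedScheduling", "Scheduling",
	"0/3 nodes are available: %v", err)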
8.2.2 老的recorder接口
staging\src\k8s.io\client-go\tools\record\event.go
// NewEventRecorderAdapter returns an adapter implementing the new
// "k8s.io/client-go/tools/events".EventRecorder interface.
func NewEventRecorderAdapter(recorder EventRecorder) *EventRecorderAdapter {
return &EventRecorderAdapter{
recorder: recorder,
}
}
// Eventf is a wrapper around v1 Eventf
func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
a.recorder.Eventf(regarding, eventtype, reason, note, args...)
}
For the rationale behind the new API design, see the design proposal: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/instrumentation/events-redesign.md
The design document for the legacy corev1 event compression: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/api-machinery/event_compression.md