// Event is a report of an event somewhere in the cluster.
type Event struct {
    metav1.TypeMeta `json:",inline"`
    // Standard object's metadata.
    // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
    metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`

    // The object that this event is about.
    InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`

    // This should be a short, machine understandable string that gives the reason
    // for the transition into the object's current status.
    // TODO: provide exact specification for format.
    // +optional
    Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`

    // A human-readable description of the status of this operation.
    // TODO: decide on maximum length.
    // +optional
    Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`

    // The component reporting this event. Should be a short machine understandable string.
    // +optional
    Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`

    // The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
    // +optional
    FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`

    // The time at which the most recent occurrence of this event was recorded.
    // +optional
    LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`

    // The number of times this event has occurred.
    // +optional
    Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`

    // Type of this event (Normal, Warning), new types could be added in the future.
    // +optional
    Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`

    // Time when this Event was first observed.
    // +optional
    EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`

    // Data about the Event series this event represents or nil if it's a singleton Event.
    // +optional
    Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`

    // What action was taken/failed regarding to the Regarding object.
    // +optional
    Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`

    // Optional secondary object for more complex actions.
    // +optional
    Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`

    // Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
    // +optional
    ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`

    // ID of the controller instance, e.g. `kubelet-xyzf`.
    // +optional
    ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
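To make the two generations of fields concrete, here is a minimal runnable sketch (all values invented for illustration) that fills in the legacy deduplication fields; Count/FirstTimestamp/LastTimestamp carry old-style dedup state, while EventTime/Series/ReportingController belong to the newer events.k8s.io semantics:

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    first := metav1.NewTime(time.Now().Add(-10 * time.Minute))
    ev := v1.Event{
        ObjectMeta:     metav1.ObjectMeta{Name: "nginx.17a2b3c4d5e6f708", Namespace: "default"},
        InvolvedObject: v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "nginx"},
        Reason:         "BackOff",
        Message:        "Back-off restarting failed container",
        Source:         v1.EventSource{Component: "kubelet", Host: "node-1"},
        FirstTimestamp: first,
        LastTimestamp:  metav1.Now(),
        Count:          7, // the same event was observed 7 times
        Type:           v1.EventTypeWarning,
    }
    fmt.Printf("%s x%d: %s\n", ev.Reason, ev.Count, ev.Message)
}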
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
    if kubeDeps.Recorder != nil {
        return
    }
    eventBroadcaster := record.NewBroadcaster()
    // legacyscheme.Scheme is already populated here: this file imports
    // k8s.io/kubernetes/pkg/kubelet/config, and pkg/kubelet/config/common.go imports
    // k8s.io/kubernetes/pkg/apis/core/install, which registers every API in the core
    // group into legacyscheme.Scheme.
    // legacyscheme.Scheme is used to resolve the metadata of an event's InvolvedObject.
    // The Recorder is what produces events.
    kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
    // Start a goroutine that logs every event.
    eventBroadcaster.StartLogging(klog.V(3).Infof)
    if kubeDeps.EventClient != nil {
        klog.V(4).Infof("Sending events to api server.")
        // Send events to the apiserver.
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
    } else {
        klog.Warning("No api server defined - no events will be sent to API server.")
    }
}
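The same wiring works outside the kubelet. A hedged sketch, assuming an older client-go where record.NewBroadcaster() takes no arguments, and using client-go's scheme.Scheme in place of legacyscheme.Scheme:

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/record"
)

func main() {
    broadcaster := record.NewBroadcaster()
    // StartLogging registers a watcher whose handler is a printf-style logger.
    broadcaster.StartLogging(func(format string, args ...interface{}) {
        fmt.Printf(format+"\n", args...)
    })
    recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "demo", Host: "node-1"})

    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}
    recorder.Eventf(pod, v1.EventTypeNormal, "Started", "Started container %s", "nginx")

    // Delivery is asynchronous (recorder.Action runs in a goroutine), so give the
    // logging watcher a moment before exiting.
    time.Sleep(100 * time.Millisecond)
}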
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
    return &eventBroadcasterImpl{
        Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
        sleepDuration: defaultSleepDuration,
    }
}
// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
// It is guaranteed that events will be distributed in the order in which they occur,
// but the order in which a single event is distributed among all of the watchers is unspecified.
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
    m := &Broadcaster{
        watchers:            map[int64]*broadcasterWatcher{},
        incoming:            make(chan Event, incomingQueueLength),
        watchQueueLength:    queueLength,
        fullChannelBehavior: fullChannelBehavior,
    }
    m.distributing.Add(1)
    go m.loop()
    return m
}

// closeAll disconnects all watchers (presumably in response to a Shutdown call).
func (m *Broadcaster) closeAll() {
    m.lock.Lock()
    defer m.lock.Unlock()
    for _, w := range m.watchers {
        // Close each watcher's result channel.
        close(w.result)
    }
    // Delete everything from the map, since presence/absence in the map is used
    // by stopWatching to avoid double-closing the channel.
    m.watchers = map[int64]*broadcasterWatcher{}
}

// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
    // Deliberately not catching crashes here. Yes, bring down the process if there's a
    // bug in watch.Broadcaster.
    // Events whose Type is internal-do-function are handled first: run the func carried
    // in event.Object and skip distribution, so watchers never consume this event
    // (which would execute the function again).
    for event := range m.incoming {
        if event.Type == internalRunFunctionMarker {
            event.Object.(functionFakeRuntimeObject)()
            continue
        }
        m.distribute(event)
    }
    // Once the incoming channel is drained, close every watcher's result channel and
    // reset the watchers map.
    m.closeAll()
    m.distributing.Done()
}

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
    m.lock.Lock()
    defer m.lock.Unlock()
    if m.fullChannelBehavior == DropIfChannelFull {
        for _, w := range m.watchers {
            // If w.result would block, try w.stopped; if w.stopped has nothing to
            // receive either, take the default branch. In other words, when a
            // watcher's channels are both backed up, the event is dropped for it.
            select {
            case w.result <- event:
            case <-w.stopped:
            default: // Don't block if the event can't be queued.
            }
        }
    } else {
        for _, w := range m.watchers {
            // If w.result would block, try w.stopped; if neither is ready, block
            // until one of them is. A backed-up watcher is never skipped here, but
            // waiting for it delays delivery of this event to the remaining watchers.
            select {
            case w.result <- event:
            case <-w.stopped:
            }
        }
    }
}
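The two delivery modes are easy to observe with a throwaway Broadcaster. A sketch matching the version quoted here, where Watch() returns the Interface directly (newer releases also return an error):

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
)

func main() {
    b := watch.NewBroadcaster(1, watch.DropIfChannelFull)
    w := b.Watch() // nobody reads from w yet, so its 1-slot queue fills up

    for i := 0; i < 3; i++ {
        b.Action(watch.Added, &metav1.Status{Message: fmt.Sprintf("event-%d", i)})
    }
    b.Shutdown() // drains incoming, then closes every watcher's result channel

    n := 0
    for range w.ResultChan() {
        n++
    }
    // 1 with DropIfChannelFull; WaitIfChannelFull would block in distribute instead.
    fmt.Println("delivered:", n)
}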
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
    return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
}

type recorderImpl struct {
    scheme *runtime.Scheme
    source v1.EventSource
    *watch.Broadcaster
    clock clock.Clock
}

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
    ref, err := ref.GetReference(recorder.scheme, object)
    if err != nil {
        klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
        return
    }
    if !util.ValidateEventType(eventtype) {
        klog.Errorf("Unsupported event type: '%v'", eventtype)
        return
    }
    event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
    event.Source = recorder.source
    go func() {
        // NOTE: events should be a non-blocking operation
        defer utilruntime.HandleCrash()
        recorder.Action(watch.Added, event)
    }()
}

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
    recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}

func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
    recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
    recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
    t := metav1.Time{Time: recorder.clock.Now()}
    namespace := ref.Namespace
    if namespace == "" {
        namespace = metav1.NamespaceDefault
    }
    return &v1.Event{
        ObjectMeta: metav1.ObjectMeta{
            Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
            Namespace:   namespace,
            Annotations: annotations,
        },
        InvolvedObject: *ref,
        Reason:         reason,
        Message:        message,
        FirstTimestamp: t,
        LastTimestamp:  t,
        Count:          1,
        Type:           eventtype,
    }
}
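Note how makeEvent names the event: the involved object's name joined with the creation time as hex nanoseconds, which is why kubectl shows names like "nginx.17a2b3c4d5e6f708". A trivial illustration:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Same formula as makeEvent: "%v.%x" over name and UnixNano.
    name := fmt.Sprintf("%v.%x", "nginx", time.Now().UnixNano())
    fmt.Println(name) // e.g. nginx.17a2b3c4d5e6f708
}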
// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
    // Register a new watcher whose handler logs each event.
    return e.StartEventWatcher(func(e *v1.Event) {
        logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
    })
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
    // Create and register a new watcher.
    watcher := e.Watch()
    // Start a goroutine that handles this watcher's events.
    go func() {
        defer utilruntime.HandleCrash()
        for watchEvent := range watcher.ResultChan() {
            event, ok := watchEvent.Object.(*v1.Event)
            if !ok {
                // This is all local, so there's no reason this should
                // ever happen.
                continue
            }
            eventHandler(event)
        }
    }()
    return watcher
}
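Because StartEventWatcher accepts an arbitrary handler, custom sinks beyond logging and the apiserver are one call away. A sketch (same older client-go signature, handler takes *v1.Event) that forwards only Warning events:

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/record"
)

func main() {
    broadcaster := record.NewBroadcaster()
    // Custom sink: forward only Warning events.
    broadcaster.StartEventWatcher(func(ev *v1.Event) {
        if ev.Type == v1.EventTypeWarning {
            fmt.Printf("WARNING %s/%s %s: %s\n", ev.InvolvedObject.Namespace, ev.InvolvedObject.Name, ev.Reason, ev.Message)
        }
    })
    recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "demo"})
    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}
    recorder.Event(pod, v1.EventTypeNormal, "Pulled", "ignored by the sink above")
    recorder.Event(pod, v1.EventTypeWarning, "BackOff", "forwarded by the sink above")
    time.Sleep(100 * time.Millisecond) // delivery is asynchronous
}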
// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events.
func (m *Broadcaster) Watch() Interface {
    var w *broadcasterWatcher
    m.blockQueue(func() {
        m.lock.Lock()
        defer m.lock.Unlock()
        id := m.nextWatcher
        m.nextWatcher++
        w = &broadcasterWatcher{
            result:  make(chan Event, m.watchQueueLength),
            stopped: make(chan struct{}),
            id:      id,
            m:       m,
        }
        m.watchers[id] = w
    })
    return w
}

// Execute f, blocking the incoming queue (and waiting for it to drain first).
// The purpose of this terrible hack is so that watchers added after an event
// won't ever see that event, and will always see any event after they are
// added.
func (b *Broadcaster) blockQueue(f func()) {
    var wg sync.WaitGroup
    wg.Add(1)
    // Send a marker event into incoming. It is never delivered to watchers: loop()
    // special-cases this Type and runs the func wrapped in functionFakeRuntimeObject
    // instead of distributing it.
    b.incoming <- Event{
        Type: internalRunFunctionMarker,
        Object: functionFakeRuntimeObject(func() {
            defer wg.Done()
            f()
        }),
    }
    // Wait until f() has finished.
    wg.Wait()
}
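The effect of blockQueue is observable: a watcher registered after an event was broadcast never receives that event. A sketch, again assuming the version where Watch() returns the Interface directly:

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
)

func main() {
    b := watch.NewBroadcaster(10, watch.WaitIfChannelFull)
    b.Action(watch.Added, &metav1.Status{Message: "before"}) // no watcher yet: dropped

    w := b.Watch() // blockQueue drains "before" from incoming before registering w
    b.Action(watch.Added, &metav1.Status{Message: "after"})
    b.Shutdown()

    for ev := range w.ResultChan() {
        fmt.Println(ev.Object.(*metav1.Status).Message) // prints only "after"
    }
}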
// Recall the sink wiring in makeEventRecorder: StartRecordingToSink is the path on
// which the EventCorrelator below comes into play.
if kubeDeps.EventClient != nil {
    klog.V(4).Infof("Sending events to api server.")
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
} else {
    klog.Warning("No api server defined - no events will be sent to API server.")
}
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
    if newEvent == nil {
        return nil, fmt.Errorf("event is nil")
    }
    aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
    observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
    if c.filterFunc(observedEvent) {
        return &EventCorrelateResult{Skip: true}, nil
    }
    return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

// eventObserve records an event, or updates an existing one if key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
    var (
        patch []byte
        err   error
    )
    eventCopy := *newEvent
    event := &eventCopy
    e.Lock()
    defer e.Unlock()
    // Check if there is an existing event we should update
    lastObservation := e.lastEventObservationFromCache(key)
    // If we found a result, prepare a patch
    if lastObservation.count > 0 {
        // update the event based on the last observation so patch will work as desired
        event.Name = lastObservation.name
        event.ResourceVersion = lastObservation.resourceVersion
        event.FirstTimestamp = lastObservation.firstTimestamp
        event.Count = int32(lastObservation.count) + 1
        eventCopy2 := *event
        eventCopy2.Count = 0
        eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
        eventCopy2.Message = ""
        newData, _ := json.Marshal(event)
        oldData, _ := json.Marshal(eventCopy2)
        patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
    }
    // record our new observation
    e.cache.Add(
        key,
        eventLog{
            count:           uint(event.Count),
            firstTimestamp:  event.FirstTimestamp,
            name:            event.Name,
            resourceVersion: event.ResourceVersion,
        },
    )
    return event, patch, err
}
// Filter controls that a given source+object are not exceeding the allowed rate.
// Rate-limits events that share the same source and involvedObject.
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
    var record spamRecord // controls our cached information about this event (source+object)
    eventKey := getSpamKey(event)
    // do we have a record of similar events in our cache?
    f.Lock()
    defer f.Unlock()
    value, found := f.cache.Get(eventKey)
    if found {
        record = value.(spamRecord)
    }
    // verify we have a rate limiter for this record
    if record.rateLimiter == nil {
        record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
    }
    // ensure we have available rate
    filter := !record.rateLimiter.TryAccept()
    // update the cache
    f.cache.Add(eventKey, record)
    return filter
}
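The heart of the spam filter is one token bucket per source+object key; with the defaults (burst 25, qps 1/300) the first 25 events pass and then roughly one every five minutes refills. A sketch with smaller numbers so the behavior is visible:

package main

import (
    "fmt"

    "k8s.io/client-go/util/flowcontrol"
)

func main() {
    // Burst of 3 tokens, refilled at 0.5 tokens/second.
    limiter := flowcontrol.NewTokenBucketRateLimiter(0.5, 3)
    for i := 1; i <= 5; i++ {
        // Mirrors Filter: an event is filtered when TryAccept fails.
        fmt.Printf("event %d filtered=%v\n", i, !limiter.TryAccept())
    }
    // events 1-3 pass (filtered=false); 4 and 5 are dropped until tokens refill
}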
// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
//   the full key for normal events, and to the result of
//   EventAggregatorMessageFunc for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
    now := metav1.NewTime(e.clock.Now())
    var record aggregateRecord
    // eventKey is the full cache key for this event
    eventKey := getEventKey(newEvent)
    // aggregateKey is for the aggregate event, if one is needed.
    // The default keyFunc is EventAggregatorByReasonFunc: its aggregateKey joins
    // Source, InvolvedObject (minus FieldPath), Type, and Reason; its localKey is
    // the Message.
    aggregateKey, localKey := e.keyFunc(newEvent)
    // Do we have a record of similar events in our cache?
    e.Lock()
    defer e.Unlock()
    value, found := e.cache.Get(aggregateKey)
    if found {
        record = value.(aggregateRecord)
    }
    // Is the previous record too old? If so, make a fresh one. Note: if we didn't
    // find a similar record, its lastTimestamp will be the zero value, so we
    // create a new one in that case.
    maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
    interval := now.Time.Sub(record.lastTimestamp.Time)
    // The record has expired; start a fresh one.
    if interval > maxInterval {
        record = aggregateRecord{localKeys: sets.NewString()}
    }

    // Write the new event into the aggregation record and put it on the cache.
    // localKeys is a set, so inserting an already-present key stores no duplicate.
    record.localKeys.Insert(localKey)
    record.lastTimestamp = now
    e.cache.Add(aggregateKey, record)

    // If we are not yet over the threshold for unique events, don't correlate them.
    // While the number of distinct messages (localKeys) is below maxEvents, whether
    // or not the record was stale, return newEvent and its full eventKey unchanged.
    if uint(record.localKeys.Len()) < e.maxEvents {
        return newEvent, eventKey
    }

    // do not grow our local key set any larger than max
    record.localKeys.PopAny()

    // create a new aggregate event, and return the aggregateKey as the cache key
    // (so that it can be overwritten.)
    // The record is fresh and the number of distinct messages has reached maxEvents,
    // so aggregate: rewrite the most recently received event.
    eventCopy := &v1.Event{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
            Namespace: newEvent.Namespace,
        },
        Count:          1,
        FirstTimestamp: now,
        InvolvedObject: newEvent.InvolvedObject,
        LastTimestamp:  now,
        // The default messageFunc is EventAggregatorByReasonMessageFunc, which
        // prefixes the message with "(combined from similar events): ".
        Message: e.messageFunc(newEvent),
        Type:    newEvent.Type,
        Reason:  newEvent.Reason,
        Source:  newEvent.Source,
    }
    return eventCopy, aggregateKey
}
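Feeding a correlator events that differ only in Message shows the threshold in action: with the default policy, the 10th distinct message within the interval flips the output to the aggregate event. A sketch; the clock import path varies across client-go versions, so adjust to your vendored copy:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/clock" // k8s.io/utils/clock in newer client-go
    "k8s.io/client-go/tools/record"
)

func main() {
    correlator := record.NewEventCorrelator(clock.RealClock{})
    for i := 1; i <= 11; i++ {
        ev := &v1.Event{
            InvolvedObject: v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "nginx"},
            Source:         v1.EventSource{Component: "kubelet", Host: "node-1"},
            Type:           v1.EventTypeWarning,
            Reason:         "FailedMount",
            Message:        fmt.Sprintf("volume %d not found", i), // distinct messages
        }
        result, err := correlator.EventCorrelate(ev)
        if err != nil || result.Skip {
            continue
        }
        // Events 1-9 keep their message; from the 10th on it becomes
        // "(combined from similar events): volume N not found".
        fmt.Printf("%2d: %s\n", i, result.Event.Message)
    }
}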
stopCh := make(chan struct{})
eventBroadcaster := events.NewEventBroadcasterAdapter(kubeDeps.KubeClient)
kubeDeps.Recorder = eventBroadcaster.NewRecorder(componentKubelet)
if kubeDeps.EventClient != nil {
    klog.V(4).InfoS("Sending events to api server")
    eventBroadcaster.StartRecordingToSink(stopCh)
} else {
    klog.InfoS("No api server defined - no events will be sent to API server")
}
// NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify
// migration of individual components to the new Event API.
// The adapter holds both the new and the legacy EventBroadcaster.
func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter {
    eventClient := &eventBroadcasterAdapterImpl{}
    // Check whether the apiserver supports the new events API.
    if _, err := client.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err == nil {
        // New events.k8s.io path.
        eventClient.eventsv1Client = client.EventsV1()
        eventClient.eventsv1Broadcaster = NewBroadcaster(&EventSinkImpl{Interface: eventClient.eventsv1Client})
    }
    // Even though there can soon exist cases when coreBroadcaster won't really be needed,
    // we create it unconditionally because its overhead is minor and will simplify usage
    // patterns of this library in all components.
    // Legacy core/v1 path.
    eventClient.coreClient = client.CoreV1()
    eventClient.coreBroadcaster = record.NewBroadcaster()
    return eventClient
}

// NewBroadcaster Creates a new event broadcaster.
func NewBroadcaster(sink EventSink) EventBroadcaster {
    return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{})
}

func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster {
    return &eventBroadcasterImpl{
        Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
        eventCache:    eventCache,
        sleepDuration: sleepDuration,
        sink:          sink,
    }
}

// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
    // Every 30 minutes (refreshTime), re-send cached events that have a Series to
    // refresh their TTL on the apiserver.
    go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
    // Every 6 minutes (finishTime), close out series whose last observation is older
    // than 6 minutes and evict singleton events recorded more than 6 minutes ago.
    go wait.Until(e.finishSeries, finishTime, stopCh)
    // Register a watcher and start a goroutine that handles the received events.
    e.startRecordingEvents(stopCh)
}

// refreshExistingEventSeries refreshes the events' TTL.
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
    // TODO: Investigate whether lock contention won't be a problem
    e.mu.Lock()
    defer e.mu.Unlock()
    for isomorphicKey, event := range e.eventCache {
        if event.Series != nil {
            if recordedEvent, retry := recordEvent(e.sink, event); !retry {
                if recordedEvent != nil {
                    e.eventCache[isomorphicKey] = recordedEvent
                }
            }
        }
    }
}

// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
    // TODO: Investigate whether lock contention won't be a problem
    e.mu.Lock()
    defer e.mu.Unlock()
    for isomorphicKey, event := range e.eventCache {
        eventSerie := event.Series
        if eventSerie != nil {
            if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
                if _, retry := recordEvent(e.sink, event); !retry {
                    delete(e.eventCache, isomorphicKey)
                }
            }
        } else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {
            delete(e.eventCache, isomorphicKey)
        }
    }
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
    watcher := e.Watch()
    go func() {
        defer utilruntime.HandleCrash()
        for {
            watchEvent, ok := <-watcher.ResultChan()
            if !ok {
                return
            }
            eventHandler(watchEvent.Object)
        }
    }()
    return watcher.Stop
}

func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {
    eventHandler := func(obj runtime.Object) {
        event, ok := obj.(*eventsv1.Event)
        if !ok {
            klog.Errorf("unexpected type, expected eventsv1.Event")
            return
        }
        e.recordToSink(event, clock.RealClock{})
    }
    stopWatcher := e.StartEventWatcher(eventHandler)
    go func() {
        <-stopCh
        stopWatcher()
    }()
}

func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
    // Make a copy before modification, because there could be multiple listeners.
    eventCopy := event.DeepCopy()
    go func() {
        evToRecord := func() *eventsv1.Event {
            e.mu.Lock()
            defer e.mu.Unlock()
            eventKey := getKey(eventCopy)
            isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
            if isIsomorphic {
                if isomorphicEvent.Series != nil {
                    isomorphicEvent.Series.Count++
                    isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()}
                    return nil
                }
                isomorphicEvent.Series = &eventsv1.EventSeries{
                    Count:            1,
                    LastObservedTime: metav1.MicroTime{Time: clock.Now()},
                }
                return isomorphicEvent
            }
            e.eventCache[eventKey] = eventCopy
            return eventCopy
        }()
        if evToRecord != nil {
            recordedEvent := e.attemptRecording(evToRecord)
            if recordedEvent != nil {
                recordedEventKey := getKey(recordedEvent)
                e.mu.Lock()
                defer e.mu.Unlock()
                e.eventCache[recordedEventKey] = recordedEvent
            }
        }
    }()
}
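recordToSink is easiest to see end to end with a hand-rolled sink. A sketch matching the version quoted above (StartRecordingToSink takes a stop channel): two isomorphic events yield a single Create, the second merely starts a Series on the cached copy:

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    eventsv1 "k8s.io/api/events/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/events"
)

// printSink implements events.EventSink and just logs what would hit the apiserver.
type printSink struct{}

func (printSink) Create(ev *eventsv1.Event) (*eventsv1.Event, error) {
    fmt.Println("CREATE", ev.Reason, ev.Note)
    return ev, nil
}
func (printSink) Update(ev *eventsv1.Event) (*eventsv1.Event, error) { return ev, nil }
func (printSink) Patch(ev *eventsv1.Event, data []byte) (*eventsv1.Event, error) {
    fmt.Println("PATCH", ev.Reason)
    return ev, nil
}

func main() {
    stopCh := make(chan struct{})
    defer close(stopCh)

    broadcaster := events.NewBroadcaster(printSink{})
    broadcaster.StartRecordingToSink(stopCh)
    recorder := broadcaster.NewRecorder(scheme.Scheme, "demo-controller")

    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}
    recorder.Eventf(pod, nil, v1.EventTypeNormal, "Pulled", "Pulling", "image pulled")
    recorder.Eventf(pod, nil, v1.EventTypeNormal, "Pulled", "Pulling", "image pulled") // isomorphic: Series only

    time.Sleep(200 * time.Millisecond) // recordToSink runs asynchronously
}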
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
    if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
        return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
    }
    return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
}

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
    hostname, _ := os.Hostname()
    reportingInstance := reportingController + "-" + hostname
    return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
}
func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
    timestamp := metav1.MicroTime{Time: time.Now()}
    message := fmt.Sprintf(note, args...)
    refRegarding, err := reference.GetReference(recorder.scheme, regarding)
    if err != nil {
        klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
        return
    }
    refRelated, err := reference.GetReference(recorder.scheme, related)
    if err != nil {
        klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
    }
    if !util.ValidateEventType(eventtype) {
        klog.Errorf("Unsupported event type: '%v'", eventtype)
        return
    }
    event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
    go func() {
        defer utilruntime.HandleCrash()
        recorder.Action(watch.Added, event)
    }()
}

func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *eventsv1.Event {
    t := metav1.Time{Time: recorder.clock.Now()}
    namespace := refRegarding.Namespace
    if namespace == "" {
        namespace = metav1.NamespaceDefault
    }
    return &eventsv1.Event{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
            Namespace: namespace,
        },
        EventTime:           timestamp,
        Series:              nil,
        ReportingController: reportingController,
        ReportingInstance:   reportingInstance,
        Action:              action,
        Reason:              reason,
        Regarding:           *refRegarding,
        Related:             refRelated,
        Note:                message,
        Type:                eventtype,
    }
}
// NewEventRecorderAdapter returns an adapter implementing the new
// "k8s.io/client-go/tools/events".EventRecorder interface.
func NewEventRecorderAdapter(recorder EventRecorder) *EventRecorderAdapter {
    return &EventRecorderAdapter{
        recorder: recorder,
    }
}

// Eventf is a wrapper around v1 Eventf
func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
    a.recorder.Eventf(regarding, eventtype, reason, note, args...)
}
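A usage sketch of the adapter, wrapping the record package's test helper FakeRecorder: new-style calls go through, while the related object and action arguments are dropped on the legacy path:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/record"
)

func main() {
    fake := record.NewFakeRecorder(10) // buffered channel of rendered event strings
    adapter := record.NewEventRecorderAdapter(fake)

    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}
    // New-style signature: (regarding, related, eventtype, reason, action, note, args...).
    // The related object (nil here) and the "Binding" action never reach the legacy recorder.
    adapter.Eventf(pod, nil, v1.EventTypeNormal, "Scheduled", "Binding", "assigned to %s", "node-1")

    fmt.Println(<-fake.Events) // "Normal Scheduled assigned to node-1"
}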