// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
// communicate with the API server.
func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
	eventBroadcaster.StartLogging(klog.Infof)
	if c != nil {
		klog.V(0).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")})
	} else {
		klog.Fatalf("kubeClient is nil when starting NodeController")
	}

	tm := &NoExecuteTaintManager{
		client:                c,
		recorder:              recorder,
		getPod:                getPod,
		getNode:               getNode,
		getPodsAssignedToNode: getPodsAssignedToNode,
		taintedNodes:          make(map[string][]v1.Taint),

		nodeUpdateQueue: workqueue.NewNamed("noexec_taint_node"),
		podUpdateQueue:  workqueue.NewNamed("noexec_taint_pod"),
	}
	tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))

	return tm
}
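The three getter callbacks passed in here are backed by informer caches (see the field comments in the struct below). As a rough wiring sketch, not taken from upstream, and assuming the getter signatures are simple (name, namespace) and (name) lookups, they could be built from client-go listers like this; buildGetters is a hypothetical helper name:

package example

import (
	v1 "k8s.io/api/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
)

// buildGetters is a hypothetical helper showing one way to back the getPod and
// getNode callbacks expected by NewNoExecuteTaintManager with informer listers,
// so lookups hit the local cache rather than the API server.
func buildGetters(podLister corelisters.PodLister, nodeLister corelisters.NodeLister) (
	func(name, namespace string) (*v1.Pod, error),
	func(name string) (*v1.Node, error),
) {
	getPod := func(name, namespace string) (*v1.Pod, error) {
		// Read the pod from the shared informer cache.
		return podLister.Pods(namespace).Get(name)
	}
	getNode := func(name string) (*v1.Node, error) {
		// Read the node from the shared informer cache.
		return nodeLister.Get(name)
	}
	return getPod, getNode
}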
func (tc *NoExecuteTaintManager) emitPodDeletionEvent(nsName types.NamespacedName) {
	if tc.recorder == nil {
		return
	}
	ref := &v1.ObjectReference{
		Kind:      "Pod",
		Name:      nsName.Name,
		Namespace: nsName.Namespace,
	}
	tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String())
}

func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name
		klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		for i := 0; i < retries; i++ {
			err = c.CoreV1().Pods(ns).Delete(context.TODO(), name, metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}

// CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute
// given function `f`.
func CreateWorkerQueue(f func(args *WorkArgs) error) *TimedWorkerQueue {
	return &TimedWorkerQueue{
		workers:  make(map[string]*TimedWorker),
		workFunc: f,
	}
}
// NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct {
	client   clientset.Interface
	recorder record.EventRecorder

	// func to fetch a pod from the informer cache
	getPod GetPodFunc
	// func to fetch a node from the informer cache
	getNode GetNodeFunc
	// func to fetch the pods assigned to a node from the informer cache
	getPodsAssignedToNode GetPodsByNodeNameFunc

	taintEvictionQueue *TimedWorkerQueue
	// keeps a map from nodeName to all noExecute taints on that Node
	taintedNodesLock sync.Mutex
	taintedNodes     map[string][]v1.Taint

	// nodes taken from nodeUpdateQueue are placed here
	nodeUpdateChannels []chan nodeUpdateItem
	// pods taken from podUpdateQueue are placed here
	podUpdateChannels []chan podUpdateItem

	nodeUpdateQueue workqueue.Interface
	podUpdateQueue  workqueue.Interface
}
// TimedWorkerQueue keeps a set of TimedWorkers that are still waiting for execution.
type TimedWorkerQueue struct {
	sync.Mutex
	// map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker.
	workers map[string]*TimedWorker
	// the function executed when a worker's timer fires
	workFunc func(args *WorkArgs) error
}

// TimedWorker is responsible for executing a function no earlier than at FireAt time.
type TimedWorker struct {
	WorkItem  *WorkArgs
	CreatedAt time.Time
	FireAt    time.Time
	Timer     *time.Timer
}

// WorkArgs keeps arguments that will be passed to the function executed by the worker.
type WorkArgs struct {
	NamespacedName types.NamespacedName
}
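The NewWorkArgs constructor used later in processPodOnNode is not shown in this excerpt; judging from its call sites, a minimal sketch would simply wrap a name/namespace pair:

// NewWorkArgs is a sketch (assumed from its call sites, not quoted from upstream)
// of the constructor used by processPodOnNode below: it wraps a pod's name and
// namespace in a WorkArgs.
func NewWorkArgs(name, ns string) *WorkArgs {
	return &WorkArgs{types.NamespacedName{Namespace: ns, Name: name}}
}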
// NamespacedName comprises a resource name, with a mandatory namespace,
// rendered as "<namespace>/<name>". Being a type captures intent and
// helps make sure that UIDs, namespaced names and non-namespaced names
// do not get conflated in code. For most use cases, namespace and name
// will already have been format validated at the API entry point, so we
// don't do that here. Where that's not the case (e.g. in testing),
// consider using NamespacedNameOrDie() in testing.go in this package.
type NamespacedName struct {
	Namespace string
	Name      string
}
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	klog.V(0).Infof("Starting NoExecuteTaintManager")

	for i := 0; i < UpdateWorkerSize; i++ {
		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
	}

	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(nodeUpdateItem)
			hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.nodeUpdateQueue.Done(item)
				return
			case tc.nodeUpdateChannels[hash] <- nodeUpdate:
				// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
			}
		}
	}(stopCh)

	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			// The fact that pods are processed by the same worker as nodes is used to avoid races
			// between node worker setting tc.taintedNodes and pod worker reading this to decide
			// whether to delete pod.
			// It's possible that even without this assumption this code is still correct.
			podUpdate := item.(podUpdateItem)
			hash := hash(podUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.podUpdateQueue.Done(item)
				return
			case tc.podUpdateChannels[hash] <- podUpdate:
				// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
			}
		}
	}(stopCh)

	wg := sync.WaitGroup{}
	wg.Add(UpdateWorkerSize)
	for i := 0; i < UpdateWorkerSize; i++ {
		go tc.worker(i, wg.Done, stopCh)
	}
	wg.Wait()
}
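The hash helper used above to shard updates across workers is not included in this excerpt; a minimal sketch, assuming an FNV-based implementation, looks like this:

package example

import (
	"hash/fnv"
	"io"
)

// hash is a sketch of the sharding helper assumed by Run above: it maps a node
// name deterministically onto one of the `max` worker channels.
func hash(val string, max int) int {
	hasher := fnv.New32a()
	io.WriteString(hasher, val)
	return int(hasher.Sum32() % uint32(max))
}

Because the same hash is applied to a pod update's nodeName, a pod update and the update of its node always land on the same worker, which is exactly the assumption the comment in the pod-queue goroutine relies on to avoid races on tc.taintedNodes.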
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
	defer done()

	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
	// start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			return
		case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
			tc.handleNodeUpdate(nodeUpdate)
			tc.nodeUpdateQueue.Done(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannels[worker]:
			// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
					tc.handleNodeUpdate(nodeUpdate)
					tc.nodeUpdateQueue.Done(nodeUpdate)
				default:
					break priority
				}
			}
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
			tc.podUpdateQueue.Done(podUpdate)
		}
	}
}
// AddWork adds work to the WorkerQueue, to be executed no earlier than `fireAt`.
func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
	key := args.KeyFromWorkArgs()
	klog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt)

	q.Lock()
	defer q.Unlock()
	if _, exists := q.workers[key]; exists {
		klog.Warningf("Trying to add already existing work for %+v. Skipping.", args)
		return
	}
	worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
	q.workers[key] = worker
}
// Defined in pkg/controller/nodelifecycle/scheduler/timed_workers.go
// KeyFromWorkArgs creates a key for the given `WorkArgs`
func (w *WorkArgs) KeyFromWorkArgs() string {
	return w.NamespacedName.String()
}

// Defined in staging/src/k8s.io/apimachinery/pkg/types/namespacedname.go
const (
	Separator = '/'
)

// String returns the general purpose string representation
func (n NamespacedName) String() string {
	return fmt.Sprintf("%s%c%s", n.Namespace, Separator, n.Name)
}
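For illustration (the pod name is made up), the key produced this way is the same "<namespace>/<name>" string later passed to CancelWork and GetWorkerUnsafe:

key := types.NamespacedName{Namespace: "default", Name: "web-0"}.String()
fmt.Println(key) // prints "default/web-0", the key used by TimedWorkerQueue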
func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		err := q.workFunc(args)
		q.Lock()
		defer q.Unlock()
		if err == nil {
			// To avoid duplicated calls we keep the key in the queue, to prevent
			// subsequent additions.
			q.workers[key] = nil
		} else {
			delete(q.workers, key)
		}
		return err
	}
}
// CreateWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker {
	delay := fireAt.Sub(createdAt)
	if delay <= 0 {
		go f(args)
		return nil
	}
	timer := time.AfterFunc(delay, func() { f(args) })
	return &TimedWorker{
		WorkItem:  args,
		CreatedAt: createdAt,
		FireAt:    fireAt,
		Timer:     timer,
	}
}
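To see the scheduling semantics in one place, a minimal usage sketch (assuming the TimedWorkerQueue pieces above and the NewWorkArgs sketch are compiled into one package; the pod names are made up):

q := CreateWorkerQueue(func(args *WorkArgs) error {
	fmt.Printf("firing eviction for %s\n", args.KeyFromWorkArgs())
	return nil
})

now := time.Now()
// fireAt == createdAt: delay <= 0, so CreateWorker runs the function immediately
// in a goroutine and AddWork stores a nil worker under the key.
q.AddWork(NewWorkArgs("web-0", "default"), now, now)
// fireAt in the future: a *time.Timer is armed, and the eviction can still be
// cancelled via CancelWork("default/web-1") (shown below) before 5 seconds elapse.
q.AddWork(NewWorkArgs("web-1", "default"), now, now.Add(5*time.Second))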
// CancelWork removes scheduled function execution from the queue. Returns true if work was cancelled.
func (q *TimedWorkerQueue) CancelWork(key string) bool {
	q.Lock()
	defer q.Unlock()
	worker, found := q.workers[key]
	result := false
	if found {
		klog.V(4).Infof("Cancelling TimedWorkerQueue item %v at %v", key, time.Now())
		if worker != nil {
			result = true
			worker.Cancel()
		}
		delete(q.workers, key)
	}
	return result
}

// Cancel cancels the execution of the function by the `TimedWorker`
func (w *TimedWorker) Cancel() {
	if w != nil {
		w.Timer.Stop()
	}
}
// Evicts the pods on a node that cannot tolerate the node's NoExecute taints.
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
	node, err := tc.getNode(nodeUpdate.nodeName)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
			tc.taintedNodesLock.Lock()
			defer tc.taintedNodesLock.Unlock()
			delete(tc.taintedNodes, nodeUpdate.nodeName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
		return
	}

	// Create or Update
	klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
	taints := getNoExecuteTaints(node.Spec.Taints)
	func() {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
		if len(taints) == 0 {
			delete(tc.taintedNodes, node.Name)
		} else {
			tc.taintedNodes[node.Name] = taints
		}
	}()

	// It is critical that we update tc.taintedNodes before we call getPodsAssignedToNode:
	// getPodsAssignedToNode can be delayed as long as all future updates to pods will call
	// tc.PodUpdated which will use tc.taintedNodes to potentially delete delayed pods.
	pods, err := tc.getPodsAssignedToNode(node.Name)
	if err != nil {
		klog.Errorf(err.Error())
		return
	}
	if len(pods) == 0 {
		return
	}

	// Short circuit, to make this controller a bit faster.
	if len(taints) == 0 {
		klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
		for i := range pods {
			tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
		}
		return
	}

	now := time.Now()
	for _, pod := range pods {
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
	}
}
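The getNoExecuteTaints filter called above is not shown in this excerpt; a minimal sketch of what it is expected to do, assumed rather than quoted from upstream, is to keep only taints with the NoExecute effect:

// getNoExecuteTaints (sketch) keeps only taints whose effect is NoExecute,
// since those are the only taints this manager acts on.
func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
	result := []v1.Taint{}
	for i := range taints {
		if taints[i].Effect == v1.TaintEffectNoExecute {
			result = append(result, taints[i])
		}
	}
	return result
}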
func (tc *NoExecuteTaintManager) cancelWorkWithEvent(nsName types.NamespacedName) {
	if tc.taintEvictionQueue.CancelWork(nsName.String()) {
		tc.emitCancelPodDeletionEvent(nsName)
	}
}

func (tc *NoExecuteTaintManager) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
	if tc.recorder == nil {
		return
	}
	ref := &v1.ObjectReference{
		Kind:      "Pod",
		Name:      nsName.Name,
		Namespace: nsName.Namespace,
	}
	tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
}
func (tc *NoExecuteTaintManager) processPodOnNode(
	podNamespacedName types.NamespacedName,
	nodeName string,
	tolerations []v1.Toleration,
	taints []v1.Taint,
	now time.Time,
) {
	if len(taints) == 0 {
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
	if !allTolerated {
		klog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
		// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
		// Remove this podNamespacedName from taintEvictionQueue first: AddWork skips keys that are
		// already in the queue, and we want to reset the created/fire time.
		tc.cancelWorkWithEvent(podNamespacedName)
		// taintEvictionQueue carries its own timers; when a timer fires it runs the handler,
		// so no separate goroutine is needed to drain this queue.
		tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
		return
	}
	minTolerationTime := getMinTolerationTime(usedTolerations)
	// getMinTolerationTime returns negative value to denote infinite toleration.
	if minTolerationTime < 0 {
		klog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
		return
	}

	startTime := now
	triggerTime := startTime.Add(minTolerationTime)
	scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
	if scheduledEviction != nil {
		startTime = scheduledEviction.CreatedAt
		// The existing work was created in the past, i.e. eviction was already scheduled
		// earlier; that is still valid, so keep the existing work and do nothing.
		if startTime.Add(minTolerationTime).Before(triggerTime) {
			return
		}
		// Otherwise the existing work was created at or after now; cancel it and create a new one.
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
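The getMinTolerationTime helper is also not included in this excerpt; a sketch consistent with the comment above (a negative value denotes infinite toleration), assumed rather than quoted from upstream and requiring the "math" and "time" imports, might look like this:

// getMinTolerationTime (sketch) returns the smallest TolerationSeconds among the
// matching tolerations, 0 if there are no tolerations or any toleration expires
// immediately, and -1 (negative) when every matching toleration has no
// TolerationSeconds set, i.e. tolerates the taint forever.
func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
	minTolerationTime := int64(math.MaxInt64)
	if len(tolerations) == 0 {
		return 0
	}
	for i := range tolerations {
		if tolerations[i].TolerationSeconds != nil {
			tolerationSeconds := *(tolerations[i].TolerationSeconds)
			if tolerationSeconds <= 0 {
				return 0
			} else if tolerationSeconds < minTolerationTime {
				minTolerationTime = tolerationSeconds
			}
		}
	}
	if minTolerationTime == int64(math.MaxInt64) {
		return -1
	}
	return time.Duration(minTolerationTime) * time.Second
}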