// Controller is the node-lifecycle controller: it watches node health signals
// (NodeStatus / NodeLease), tracks per-zone disruption state, and either
// taints unready nodes (taint manager enabled) or evicts their pods directly.
type Controller struct {
	......... // NOTE(review): unrelated fields elided in this excerpt.

	// knownNodeSet holds the nodes found by the periodic active scan; it is
	// diffed against the freshly listed nodes to detect newly added and
	// deleted nodes.
	knownNodeSet map[string]*v1.Node

	// per Node map storing last observed health together with a local time when it was observed.
	// (Populated from node status read off the shared informer during the
	// periodic scan.)
	nodeHealthMap *nodeHealthMap

	// evictorLock protects zonePodEvictor and zoneNoExecuteTainter.
	// TODO(#83954): API calls shouldn't be executed under the lock.
	evictorLock sync.Mutex

	// Used only when the taint manager is NOT running: stores, per node,
	// whether pod eviction has already been performed; workers that evict
	// pods from unresponsive nodes read the eviction state
	// (evicted / to-be-evicted) from here.
	nodeEvictionMap *nodeEvictionMap

	// Used only when the taint manager is NOT running: per-zone token-bucket
	// (rate-limited) queue of nodes whose pods need to be evicted.
	zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue

	// Used only when the taint manager IS running: per-zone token-bucket
	// queue of unready nodes whose taints need updating; consumed by the
	// workers that are responsible for tainting nodes.
	zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue

	// Health state of each zone: one of stateFullDisruption,
	// statePartialDisruption, stateNormal, stateInitial.
	zoneStates map[string]ZoneState

	// Value controlling Controller monitoring period, i.e. how often does Controller
	// check node health signal posted from kubelet. This value should be lower than
	// nodeMonitorGracePeriod.
	// TODO: Change node health monitor to watch based.
	// (This is the period of the active scan over all nodes.)
	nodeMonitorPeriod time.Duration

	// When node is just created, e.g. cluster bootstrap or node creation, we give
	// a longer grace period (i.e. the timeout after which a freshly registered
	// node is considered unready).
	nodeStartupGracePeriod time.Duration

	// Controller will not proactively sync node health, but will monitor node
	// health signal updated from kubelet. There are 2 kinds of node healthiness
	// signals: NodeStatus and NodeLease. NodeLease signal is generated only when
	// NodeLease feature is enabled. If it doesn't receive update for this amount
	// of time, it will start posting "NodeReady==ConditionUnknown". The amount of
	// time before which Controller start evicting pods is controlled via flag
	// 'pod-eviction-timeout'.
	// Note: be cautious when changing the constant, it must work with
	// nodeStatusUpdateFrequency in kubelet and renewInterval in NodeLease
	// controller. The node health signal update frequency is the minimal of the
	// two.
	// There are several constraints:
	// 1. nodeMonitorGracePeriod must be N times more than the node health signal
	//    update frequency, where N means number of retries allowed for kubelet to
	//    post node status/lease. It is pointless to make nodeMonitorGracePeriod
	//    be less than the node health signal update frequency, since there will
	//    only be fresh values from Kubelet at an interval of node health signal
	//    update frequency. The constant must be less than podEvictionTimeout.
	// 2. nodeMonitorGracePeriod can't be too large for user experience - larger
	//    value takes longer for user to see up-to-date node health.
	// In short: if a node stops updating its status or lease for longer than
	// this duration, its Ready condition is set to ConditionUnknown.
	nodeMonitorGracePeriod time.Duration

	// How long after a node becomes unready before the pods on it are evicted.
	podEvictionTimeout time.Duration

	// Nodes per second to evict / add taints to while the zone is healthy.
	evictionLimiterQPS float32

	// Nodes per second to evict / add taints to while the zone is in
	// statePartialDisruption and the node count exceeds largeClusterThreshold.
	secondaryEvictionLimiterQPS float32

	// Node count above which the cluster counts as "large"; used to decide
	// whether, when a zone is in statePartialDisruption, the per-second
	// eviction/tainting rate is set to 0 instead of the secondary QPS.
	largeClusterThreshold int32

	// Fraction of unready nodes above which a zone is considered to be in
	// statePartialDisruption.
	unhealthyZoneThreshold float32

	// if set to true Controller will start TaintManager that will evict Pods from
	// tainted nodes, if they're not tolerated.
	runTaintManager bool

	// Non-rate-limited workqueue of node names to reconcile (taints/labels).
	nodeUpdateQueue workqueue.Interface

	// Rate-limited workqueue (exponential backoff) of pod updates.
	podUpdateQueue workqueue.RateLimitingInterface
}
// NewNodeLifecycleController returns a new taint controller.
//
// It wires up event handlers on the pod and node informers, registers a
// pod-by-nodeName indexer, optionally creates the NoExecute taint manager,
// and captures the listers/HasSynced functions needed by Run.
func NewNodeLifecycleController(
	leaseInformer coordinformers.LeaseInformer,
	podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer,
	daemonSetInformer appsv1informers.DaemonSetInformer,
	kubeClient clientset.Interface,
	nodeMonitorPeriod time.Duration,
	nodeStartupGracePeriod time.Duration,
	nodeMonitorGracePeriod time.Duration,
	podEvictionTimeout time.Duration,
	evictionLimiterQPS float32,
	secondaryEvictionLimiterQPS float32,
	largeClusterThreshold int32,
	unhealthyZoneThreshold float32,
	runTaintManager bool,
) (*Controller, error) {
	if kubeClient == nil {
		klog.Fatalf("kubeClient is nil when starting Controller")
	}

	// Initialize the event recorder / broadcaster.
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
	eventBroadcaster.StartLogging(klog.Infof)
	klog.Infof("Sending events to api server.")
	eventBroadcaster.StartRecordingToSink(
		&v1core.EventSinkImpl{
			Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""),
		})

	if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	nc := &Controller{
		kubeClient:                  kubeClient,
		now:                         metav1.Now,
		knownNodeSet:                make(map[string]*v1.Node),
		nodeHealthMap:               newNodeHealthMap(), // health data of discovered nodes
		nodeEvictionMap:             newNodeEvictionMap(),
		recorder:                    recorder,
		nodeMonitorPeriod:           nodeMonitorPeriod,      // default 5s
		nodeStartupGracePeriod:      nodeStartupGracePeriod, // default 1m
		nodeMonitorGracePeriod:      nodeMonitorGracePeriod, // default 40s
		// Taint manager disabled: per-zone token-bucket queue of nodes whose pods need evicting.
		zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
		// Taint manager enabled: per-zone token-bucket queue of unhealthy nodes needing taint updates.
		zoneNoExecuteTainter:        make(map[string]*scheduler.RateLimitedTimedQueue),
		nodesToRetry:                sync.Map{},
		zoneStates:                  make(map[string]ZoneState), // per-zone disruption state
		podEvictionTimeout:          podEvictionTimeout,          // default 5m
		evictionLimiterQPS:          evictionLimiterQPS,          // default 0.1
		secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS, // default 0.01
		largeClusterThreshold:       largeClusterThreshold,       // default 50
		unhealthyZoneThreshold:      unhealthyZoneThreshold,      // default 0.55
		runTaintManager:             runTaintManager,             // default true
		// Node names are enqueued here on node changes, then taints/labels are reconciled.
		nodeUpdateQueue: workqueue.NewNamed("node_lifecycle_controller"),
		// Rate-limited queue; pod changes are enqueued here, used to evict pods
		// or mark their Ready condition false.
		podUpdateQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_lifecycle_controller_pods"),
	}

	nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
	nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
	nc.computeZoneStateFunc = nc.ComputeZoneState

	// Forward pod add/update/delete events to podUpdated and, when present,
	// to the taint manager.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			nc.podUpdated(nil, pod)
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(nil, pod)
			}
		},
		UpdateFunc: func(prev, obj interface{}) {
			prevPod := prev.(*v1.Pod)
			newPod := obj.(*v1.Pod)
			nc.podUpdated(prevPod, newPod)
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(prevPod, newPod)
			}
		},
		DeleteFunc: func(obj interface{}) {
			pod, isPod := obj.(*v1.Pod)
			// We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly.
			if !isPod {
				deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
				if !ok {
					klog.Errorf("Received unexpected object: %v", obj)
					return
				}
				pod, ok = deletedState.Obj.(*v1.Pod)
				if !ok {
					klog.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj)
					return
				}
			}
			nc.podUpdated(pod, nil)
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(pod, nil)
			}
		},
	})
	nc.podInformerSynced = podInformer.Informer().HasSynced

	// Add an extra indexer to the shared informer: index pods by spec.nodeName.
	podInformer.Informer().AddIndexers(cache.Indexers{
		nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok {
				return []string{}, nil
			}
			if len(pod.Spec.NodeName) == 0 {
				return []string{}, nil
			}
			return []string{pod.Spec.NodeName}, nil
		},
	})

	podIndexer := podInformer.Informer().GetIndexer()
	// getPodsAssignedToNode lists pods bound to a node via the indexer above.
	nc.getPodsAssignedToNode = func(nodeName string) ([]*v1.Pod, error) {
		objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
		if err != nil {
			return nil, err
		}
		pods := make([]*v1.Pod, 0, len(objs))
		for _, obj := range objs {
			pod, ok := obj.(*v1.Pod)
			if !ok {
				continue
			}
			pods = append(pods, pod)
		}
		return pods, nil
	}
	nc.podLister = podInformer.Lister()

	if nc.runTaintManager {
		podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) }
		nodeLister := nodeInformer.Lister()
		nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
		nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
		// Feed node add/update/delete events into the taint manager.
		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
				nc.taintManager.NodeUpdated(nil, node)
				return nil
			}),
			UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
				nc.taintManager.NodeUpdated(oldNode, newNode)
				return nil
			}),
			DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
				nc.taintManager.NodeUpdated(node, nil)
				return nil
			}),
		})
	}

	klog.Infof("Controller will reconcile labels.")
	// Second node handler: enqueue node names for taint/label reconciliation
	// and maintain the eviction map / retry set.
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
			nc.nodeUpdateQueue.Add(node.Name)
			nc.nodeEvictionMap.registerNode(node.Name)
			return nil
		}),
		UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
			nc.nodeUpdateQueue.Add(newNode.Name)
			return nil
		}),
		DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
			nc.nodesToRetry.Delete(node.Name)
			nc.nodeEvictionMap.unregisterNode(node.Name)
			return nil
		}),
	})

	nc.leaseLister = leaseInformer.Lister()
	nc.leaseInformerSynced = leaseInformer.Informer().HasSynced

	nc.nodeLister = nodeInformer.Lister()
	nc.nodeInformerSynced = nodeInformer.Informer().HasSynced

	nc.daemonSetStore = daemonSetInformer.Lister()
	nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced

	return nc, nil
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
// It blocks until stopCh is closed; all workers it starts are bound to stopCh.
func (nc *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	klog.Infof("Starting node controller")
	defer klog.Infof("Shutting down node controller")

	if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
		return
	}

	// Taint manager evicts pods that do not tolerate the taints on their node.
	if nc.runTaintManager {
		go nc.taintManager.Run(stopCh)
	}

	// Close node update queue to cleanup go routine.
	defer nc.nodeUpdateQueue.ShutDown()
	defer nc.podUpdateQueue.ShutDown()

	// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
	for i := 0; i < scheduler.UpdateWorkerSize; i++ {
		// Thanks to "workqueue", each worker just need to get item from queue, because
		// the item is flagged when got from queue: if new event come, the new item will
		// be re-queued until "Done", so no more than one worker handle the same item and
		// no event missed.
		go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
	}

	for i := 0; i < podUpdateWorkerSize; i++ {
		go wait.Until(nc.doPodProcessingWorker, time.Second, stopCh)
	}

	if nc.runTaintManager {
		// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
		// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
		go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
	} else {
		// Managing eviction of nodes:
		// When we delete pods off a node, if the node was not empty at the time we then
		// queue an eviction watcher. If we hit an error, retry deletion.
		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
	}

	// Incorporate the results of node health signal pushed from kubelet to master.
	go wait.Until(func() {
		if err := nc.monitorNodeHealth(); err != nil {
			klog.Errorf("Error monitoring node health: %v", err)
		}
	}, nc.nodeMonitorPeriod, stopCh)

	<-stopCh
}