The podConfig component within the kubelet aggregates pod information from multiple sources, deduplicating pods and analyzing pod changes. When pods change in any of these sources, podConfig merges the changes, generates the appropriate types of PodUpdate messages, and sends them to the PodUpdate channel. The sources are:
file: Reads pod information from files, which are static pods.
api: Retrieves all pods on this node from the API server.
http: Retrieves pod or pod list information via HTTP, which are static pods.
1 podConfig
podConfig includes podStorage, mux, and channels for receiving podUpdate messages.
The podUpdate contains a list of pods, the operation type, and the source’s name.
The podStorage keeps all the pods from all sources at a given moment.
The mux is responsible for aggregating pods from all sources, analyzing changes in all pods, and composing various podUpdate messages, which are then sent to the channel for receiving podUpdate messages.
Within podConfig, each source has its own channel, and the aggregator starts an associated goroutine per source to consume PodUpdate messages from that source channel. These goroutines analyze the messages, generate the resulting PodUpdate messages, and ultimately send them to the updates channel.
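As a rough illustration of this flow (not a quote from the kubelet; recorder and staticPod are placeholders, and the kubelet's config and kubetypes packages are assumed to be imported):

// Create the PodConfig in incremental mode, as the kubelet does.
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, recorder)

// Each source obtains (or re-uses) its own channel, keyed by the source name.
fileCh := cfg.Channel(kubetypes.FileSource)

// Sources always send their full desired state with Op == SET; podStorage
// later turns this into incremental ADD/UPDATE/DELETE/... updates.
// staticPod is assumed to have a name, namespace, and UID set.
fileCh <- kubetypes.PodUpdate{
    Pods:   []*v1.Pod{staticPod},
    Op:     kubetypes.SET,
    Source: kubetypes.FileSource,
}

// The merged, per-operation stream comes out on the single updates channel.
update := <-cfg.Updates()
// In incremental mode a brand-new pod comes out as a single ADD update.
fmt.Println(update.Op == kubetypes.ADD, len(update.Pods)) // true 1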
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
    pods *podStorage
    // the aggregator
    mux *config.Mux

    // the channel of denormalized changes passed to listeners
    updates chan kubetypes.PodUpdate

    // contains the list of all configured sources
    sourcesLock       sync.Mutex
    sources           sets.String
    checkpointManager checkpointmanager.CheckpointManager
}

// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order. Note that this object is an in-memory source of
// "truth" and on creation contains zero entries. Once all previously read sources are
// available, then this object should be considered authoritative.
type podStorage struct {
    podLock sync.RWMutex
    // map of source name to pod uid to pod reference
    pods map[string]map[types.UID]*v1.Pod
    mode PodConfigNotificationMode

    // ensures that updates are delivered in strict order
    // on the updates channel
    updateLock sync.Mutex
    updates    chan<- kubetypes.PodUpdate

    // contains the set of all sources that have sent at least one SET
    sourcesSeenLock sync.RWMutex
    sourcesSeen     sets.String

    // the EventRecorder to use
    recorder record.EventRecorder
}
podUpdate definition
pkg\kubelet\types\pod_update.go
// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
//
// Additionally, Pods should never be nil - it should always point to an empty slice. While
// functionally similar, this helps our unit tests properly check that the correct PodUpdates
// are generated.
type PodUpdate struct {
    Pods   []*v1.Pod
    Op     PodOperation
    Source string
}
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
    manifestURLHeader := make(http.Header)
    if len(kubeCfg.StaticPodURLHeader) > 0 {
        for k, v := range kubeCfg.StaticPodURLHeader {
            for i := range v {
                manifestURLHeader.Add(k, v[i])
            }
        }
    }

    // source of all configuration
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
1.1 Creating a PodConfig
pkg\kubelet\config\config.go
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
    updates := make(chan kubetypes.PodUpdate, 50)
    storage := newPodStorage(updates, mode, recorder)
    podConfig := &PodConfig{
        pods:    storage,
        mux:     config.NewMux(storage),
        updates: updates,
        sources: sets.String{},
    }
    return podConfig
}
podStorage stores all pods from all sources at any given moment and sends pod changes to the updates channel.
pkg\kubelet\config\config.go
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
    return &podStorage{
        pods:        make(map[string]map[types.UID]*v1.Pod),
        mode:        mode,
        updates:     updates,
        sourcesSeen: sets.String{},
        recorder:    recorder,
    }
}
The mux (multiplexer) is responsible for aggregating pods from various sources and analyzing changes.
type Merger interface {
    // Invoked when a change from a source is received. May also function as an incremental
    // merger if you wish to consume changes incrementally. Must be reentrant when more than
    // one source is defined.
    Merge(source string, update interface{}) error
}

// Mux is a class for merging configuration from multiple sources. Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
    // Invoked when an update is sent to a source.
    merger Merger

    // Sources and their lock.
    sourceLock sync.RWMutex
    // Maps source names to channels
    sources map[string]chan interface{}
}

// NewMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux {
    mux := &Mux{
        sources: make(map[string]chan interface{}),
        merger:  merger,
    }
    return mux
}
2 Adding a Source
To add a source to podConfig, you can call the Channel function, which returns a channel specific to that source. This channel is used to receive PodUpdate messages generated by the source, and it is referred to as the “source channel.”
The Channel function of the multiplexer (mux) is invoked. If a channel for the source already exists, it returns that channel. Otherwise, it creates a new channel for the source and starts a goroutine to consume messages from this channel. The goroutine processes messages (categorizing them) and sends the final PodUpdate messages to the merger (which is the PodStorage’s updates channel).
// Channel returns a channel where a configuration source
// can send updates of new configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
//
// If a channel for the source already exists, it returns that channel.
// Otherwise, it creates a new channel for the source and starts a goroutine to consume messages from this channel.
// The goroutine processes messages (categorizing them) and sends the final PodUpdate messages to the merger (which is the PodStorage's updates channel).
func (m *Mux) Channel(source string) chan interface{} {
    if len(source) == 0 {
        panic("Channel given an empty name")
    }
    m.sourceLock.Lock()
    defer m.sourceLock.Unlock()
    channel, exists := m.sources[source]
    if exists {
        return channel
    }
    newChannel := make(chan interface{})
    m.sources[source] = newChannel
    go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
    return newChannel
}
The per-source goroutine within the multiplexer consumes messages from the source channel and hands each one to the merger, calling podStorage’s Merge function to perform the aggregation.
pkg\util\config\config.go
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        // The merger here is the podStorage
        m.merger.Merge(source, update)
    }
}
podStorage Merge
If the Op is ADD, UPDATE, or DELETE, it compares the podStorage’s pods with the pod list in the PodUpdate message. It identifies addPods, updatePods, deletePods, and reconcilePods, and updates the pods in podStorage.
If the Op is REMOVE, the pod list in the PodUpdate message represents removePods.
If the Op is SET, it compares the podStorage’s pods with the pod list in the PodUpdate message. It identifies addPods, updatePods, deletePods, and reconcilePods. The latest pod list (after updating from the pod list in the PodUpdate message) replaces the pods in podStorage, and a list of removed pods is calculated.
If the Op is RESTORE, the pod list in the PodUpdate message represents restorePods.
Based on the analysis results, the corresponding PodUpdate messages with different Ops are generated.
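To make the distinction between an update and a reconcile concrete, the sketch below mirrors the decision that the kubelet’s checkAndUpdatePod helper makes when comparing an incoming pod against the stored copy. It is a simplified illustration under assumed semantics, not the actual kubelet source; the real helper also handles annotations and graceful-deletion bookkeeping.

package main

import (
    "fmt"
    "reflect"

    v1 "k8s.io/api/core/v1"
)

// classify compares the copy already stored for a source (existing) with the pod
// carried by the incoming PodUpdate (incoming) and names the resulting change.
// Illustrative simplification; not the kubelet source.
func classify(existing, incoming *v1.Pod) string {
    switch {
    case existing == nil:
        return "add" // first time this UID is seen from the source
    case incoming.DeletionTimestamp != nil:
        return "delete" // the source requests a (graceful) deletion
    case !reflect.DeepEqual(existing.Spec, incoming.Spec) ||
        !reflect.DeepEqual(existing.Labels, incoming.Labels):
        return "update" // the desired state changed
    case !reflect.DeepEqual(existing.Status, incoming.Status):
        return "reconcile" // only the observed status changed
    default:
        return "unchanged" // no PodUpdate is emitted for this pod
    }
}

func main() {
    stored := &v1.Pod{}
    seen := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodRunning}}
    fmt.Println(classify(stored, seen)) // reconcile
}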
Depending on the mode, different PodUpdate messages are sent to the updates channel:
If the mode is PodConfigNotificationIncremental and there are changes in add, update, delete, remove, restore, or reconcile pods, corresponding ADD, UPDATE, DELETE, REMOVE, RESTORE, or RECONCILE PodUpdate events are sent. If a new source is added and there are no add, update, or delete pods, an empty ADD PodUpdate event is sent to mark the source as ready. This mode is used by kubelet.
If the mode is PodConfigNotificationSnapshotAndUpdates, a SET PodUpdate event is sent when pods are added or removed or when a new source is added; UPDATE and DELETE PodUpdate events are sent when pods are updated or deleted.
If the mode is PodConfigNotificationSnapshot, SET PodUpdate events are sent when there are updates, deletes, adds, removes, or a new source is added.
pkg\kubelet\config\config.go
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
//
// Deduplicate the pod list of the incoming source's PodUpdate
// Compare the PodUpdate against the pods in s.pods[source], derive PodUpdates of each Op type, and send them to the s.updates channel
// The Op of the incoming PodUpdate may differ from the Op of the outgoing PodUpdates
func (s *podStorage) Merge(source string, change interface{}) error {
    s.updateLock.Lock()
    defer s.updateLock.Unlock()

    seenBefore := s.sourcesSeen.Has(source)
    adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
    ......
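The elided remainder of Merge dispatches the change sets produced by s.merge according to the notification mode. The following is a rough sketch of the incremental branch only, reconstructed from the behavior described above rather than quoted from the source; the snapshot modes are omitted.

    // Sketch of the incremental dispatch (simplified; not the verbatim source).
    switch s.mode {
    case PodConfigNotificationIncremental:
        // Forward each non-empty change set as its own PodUpdate.
        if len(removes.Pods) > 0 {
            s.updates <- *removes
        }
        if len(adds.Pods) > 0 {
            s.updates <- *adds
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }
        if len(restores.Pods) > 0 {
            s.updates <- *restores
        }
        if !seenBefore && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
            // A source that reports nothing on its first SET still gets an
            // empty ADD, which marks the source as ready.
            s.updates <- *adds
        }
        if len(reconciles.Pods) > 0 {
            s.updates <- *reconciles
        }
    }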
3 file source
The file source reads pod configurations from files, stores the pods in an UndeltaStore, and listens for file changes. The UndeltaStore generates PodUpdate messages with the operation (Op) set to SET, containing all pods, and sends them to the file source’s receiving channel.
The UndeltaStore is a thread-safe storage that contains all pods from the source. It includes methods such as Add (adding pods to UndeltaStore), Update (updating pods in UndeltaStore), Replace (replacing pods in UndeltaStore), and Delete (deleting pods from UndeltaStore). Regardless of which method is executed, it will also execute PushFunc, which packages the pod list into a PodUpdate message with the operation set to SET and sends it to the source’s receiving channel. The UndeltaStore is defined in /staging/src/k8s.io/client-go/tools/cache/undelta_store.go.
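A minimal illustration of this push-on-every-mutation behavior, assuming only that client-go is on the module path (the pod here is a throwaway placeholder):

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // push receives the entire current contents of the store after every mutation,
    // which is why the file/http/apiserver sources always emit SET PodUpdates.
    push := func(objs []interface{}) {
        fmt.Printf("push called with %d pods\n", len(objs))
    }
    store := cache.NewUndeltaStore(push, cache.MetaNamespaceKeyFunc)

    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "static-web", Namespace: "default"}}
    store.Add(pod)    // push called with 1 pods
    store.Update(pod) // push called with 1 pods
    store.Delete(pod) // push called with 0 pods
}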
// define file config source
// By default, StaticPodPath is empty and FileCheckFrequency is 20s
if kubeCfg.StaticPodPath != "" {
    klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
    // Start a goroutine to watch file changes and periodically read files, generating PodUpdate
    // messages (with Op set to SET, including all pods) and sending them to the updates channel
    config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
The code performs the following actions:
Creates a sourceFile and initializes UndeltaStore, fileKeyMapping, and watch channel.
Starts a goroutine called listConfig, which periodically reads the configurations of all pods, adds pods to the UndeltaStore, and records discovered files in fileKeyMapping. It also consumes events from the watch channel. If a file is added or modified, it re-reads the pod’s configuration file and adds the pod to the UndeltaStore. If a file is deleted, it removes the file from the fileKeyMapping records and deletes the corresponding pod from the UndeltaStore.
Starts another goroutine called startWatch to watch for file changes, generate file change events, and send them to the watch channel.
// NewSourceFile watches a config file for changes.
//
// Start goroutines to watch for file changes and periodically read files, generating PodUpdate
// (Op SET, containing all pods) messages and sending them to the updates channel
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
    // "github.com/sigma/go-inotify" requires a path without trailing "/"
    path = strings.TrimRight(path, string(os.PathSeparator))

    config := newSourceFile(path, nodeName, period, updates)
    klog.V(1).Infof("Watching path %q", path)
    config.run()
}

func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
    }
    store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
    return &sourceFile{
        path:           path,
        nodeName:       nodeName,
        period:         period,
        store:          store,
        fileKeyMapping: map[string]string{},
        updates:        updates,
        watchEvents:    make(chan *watchEvent, eventBufferLen),
    }
}

func (s *sourceFile) run() {
    listTicker := time.NewTicker(s.period)

    go func() {
        // Read path immediately to speed up startup.
        if err := s.listConfig(); err != nil {
            klog.Errorf("Unable to read config path %q: %v", s.path, err)
        }
        for {
            select {
            case <-listTicker.C:
                if err := s.listConfig(); err != nil {
                    klog.Errorf("Unable to read config path %q: %v", s.path, err)
                }
            case e := <-s.watchEvents:
                if err := s.consumeWatchEvent(e); err != nil {
                    klog.Errorf("Unable to process watch event: %v", err)
                }
            }
        }
    }()

    // Start a goroutine to watch for file changes and produce watchEvents
    s.startWatch()
}
3.1 Reading the pod config file: extractFromFile
The extractFromFile function is responsible for reading pod configuration files and converting the file contents into a pod object. It also applies default values and modifications to the pod. It’s worth noting that each file is expected to contain the configuration for a single pod.
Here are some of the modifications and default values applied by extractFromFile:
Generating a unique UID for the pod.
Setting the pod’s name.
Converting an empty namespace to “default” if it’s not specified.
// Generate the pod's UID and name, convert an empty namespace to default, set Spec.NodeName and
// ObjectMeta.SelfLink, add a toleration for all NoExecute taints, and set Status.Phase to Pending
func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.NodeName) error {
    if len(pod.UID) == 0 {
        hasher := md5.New()
        if isFile {
            fmt.Fprintf(hasher, "host:%s", nodeName)
            fmt.Fprintf(hasher, "file:%s", source)
        } else {
            fmt.Fprintf(hasher, "url:%s", source)
        }
        hash.DeepHashObject(hasher, pod)
        pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))
        klog.V(5).Infof("Generated UID %q pod %q from %s", pod.UID, pod.Name, source)
    }

    pod.Name = generatePodName(pod.Name, nodeName)
    klog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, source)

    if pod.Namespace == "" {
        pod.Namespace = metav1.NamespaceDefault
    }
    klog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, pod.Name, source)

    // Set the Host field to indicate this pod is scheduled on the current node.
    pod.Spec.NodeName = string(nodeName)

    pod.ObjectMeta.SelfLink = getSelfLink(pod.Name, pod.Namespace)

    if pod.Annotations == nil {
        pod.Annotations = make(map[string]string)
    }
    // The generated UID is the hash of the file.
    pod.Annotations[kubetypes.ConfigHashAnnotationKey] = string(pod.UID)

    if isFile {
        // Applying the default Taint tolerations to static pods,
        // so they are not evicted when there are node problems.
        // Tolerate all NoExecute taints
        helper.AddOrUpdateTolerationInPod(pod, &api.Toleration{
            Operator: "Exists",
            Effect:   api.TaintEffectNoExecute,
        })
    }

    // Set the default status to pending.
    pod.Status.Phase = api.PodPending
    return nil
}
3.2 startWatch
The startWatch function launches a goroutine to monitor file changes and generate watch events.
Here is a high-level overview of what startWatch does:
It starts a goroutine that continuously watches the static pod directory for file changes.
When a file change is detected, it generates a watch event and sends it to the watch channel for further processing.
If an error occurs while setting up or processing the watch, it retries with exponential backoff.
This mechanism ensures that the kubelet can detect changes in pod configuration files and respond to them resiliently, even in the presence of errors or temporary issues. A rough sketch of this retry pattern is shown below.
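The kubelet’s Linux implementation of startWatch roughly follows the pattern sketched here. This is a simplified reconstruction for illustration; names such as doWatch, retryableError, retryPeriod, and maxRetryPeriod are assumed from pkg/kubelet/config/file_linux.go and may differ between releases.

// Sketch of the watch retry loop (simplified; not the verbatim source).
func (s *sourceFile) startWatch() {
    backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
    backOffID := "watch"

    go wait.Forever(func() {
        // Skip this attempt while still inside the backoff window.
        if backOff.IsInBackOffSinceUpdate(backOffID, time.Now()) {
            return
        }
        // doWatch registers the inotify/fsnotify watch on s.path and forwards
        // file events to s.watchEvents until an error occurs.
        if err := s.doWatch(); err != nil {
            klog.Errorf("Unable to read config path %q: %v", s.path, err)
            // Only non-retryable errors extend the backoff.
            if _, retryable := err.(*retryableError); !retryable {
                backOff.Next(backOffID, time.Now())
            }
        }
    }, retryPeriod)
}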
4 http source
The HTTP source launches a goroutine that periodically sends HTTP requests to a specified URL to retrieve a pod or pod list. It then generates a PodUpdate message with the operation (Op) set to SET, containing all the pods obtained from the response, and sends the message to the source’s receiving channel.
Here is a breakdown of how the HTTP source works:
It starts a goroutine to periodically send HTTP requests to a specified URL.
The HTTP requests are made at regular intervals to fetch pod information or a list of pods.
Once the HTTP response is received, it extracts the pod information.
It generates a PodUpdate message with the operation set to SET, which includes all the pods obtained from the HTTP response.
This PodUpdate message is sent to the source’s receiving channel for further processing.
The HTTP source allows the kubelet to retrieve pod information from an external source via HTTP requests and keep the pod configurations up to date based on the information obtained from the URL.
// NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes.
//
// Start a goroutine that periodically requests the URL to fetch a pod or pod list, generating PodUpdate
// (Op SET, containing all pods) messages and sending them to the updates channel
func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
    config := &sourceURL{
        url:      url,
        header:   header,
        nodeName: nodeName,
        updates:  updates,
        data:     nil,
        // Timing out requests leads to retries. This client is only used to
        // read the manifest URL passed to kubelet.
        client: &http.Client{Timeout: 10 * time.Second},
    }
    klog.V(1).Infof("Watching URL %s", url)
    go wait.Until(config.run, period, wait.NeverStop)
}

func (s *sourceURL) run() {
    if err := s.extractFromURL(); err != nil {
        // Don't log this multiple times per minute. The first few entries should be
        // enough to get the point across.
        if s.failureLogs < 3 {
            klog.Warningf("Failed to read pods from URL: %v", err)
        } else if s.failureLogs == 3 {
            klog.Warningf("Failed to read pods from URL. Dropping verbosity of this message to V(4): %v", err)
        } else {
            klog.V(4).Infof("Failed to read pods from URL: %v", err)
        }
        s.failureLogs++
    } else {
        if s.failureLogs > 0 {
            klog.Info("Successfully read pods from URL.")
            s.failureLogs = 0
        }
    }
}
4.1 extractFromURL
The extractFromURL function is responsible for making an HTTP request to a specified URL to retrieve pod or pod list configurations. Similar to the file source, it also applies modifications to the pods read from the URL and sends a PodUpdate message with the operation (Op) set to SET to the source’s receiving channel.
Here’s an overview of what extractFromURL does:
It makes an HTTP request to the specified URL to retrieve pod or pod list configurations.
Once the HTTP response is received, it extracts the pod configurations.
It applies modifications to the pods read from the URL, which may include generating a unique UID for each pod, setting pod names, converting empty namespaces to “default,” and making other necessary adjustments.
Finally, it sends a PodUpdate message with the operation set to SET, including all the modified pods, to the source’s receiving channel.
This process ensures that pod configurations retrieved from the URL are properly processed and sent to the kubelet for further handling.
func (s *sourceURL) extractFromURL() error {
    req, err := http.NewRequest("GET", s.url, nil)
    if err != nil {
        return err
    }
    req.Header = s.header
    resp, err := s.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    data, err := utilio.ReadAtMost(resp.Body, maxConfigLength)
    if err != nil {
        return err
    }
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("%v: %v", s.url, resp.Status)
    }
    if len(data) == 0 {
        // Emit an update with an empty PodList to allow HTTPSource to be marked as seen
        s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
        return fmt.Errorf("zero-length data received from %v", s.url)
    }
    // Short circuit if the data has not changed since the last time it was read.
    if bytes.Equal(data, s.data) {
        return nil
    }
    s.data = data

    // First try as it is a single pod.
    parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
    if parsed {
        if singlePodErr != nil {
            // It parsed but could not be used.
            return singlePodErr
        }
        s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
        return nil
    }

    // That didn't work, so try a list of pods.
    parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
    if parsed {
        if multiPodErr != nil {
            // It parsed but could not be used.
            return multiPodErr
        }
        pods := make([]*v1.Pod, 0)
        for i := range podList.Items {
            pods = append(pods, &podList.Items[i])
        }
        s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
        return nil
    }

    return fmt.Errorf("%v: received '%v', but couldn't parse as "+
        "single (%v) or multiple pods (%v)", s.url, string(data), singlePodErr, multiPodErr)
}
5 Restore from bootstrapCheckpointPath
Restoring pod information from the bootstrapCheckpointPath is a process that belongs to the API source. The bootstrapCheckpointPath is typically provided as a command-line argument (--bootstrap-checkpoint-path) when starting the kubelet.
Here’s what happens during the restoration process:
The kubelet checks if the bootstrapCheckpointPath is provided.
If the bootstrapCheckpointPath exists and is configured, it indicates that the kubelet was previously running and has saved some pod information.
The kubelet reads the saved pod information from the bootstrapCheckpointPath.
It then proceeds to restore the pod information, which may include details about running pods, from the saved data.
The restored pod information is processed and integrated into the kubelet’s current state, allowing it to continue managing the pods based on the restored data.
The purpose of restoring pod information from the bootstrapCheckpointPath is to ensure that the kubelet can recover its previous state and continue managing pods from where it left off in case of unexpected interruptions or restarts. This is particularly important for maintaining pod consistency and ensuring that no data is lost during kubelet restarts.
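Here is a rough sketch of how makePodSourceConfig wires this restore into the apiserver source channel, based on kubelet versions that support --bootstrap-checkpoint-path; the exact code may differ between releases.

// Sketch: the checkpoint restore shares the apiserver source channel, so the
// restored pods are attributed to the "api" source.
var updatechannel chan<- interface{}
if bootstrapCheckpointPath != "" {
    klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
    updatechannel = cfg.Channel(kubetypes.ApiserverSource)
    if err := cfg.Restore(bootstrapCheckpointPath, updatechannel); err != nil {
        return nil, err
    }
}

if kubeDeps.KubeClient != nil {
    klog.Infof("Watching apiserver")
    if updatechannel == nil {
        updatechannel = cfg.Channel(kubetypes.ApiserverSource)
    }
    config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
}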
The Restore function in the PodConfig is responsible for restoring pods from the specified checkpoint path. It does this by reading all the pod checkpoint files from the directory and sending a PodUpdate message with the operation (Op) set to RESTORE to the source’s receiving channel.
pkg\kubelet\config\config.go
// Restore restores pods from the checkpoint path, *once*
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
    if c.checkpointManager != nil {
        return nil
    }
    var err error
    c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
    if err != nil {
        return err
    }
    pods, err := checkpoint.LoadPods(c.checkpointManager)
    if err != nil {
        return err
    }
    updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
    return nil
}
type Data struct {
    Pod      *v1.Pod
    Checksum checksum.Checksum
}

// NewPodCheckpoint returns new pod checkpoint
func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
    return &Data{Pod: pod}
}

// LoadPods Loads All Checkpoints from disk
func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
    pods := make([]*v1.Pod, 0)
    // Get the list of all files in the directory
    checkpointKeys, err := cpm.ListCheckpoints()
    if err != nil {
        klog.Errorf("Failed to list checkpoints: %v", err)
    }
    for _, key := range checkpointKeys {
        checkpoint := NewPodCheckpoint(nil)
        // Convert the file contents into a checkpoint, which contains the pod and a checksum
        err := cpm.GetCheckpoint(key, checkpoint)
        if err != nil {
            klog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
            continue
        }
        pods = append(pods, checkpoint.GetPod())
    }
    return pods, nil
}
The GetCheckpoint function in the CheckpointManager is responsible for retrieving a checkpoint from the CheckpointStore, reading the file’s content, converting it into a Checkpoint type, and then verifying the checksum.
// GetCheckpoint retrieves checkpoint from CheckpointStore.
func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
    manager.mutex.Lock()
    defer manager.mutex.Unlock()
    // Read the file
    blob, err := manager.store.Read(checkpointKey)
    if err != nil {
        if err == utilstore.ErrKeyNotFound {
            return errors.ErrCheckpointNotFound
        }
        return err
    }
    err = checkpoint.UnmarshalCheckpoint(blob)
    if err == nil {
        err = checkpoint.VerifyChecksum()
    }
    return err
}
pkg\kubelet\checkpoint\checkpoint.go
// UnmarshalCheckpoint returns unmarshalled data
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
    return json.Unmarshal(blob, cp)
}

// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
    return cp.Checksum.Verify(*cp.Pod)
}

// Checksum is the data to be stored as checkpoint
type Checksum uint64

// Verify verifies that passed checksum is same as calculated checksum
func (cs Checksum) Verify(data interface{}) error {
    if cs != New(data) {
        return errors.ErrCorruptCheckpoint
    }
    return nil
}

// New returns the Checksum of checkpoint data
func New(data interface{}) Checksum {
    return Checksum(getChecksum(data))
}

// Get returns calculated checksum of checkpoint data
func getChecksum(data interface{}) uint64 {
    hash := fnv.New32a()
    hashutil.DeepHashObject(hash, data)
    return uint64(hash.Sum32())
}
6 api source
The API source in the kubelet is responsible for retrieving pod information from the Kubernetes API server for a specific node. It uses a mechanism involving the Reflector and UndeltaStore to watch for changes in pod resources on the API server and send PodUpdate messages to the source’s receiving channel.
Here’s how the process works:
The Reflector initially performs a list operation on the API server to fetch all pods that are scheduled to run on the node. It filters pods based on the spec.nodeName field to include only those pods that should run on the specific node. This list of pods is obtained through an API call to the Kubernetes API server.
After obtaining the list of pods from the API server, the Reflector invokes the UndeltaStore.Replace method. This method replaces the stored pod list in the UndeltaStore with the newly fetched list of pods. This effectively refreshes the internal representation of pods for this node in the kubelet.
Subsequently, the Reflector sets up a watch on pod resources on the API server. This watch is filtered based on the spec.nodeName, ensuring that only events related to pods scheduled on the specific node are received.
When the Reflector receives watch events from the API server, it processes these events based on their type:
If the event type is Added, it means a new pod has been scheduled to run on the node. The Reflector invokes the UndeltaStore.Add method to add the pod to the stored pod list in the UndeltaStore. After adding the pod, it executes the PushFunc.
If the event type is Modified, it means an existing pod’s resource has been updated. The Reflector invokes the UndeltaStore.Update method to update the corresponding pod in the stored pod list in the UndeltaStore. After updating the pod, it executes the PushFunc.
If the event type is Deleted, it means a pod has been deleted. The Reflector invokes the UndeltaStore.Delete method to remove the pod from the stored pod list in the UndeltaStore. After deleting the pod, it executes the PushFunc.
For each of these events (Added, Modified, Deleted), the store triggers the PushFunc, which sends a PodUpdate message to the source’s receiving channel. That PodUpdate contains the full current list of pods for the node with the operation (Op) set to SET; podStorage’s Merge then turns the SET into the incremental ADD, UPDATE, DELETE, or RECONCILE updates described earlier.
This mechanism allows the kubelet to dynamically reflect changes in pod resources scheduled on the node, ensuring that its internal representation of pods is always up-to-date with the API server. It helps the kubelet adapt to changes in pod assignments and respond to pod additions, modifications, and deletions.
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    }
    r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
    go r.Run(wait.NeverStop)
}