// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
	pods *podStorage // aggregator of the current pod state across all sources
	mux  *config.Mux

	// the channel of denormalized changes passed to listeners
	updates chan kubetypes.PodUpdate

	// contains the list of all configured sources
	sourcesLock sync.Mutex
	sources     sets.String

	checkpointManager checkpointmanager.CheckpointManager
}

// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order. Note that this object is an in-memory source of
// "truth" and on creation contains zero entries. Once all previously read sources are
// available, then this object should be considered authoritative.
type podStorage struct {
	podLock sync.RWMutex
	// map of source name to pod uid to pod reference
	pods map[string]map[types.UID]*v1.Pod
	mode PodConfigNotificationMode

	// ensures that updates are delivered in strict order
	// on the updates channel
	updateLock sync.Mutex
	updates    chan<- kubetypes.PodUpdate

	// contains the set of all sources that have sent at least one SET
	sourcesSeenLock sync.RWMutex
	sourcesSeen     sets.String

	// the EventRecorder to use
	recorder record.EventRecorder
}
PodUpdate 定义
pkg\kubelet\types\pod_update.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
//
// Additionally, Pods should never be nil - it should always point to an empty slice. While
// functionally similar, this helps our unit tests properly check that the correct PodUpdates
// are generated.
type PodUpdate struct {
	Pods   []*v1.Pod    // the pods this operation applies to (never nil, see above)
	Op     PodOperation // ADD, REMOVE, SET, RESTORE, ...
	Source string       // the config source that produced this update (file, http, api)
}
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
//
// NOTE(review): this snippet is truncated — the function body continues past
// what is shown here.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
	// Copy any user-supplied static-pod-URL headers so they accompany manifest requests.
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
1.1 创建一个 PodConfig
pkg\kubelet\config\config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
funcNewPodConfig(modePodConfigNotificationMode,recorderrecord.EventRecorder)*PodConfig{updates:=make(chankubetypes.PodUpdate,50)storage:=newPodStorage(updates,mode,recorder)podConfig:=&PodConfig{pods:storage,mux:config.NewMux(storage),updates:updates,sources:sets.String{},}returnpodConfig}
podStorage 保存某个时间点所有源里的 pod，将 pod 的相关变化发送给 updates 通道
pkg\kubelet\config\config.go
1
2
3
4
5
6
7
8
9
10
11
12
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
funcnewPodStorage(updateschan<-kubetypes.PodUpdate,modePodConfigNotificationMode,recorderrecord.EventRecorder)*podStorage{return&podStorage{pods:make(map[string]map[types.UID]*v1.Pod),mode:mode,updates:updates,sourcesSeen:sets.String{},recorder:recorder,}}
// Merger is implemented by consumers of a Mux (here, podStorage).
type Merger interface {
	// Invoked when a change from a source is received. May also function as an incremental
	// merger if you wish to consume changes incrementally. Must be reentrant when more than
	// one source is defined.
	Merge(source string, update interface{}) error
}

// Mux is a class for merging configuration from multiple sources. Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
	// Invoked when an update is sent to a source.
	merger Merger

	// Sources and their lock.
	sourceLock sync.RWMutex
	// Maps source names to channels
	sources map[string]chan interface{}
}

// NewMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux {
	mux := &Mux{
		sources: make(map[string]chan interface{}),
		merger:  merger,
	}
	return mux
}
// Channel returns a channel where a configuration source
// can send updates of new configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
//
// source已经有chan通道,就返回这个chan
// 否则创建一个source的chan,然后启动一个goroutine消费这个chan(对消息进行加工分类,然后将最终的podUpdate消息发送给merger--PodStorage里的updates--chan通道)
func(m*Mux)Channel(sourcestring)chaninterface{}{iflen(source)==0{panic("Channel given an empty name")}m.sourceLock.Lock()deferm.sourceLock.Unlock()channel,exists:=m.sources[source]ifexists{returnchannel}newChannel:=make(chaninterface{})m.sources[source]=newChannelgowait.Until(func(){m.listen(source,newChannel)},0,wait.NeverStop)returnnewChannel}
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
//
// The incoming PodUpdate's pod list for this source is deduplicated, diffed
// against s.pods[source], and split into per-op PodUpdates that are sent on
// s.updates — so the op of an emitted update may differ from the incoming op.
//
// NOTE(review): this snippet is truncated — only the head of the function is shown.
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
	......
// NewSourceFile watches a config file for changes.
//
// It starts goroutines that watch the path and periodically re-read it,
// emitting a PodUpdate (Op == SET, containing all pods found) on updates.
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
	// "github.com/sigma/go-inotify" requires a path without trailing "/"
	path = strings.TrimRight(path, string(os.PathSeparator))

	config := newSourceFile(path, nodeName, period, updates)
	klog.V(1).Infof("Watching path %q", path)
	config.run()
}

// newSourceFile builds a sourceFile whose store pushes the complete pod set
// (Op == SET, source "file") onto updates whenever the store contents change.
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
	}
	store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
	return &sourceFile{
		path:           path,
		nodeName:       nodeName,
		period:         period,
		store:          store,
		fileKeyMapping: map[string]string{},
		updates:        updates,
		watchEvents:    make(chan *watchEvent, eventBufferLen),
	}
}

// run starts the periodic re-list loop and the filesystem watcher.
func (s *sourceFile) run() {
	listTicker := time.NewTicker(s.period)

	go func() {
		// Read path immediately to speed up startup.
		if err := s.listConfig(); err != nil {
			klog.Errorf("Unable to read config path %q: %v", s.path, err)
		}
		for {
			select {
			case <-listTicker.C:
				if err := s.listConfig(); err != nil {
					klog.Errorf("Unable to read config path %q: %v", s.path, err)
				}
			case e := <-s.watchEvents:
				if err := s.consumeWatchEvent(e); err != nil {
					klog.Errorf("Unable to process watch event: %v", err)
				}
			}
		}
	}()

	// Start the goroutine that watches the path and produces watchEvents.
	s.startWatch()
}
// applyDefaults fills in fields a static/HTTP pod manifest may omit: it
// generates a deterministic pod UID (a hash over the source and pod content)
// and a node-qualified pod name, defaults an empty namespace to "default",
// sets Spec.NodeName and ObjectMeta.SelfLink, records the config-hash
// annotation, adds a toleration for all NoExecute taints to file-sourced
// (static) pods, and sets Status.Phase to Pending.
func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.NodeName) error {
	if len(pod.UID) == 0 {
		// Hash the source identity plus the (still unmodified) pod content
		// so the same manifest always yields the same UID.
		hasher := md5.New()
		if isFile {
			fmt.Fprintf(hasher, "host:%s", nodeName)
			fmt.Fprintf(hasher, "file:%s", source)
		} else {
			fmt.Fprintf(hasher, "url:%s", source)
		}
		hash.DeepHashObject(hasher, pod)
		pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))
		klog.V(5).Infof("Generated UID %q pod %q from %s", pod.UID, pod.Name, source)
	}

	pod.Name = generatePodName(pod.Name, nodeName)
	klog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, source)

	if pod.Namespace == "" {
		pod.Namespace = metav1.NamespaceDefault
	}
	klog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, pod.Name, source)

	// Set the Host field to indicate this pod is scheduled on the current node.
	pod.Spec.NodeName = string(nodeName)
	pod.ObjectMeta.SelfLink = getSelfLink(pod.Name, pod.Namespace)

	if pod.Annotations == nil {
		pod.Annotations = make(map[string]string)
	}
	// The generated UID is the hash of the file.
	pod.Annotations[kubetypes.ConfigHashAnnotationKey] = string(pod.UID)

	if isFile {
		// Applying the default Taint tolerations to static pods,
		// so they are not evicted when there are node problems.
		// (Operator "Exists" with no key tolerates every NoExecute taint.)
		helper.AddOrUpdateTolerationInPod(pod, &api.Toleration{
			Operator: "Exists",
			Effect:   api.TaintEffectNoExecute,
		})
	}

	// Set the default status to pending.
	pod.Status.Phase = api.PodPending
	return nil
}
// NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes.
//
// It starts a goroutine that periodically fetches the URL (a single pod or a
// pod list) and emits a PodUpdate (Op == SET, containing all pods read) on updates.
func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
	config := &sourceURL{
		url:      url,
		header:   header,
		nodeName: nodeName,
		updates:  updates,
		data:     nil,
		// Timing out requests leads to retries. This client is only used to
		// read the manifest URL passed to kubelet.
		client: &http.Client{Timeout: 10 * time.Second},
	}
	klog.V(1).Infof("Watching URL %s", url)
	go wait.Until(config.run, period, wait.NeverStop)
}

// run performs one fetch attempt, throttling the failure log after the third
// consecutive failure and resetting the counter on success.
func (s *sourceURL) run() {
	if err := s.extractFromURL(); err != nil {
		// Don't log this multiple times per minute. The first few entries should be
		// enough to get the point across.
		if s.failureLogs < 3 {
			klog.Warningf("Failed to read pods from URL: %v", err)
		} else if s.failureLogs == 3 {
			klog.Warningf("Failed to read pods from URL. Dropping verbosity of this message to V(4): %v", err)
		} else {
			klog.V(4).Infof("Failed to read pods from URL: %v", err)
		}
		s.failureLogs++
	} else {
		if s.failureLogs > 0 {
			klog.Info("Successfully read pods from URL.")
			s.failureLogs = 0
		}
	}
}
// extractFromURL fetches s.url once and, if the payload changed since the last
// successful read, decodes it — first as a single pod, then as a pod list —
// and emits a SET PodUpdate (source "http") on s.updates.
func (s *sourceURL) extractFromURL() error {
	req, err := http.NewRequest("GET", s.url, nil)
	if err != nil {
		return err
	}
	req.Header = s.header
	resp, err := s.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Body is read (bounded by maxConfigLength) before the status check.
	data, err := utilio.ReadAtMost(resp.Body, maxConfigLength)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("%v: %v", s.url, resp.Status)
	}
	if len(data) == 0 {
		// Emit an update with an empty PodList to allow HTTPSource to be marked as seen
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return fmt.Errorf("zero-length data received from %v", s.url)
	}
	// Short circuit if the data has not changed since the last time it was read.
	if bytes.Equal(data, s.data) {
		return nil
	}
	s.data = data

	// First try as it is a single pod.
	parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
	if parsed {
		if singlePodErr != nil {
			// It parsed but could not be used.
			return singlePodErr
		}
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return nil
	}

	// That didn't work, so try a list of pods.
	parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
	if parsed {
		if multiPodErr != nil {
			// It parsed but could not be used.
			return multiPodErr
		}
		pods := make([]*v1.Pod, 0)
		for i := range podList.Items {
			pods = append(pods, &podList.Items[i])
		}
		s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return nil
	}

	return fmt.Errorf("%v: received '%v', but couldn't parse as "+
		"single (%v) or multiple pods (%v)",
		s.url, string(data), singlePodErr, multiPodErr)
}
// Restore restores pods from the checkpoint path, *once*
func(c*PodConfig)Restore(pathstring,updateschan<-interface{})error{ifc.checkpointManager!=nil{returnnil}varerrerrorc.checkpointManager,err=checkpointmanager.NewCheckpointManager(path)iferr!=nil{returnerr}pods,err:=checkpoint.LoadPods(c.checkpointManager)iferr!=nil{returnerr}updates<-kubetypes.PodUpdate{Pods:pods,Op:kubetypes.RESTORE,Source:kubetypes.ApiserverSource}returnnil}
// Data holds a checkpointed pod together with the checksum used to validate it.
type Data struct {
	Pod      *v1.Pod
	Checksum checksum.Checksum
}

// NewPodCheckpoint returns new pod checkpoint
func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
	return &Data{Pod: pod}
}

// LoadPods Loads All Checkpoints from disk
func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
	pods := make([]*v1.Pod, 0)

	// List every checkpoint file in the checkpoint directory.
	checkpointKeys, err := cpm.ListCheckpoints()
	if err != nil {
		klog.Errorf("Failed to list checkpoints: %v", err)
	}
	for _, key := range checkpointKeys {
		checkpoint := NewPodCheckpoint(nil)
		// Unmarshal the file contents into the checkpoint (pod + checksum);
		// unreadable or corrupt entries are logged and skipped.
		err := cpm.GetCheckpoint(key, checkpoint)
		if err != nil {
			klog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
			continue
		}
		pods = append(pods, checkpoint.GetPod())
	}
	return pods, nil
}
// GetCheckpoint retrieves checkpoint from CheckpointStore.
func(manager*impl)GetCheckpoint(checkpointKeystring,checkpointCheckpoint)error{manager.mutex.Lock()defermanager.mutex.Unlock()//读取文件
blob,err:=manager.store.Read(checkpointKey)iferr!=nil{iferr==utilstore.ErrKeyNotFound{returnerrors.ErrCheckpointNotFound}returnerr}err=checkpoint.UnmarshalCheckpoint(blob)iferr==nil{err=checkpoint.VerifyChecksum()}returnerr}
pkg\kubelet\checkpoint\checkpoint.go
1
2
3
4
5
6
7
8
9
// UnmarshalCheckpoint returns unmarshalled data
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
	return json.Unmarshal(blob, cp)
}

// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
	return cp.Checksum.Verify(*cp.Pod)
}
// Checksum is the data to be stored as checkpoint
type Checksum uint64

// Verify verifies that passed checksum is same as calculated checksum
func (cs Checksum) Verify(data interface{}) error {
	if cs != New(data) {
		return errors.ErrCorruptCheckpoint
	}
	return nil
}

// New returns the Checksum of checkpoint data
func New(data interface{}) Checksum {
	return Checksum(getChecksum(data))
}

// getChecksum returns the calculated checksum of checkpoint data: a 32-bit
// FNV-1a hash of the deep-hashed object, widened to uint64.
func getChecksum(data interface{}) uint64 {
	hash := fnv.New32a()
	hashutil.DeepHashObject(hash, data)
	return uint64(hash.Sum32())
}
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	// List/watch only pods bound to this node via a spec field selector.
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	// Push the full current pod set (Op == SET, source "api") whenever the
	// reflector's backing store changes.
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}