kubelet podConfig - Providing Pods Running on kubelet

The podConfig in the kubelet aggregates pod information from multiple sources. It deduplicates the pods reported by each source and analyzes how they have changed. When the pods from a source change, podConfig merges the changes, generates PodUpdate messages of the appropriate types, and sends them to the PodUpdate channel.

This article is based on Kubernetes version 1.18.6. Please visit the source code reading repository.

There are three types of pod sources:

  • file: Reads pod information from files, which are static pods.
  • api: Retrieves all pods on this node from the API server.
  • http: Retrieves pod or pod list information via HTTP, which are static pods.

PodConfig Overview

podConfig

podConfig includes podStorage, mux, and channels for receiving podUpdate messages.

The podUpdate contains a list of pods, the operation type, and the source’s name.

The podStorage keeps all the pods from all sources at a given moment.

The mux is responsible for aggregating pods from all sources, analyzing changes in all pods, and composing various podUpdate messages, which are then sent to the channel for receiving podUpdate messages.

Within podConfig, the aggregator consumes PodUpdate messages through the source channel. For each source, an associated goroutine is started to consume messages from the source channel. These goroutines analyze the messages and generate various PodUpdate messages, which are ultimately sent to the updates channel.

pkg\kubelet\config\config.go

go

// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
	pods *podStorage
	// aggregator
	mux  *config.Mux

	// the channel of denormalized changes passed to listeners
	updates chan kubetypes.PodUpdate

	// contains the list of all configured sources
	sourcesLock       sync.Mutex
	sources           sets.String
	checkpointManager checkpointmanager.CheckpointManager
}

// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order.  Note that this object is an in-memory source of
// "truth" and on creation contains zero entries.  Once all previously read sources are
// available, then this object should be considered authoritative.
type podStorage struct {
	podLock sync.RWMutex
	// map of source name to pod uid to pod reference
	pods map[string]map[types.UID]*v1.Pod
	mode PodConfigNotificationMode

	// ensures that updates are delivered in strict order
	// on the updates channel
	updateLock sync.Mutex
	updates    chan<- kubetypes.PodUpdate

	// contains the set of all sources that have sent at least one SET
	sourcesSeenLock sync.RWMutex
	sourcesSeen     sets.String

	// the EventRecorder to use
	recorder record.EventRecorder
}

podUpdate definition

pkg\kubelet\types\pod_update.go

go

// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
//
// Additionally, Pods should never be nil - it should always point to an empty slice. While
// functionally similar, this helps our unit tests properly check that the correct PodUpdates
// are generated.
type PodUpdate struct {
    Pods   []*v1.Pod
    Op     PodOperation
    Source string
}
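
To make the contract in the comment above concrete, here is a minimal sketch. It uses simplified stand-in types (a `Pod` with only a name and string-valued operations) rather than the real `v1.Pod` and `kubetypes.PodOperation`, and the helper name `clearSource` is made up for illustration. It shows that "remove all pods of a source" is expressed as a SET with an empty, non-nil slice.

```go
package main

import "fmt"

// Pod is a simplified stand-in for v1.Pod (assumption for this sketch).
type Pod struct{ Name string }

// PodOperation mimics the idea of kubetypes.PodOperation; the string
// values are illustrative only.
type PodOperation string

const (
	SET    PodOperation = "SET"
	ADD    PodOperation = "ADD"
	REMOVE PodOperation = "REMOVE"
)

// PodUpdate has the same three fields as the struct shown above.
type PodUpdate struct {
	Pods   []*Pod
	Op     PodOperation
	Source string
}

// clearSource (hypothetical helper) builds the update that resets a source
// to "no pods". Pods is an empty slice, never nil, as the comment requires.
func clearSource(source string) PodUpdate {
	return PodUpdate{Pods: []*Pod{}, Op: SET, Source: source}
}

func main() {
	u := clearSource("file")
	fmt.Println(u.Op, u.Source, u.Pods != nil, len(u.Pods))
}
```

Keeping the slice non-nil means a test can compare the generated update against an expected value without special-casing nil.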

The initialization of PodConfig in kubelet can be found in pkg\kubelet\kubelet.go:

go

	if kubeDeps.PodConfig == nil {
		var err error
		// bootstrapCheckpointPath is empty by default
		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
		if err != nil {
			return nil, err
		}
	}

In pkg\kubelet\kubelet.go, makePodSourceConfig is called to create PodConfig.

go

// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

pkg\kubelet\config\config.go

go

// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
	updates := make(chan kubetypes.PodUpdate, 50)
	storage := newPodStorage(updates, mode, recorder)
	podConfig := &PodConfig{
		pods:    storage,
		mux:     config.NewMux(storage),
		updates: updates,
		sources: sets.String{},
	}
	return podConfig
}

podStorage stores all pods from different sources at a specific time and sends changes related to pods to the updates channel.

pkg\kubelet\config\config.go

go

// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
	return &podStorage{
		pods:        make(map[string]map[types.UID]*v1.Pod),
		mode:        mode,
		updates:     updates,
		sourcesSeen: sets.String{},
		recorder:    recorder,
	}
}

The mux (multiplexer) is responsible for aggregating pods from various sources and analyzing changes.

pkg\util\config\config.go

go

type Merger interface {
	// Invoked when a change from a source is received.  May also function as an incremental
	// merger if you wish to consume changes incrementally.  Must be reentrant when more than
	// one source is defined.
	Merge(source string, update interface{}) error
}

// Mux is a class for merging configuration from multiple sources.  Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
	// Invoked when an update is sent to a source.
	merger Merger

	// Sources and their lock.
	sourceLock sync.RWMutex
	// Maps source names to channels
	sources map[string]chan interface{}
}

// NewMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux {
	mux := &Mux{
		sources: make(map[string]chan interface{}),
		merger:  merger,
	}
	return mux
}

To add a source to podConfig, you can call the Channel function, which returns a channel specific to that source. This channel is used to receive PodUpdate messages generated by the source, and it is referred to as the “source channel.”

pkg\kubelet\config\config.go

go

func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	return c.mux.Channel(source)
}

PodConfig.Channel in turn calls the Channel function of the multiplexer (mux). If a channel for the source already exists, it returns that channel. Otherwise, it creates a new channel for the source and starts a goroutine that consumes messages from it. The goroutine passes each message to the merger (the podStorage), which classifies the changes and sends the resulting PodUpdate messages to the updates channel.

pkg\util\config\config.go

go

// Channel returns a channel where a configuration source
// can send updates of new configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
// 
// If a channel for the source already exists, it returns that channel.
// Otherwise, it creates a new channel for the source and starts a goroutine to consume messages from this channel.
// The goroutine hands each message to the merger (the podStorage), which classifies the changes and sends the final PodUpdate messages to the updates channel.
func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}

The goroutine started by the multiplexer consumes messages from this channel and passes each one to podStorage’s Merge function, which performs the classification and aggregation.

pkg\util\config\config.go

go

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
		// The merger here is the podStorage
		m.merger.Merge(source, update)
	}
}
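
The fan-in pattern used by the mux can be tried out in isolation. The following is a minimal sketch, not the upstream implementation: it drops the `wait.Until` wrapper, and the `recorder` merger and `runDemo` helper are hypothetical names introduced only for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// Merger receives every update together with the name of its source.
type Merger interface {
	Merge(source string, update interface{}) error
}

// Mux hands out one channel per source and forwards everything to the merger.
type Mux struct {
	mu      sync.Mutex
	merger  Merger
	sources map[string]chan interface{}
}

func NewMux(m Merger) *Mux {
	return &Mux{merger: m, sources: make(map[string]chan interface{})}
}

// Channel returns the existing channel for source, or creates one and
// starts a goroutine that drains it into the merger.
func (m *Mux) Channel(source string) chan interface{} {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ch, ok := m.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	m.sources[source] = ch
	go func() {
		for update := range ch {
			m.merger.Merge(source, update)
		}
	}()
	return ch
}

// recorder is a hypothetical Merger that only remembers what it saw.
type recorder struct {
	mu   sync.Mutex
	seen []string
	wg   sync.WaitGroup
}

func (r *recorder) Merge(source string, update interface{}) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.seen = append(r.seen, fmt.Sprintf("%s:%v", source, update))
	r.wg.Done()
	return nil
}

// runDemo sends one update through each of two sources. It returns the
// number of merges and whether repeated Channel calls share one channel.
func runDemo() (int, bool) {
	r := &recorder{}
	mux := NewMux(r)
	r.wg.Add(2)
	mux.Channel("file") <- "pod-a"
	mux.Channel("http") <- "pod-b"
	r.wg.Wait()
	same := mux.Channel("file") == mux.Channel("file")
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.seen), same
}

func main() {
	n, same := runDemo()
	fmt.Println(n, same)
}
```

Because each source has its own goroutine, the merger can be called concurrently, which is why the Merger interface asks for Merge to be reentrant and why the recorder here guards its slice with a mutex.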

podStorage Merge

  • If the Op is ADD, UPDATE, or DELETE, it compares the podStorage’s pods with the pod list in the PodUpdate message. It identifies addPods, updatePods, deletePods, and reconcilePods, and updates the pods in podStorage.
  • If the Op is REMOVE, the pod list in the PodUpdate message represents removePods.
  • If the Op is SET, it compares the podStorage’s pods with the pod list in the PodUpdate message. It identifies addPods, updatePods, deletePods, and reconcilePods. The latest pod list (after updating from the pod list in the PodUpdate message) replaces the pods in podStorage, and a list of removed pods is calculated.
  • If the Op is RESTORE, the pod list in the PodUpdate message represents restorePods.
  • Based on the analysis results, the corresponding PodUpdate messages with different Ops are generated.
  • Depending on the mode (the kubelet uses PodConfigNotificationIncremental), different PodUpdate messages are sent to the updates channel:
    • If the mode is PodConfigNotificationIncremental and there are changes in add, update, delete, remove, restore, or reconcile pods, corresponding ADD, UPDATE, DELETE, REMOVE, RESTORE, or RECONCILE PodUpdate events are sent. If a new source is added and there are no add, update, or delete pods, an empty ADD PodUpdate event is sent to mark the source as ready. This mode is used by kubelet.
    • If the mode is PodConfigNotificationSnapshotAndUpdates, a SET PodUpdate event is sent when pods are removed or added, or when a new source is added. UPDATE and DELETE PodUpdate events are sent when pods are updated or deleted.
    • If the mode is PodConfigNotificationSnapshot, SET PodUpdate events are sent when there are updates, deletes, adds, removes, or a new source is added.
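
The comparison performed for a SET can be sketched in a few lines. This is a simplified illustration under stated assumptions: `Pod` is a stand-in with a UID and a single `Spec` string, `diffSet` is a hypothetical name, and the delete and reconcile categories that the real merge also computes are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// Pod is a simplified stand-in for v1.Pod. Spec represents everything the
// real code compares when deciding whether a pod was updated.
type Pod struct {
	UID  string
	Spec string
}

// diffSet compares the stored pods of one source with the pod list of an
// incoming SET. It returns the UIDs that were added, updated and removed,
// plus the map that replaces the stored state.
func diffSet(old map[string]Pod, incoming []Pod) (adds, updates, removes []string, next map[string]Pod) {
	next = make(map[string]Pod, len(incoming))
	for _, p := range incoming {
		if _, dup := next[p.UID]; dup {
			continue // drop duplicates within the same update
		}
		next[p.UID] = p
		prev, existed := old[p.UID]
		switch {
		case !existed:
			adds = append(adds, p.UID)
		case prev.Spec != p.Spec:
			updates = append(updates, p.UID)
		}
	}
	// Anything stored before but missing from the SET has been removed.
	for uid := range old {
		if _, still := next[uid]; !still {
			removes = append(removes, uid)
		}
	}
	sort.Strings(removes) // map iteration order is random
	return adds, updates, removes, next
}

func main() {
	old := map[string]Pod{"a": {"a", "v1"}, "b": {"b", "v1"}}
	adds, updates, removes, next := diffSet(old, []Pod{{"a", "v2"}, {"c", "v1"}, {"c", "v1"}})
	fmt.Println(adds, updates, removes, len(next))
}
```

For ADD, UPDATE and DELETE the same comparison runs against the stored map without replacing it, so no removal list is produced in those cases.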

podconfig merger

pkg\kubelet\config\config.go

go

// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel.  Ensures that updates are delivered in order.
// 
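// Deduplicates the pod list of the PodUpdate coming from the given source.
// Compares the PodUpdate with the pods in s.pods[source], builds a PodUpdate for each Op type, and sends them to the s.updates channel.
// The Op of the input PodUpdate may differ from the Op of the output PodUpdates.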
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
    ......
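
The part of Merge elided above decides what to send depending on the mode. As a rough sketch of the incremental case only: the function name `emitIncremental`, the string-based `Update` type, and the exact emission order are assumptions of this example rather than a copy of the upstream code.

```go
package main

import "fmt"

type Op string

// Update is a simplified PodUpdate that carries only pod names.
type Update struct {
	Op   Op
	Pods []string
}

// emitIncremental returns the messages an incremental-mode listener would
// receive: one per non-empty category, plus an empty ADD when a source is
// seen for the first time without any added, updated or deleted pods.
func emitIncremental(firstSet bool, adds, updates, deletes, removes, restores, reconciles []string) []Update {
	var out []Update
	push := func(op Op, pods []string) {
		if len(pods) > 0 {
			out = append(out, Update{Op: op, Pods: pods})
		}
	}
	push("REMOVE", removes)
	push("ADD", adds)
	push("UPDATE", updates)
	push("DELETE", deletes)
	push("RESTORE", restores)
	if firstSet && len(adds) == 0 && len(updates) == 0 && len(deletes) == 0 {
		// The empty ADD marks the source as ready.
		out = append(out, Update{Op: "ADD", Pods: []string{}})
	}
	push("RECONCILE", reconciles)
	return out
}

func main() {
	fmt.Println(emitIncremental(true, nil, nil, nil, nil, nil, nil))
	fmt.Println(emitIncremental(false, []string{"a"}, nil, nil, []string{"b"}, nil, nil))
}
```

The empty ADD is what lets a consumer tell "this source has reported and has no pods" apart from "this source has not reported yet".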

The file source reads pod configurations from files, stores them in an UndeltaStore, and listens for file changes. Whenever the UndeltaStore changes, a PodUpdate message with the operation (Op) set to SET is produced, containing all pods, and sent to the file source’s receiving channel.

The UndeltaStore is a thread-safe storage that contains all pods from the source. It includes methods such as Add (adding pods to UndeltaStore), Update (updating pods in UndeltaStore), Replace (replacing pods in UndeltaStore), and Delete (deleting pods from UndeltaStore). Regardless of which method is executed, it will also execute PushFunc, which packages the pod list into a PodUpdate message with the operation set to SET and sends it to the source’s receiving channel. The UndeltaStore is defined in /staging/src/k8s.io/client-go/tools/cache/undelta_store.go.

The PushFunc is the send function defined in newSourceFile (pkg/kubelet/config/file.go):

go

func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
	}
	store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
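
The "push everything on every change" behaviour of the UndeltaStore is easy to reproduce in miniature. The sketch below is a hypothetical, much-reduced analogue (the names `pushStore` and `recordPushes` are invented here, and pods are plain strings), meant only to show why the receiver always sees a complete snapshot.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// pushStore is a reduced analogue of cache.UndeltaStore: after every
// mutation it pushes the complete current contents to the push function.
type pushStore struct {
	mu    sync.Mutex
	items map[string]string
	push  func(all []string)
}

func newPushStore(push func(all []string)) *pushStore {
	return &pushStore{items: make(map[string]string), push: push}
}

// pushAll sends the full contents, ordered by key; callers must hold mu.
func (s *pushStore) pushAll() {
	keys := make([]string, 0, len(s.items))
	for k := range s.items {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	all := make([]string, 0, len(keys))
	for _, k := range keys {
		all = append(all, s.items[k])
	}
	s.push(all)
}

func (s *pushStore) Add(key, pod string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[key] = pod
	s.pushAll()
}

func (s *pushStore) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.items, key)
	s.pushAll()
}

// recordPushes runs a few mutations and returns every pushed snapshot.
func recordPushes() [][]string {
	var pushed [][]string
	store := newPushStore(func(all []string) { pushed = append(pushed, all) })
	store.Add("default/a", "pod-a")
	store.Add("default/b", "pod-b")
	store.Delete("default/a")
	return pushed
}

func main() {
	for _, snapshot := range recordPushes() {
		fmt.Println("SET", snapshot)
	}
}
```

Since every push is a full snapshot, the receiver does not need to track deltas itself, which is why the file source can always use Op SET.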

pkg\kubelet\kubelet.go

go

	// define file config source
	// StaticPodPath is empty by default; FileCheckFrequency defaults to 20s
	if kubeCfg.StaticPodPath != "" {
		klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
		// Start a goroutine to watch file changes and periodically read files, generating PodUpdate messages (with Op set to SET, including all pods) and sending them to the updates channel
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

The code performs the following actions:

  1. Creates a sourceFile and initializes its UndeltaStore, fileKeyMapping, and watch channel.
  2. Starts a goroutine that periodically calls listConfig, which reads the configurations of all pods, adds the pods to the UndeltaStore, and records the discovered files in fileKeyMapping. The same goroutine consumes events from the watch channel. If a file is added or modified, it re-reads that pod configuration file and adds the pod to the UndeltaStore. If a file is deleted, it removes the file from fileKeyMapping and deletes the corresponding pod from the UndeltaStore.
  3. Calls startWatch, which starts another goroutine that watches for file changes, generates file change events, and sends them to the watch channel.

podconfig handle file process

pkg\kubelet\config\file.go

go

// NewSourceFile watches a config file for changes.
//
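// Starts goroutines that watch for file changes and periodically read the files, generating PodUpdate messages (Op SET, containing all pods) that are sent to the updates channel.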
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
	// "github.com/sigma/go-inotify" requires a path without trailing "/"
	path = strings.TrimRight(path, string(os.PathSeparator))

	config := newSourceFile(path, nodeName, period, updates)
	klog.V(1).Infof("Watching path %q", path)
	config.run()
}

func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
	}
	store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
	return &sourceFile{
		path:           path,
		nodeName:       nodeName,
		period:         period,
		store:          store,
		fileKeyMapping: map[string]string{},
		updates:        updates,
		watchEvents:    make(chan *watchEvent, eventBufferLen),
	}
}

func (s *sourceFile) run() {
	listTicker := time.NewTicker(s.period)

	go func() {
		// Read path immediately to speed up startup.
		if err := s.listConfig(); err != nil {
			klog.Errorf("Unable to read config path %q: %v", s.path, err)
		}
		for {
			select {
			case <-listTicker.C:
				if err := s.listConfig(); err != nil {
					klog.Errorf("Unable to read config path %q: %v", s.path, err)
				}
			case e := <-s.watchEvents:
				if err := s.consumeWatchEvent(e); err != nil {
					klog.Errorf("Unable to process watch event: %v", err)
				}
			}
		}
	}()

	// Start a goroutine that watches for file changes and produces watchEvents.
	s.startWatch()
}
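
The body of consumeWatchEvent is not shown in this article. Based on the behaviour described earlier (re-read on add or modify, forget on delete), a minimal sketch could look like the following. All names here (`fileSource`, `consume`, the event constants, the `read` callback) are hypothetical, and pods are reduced to strings.

```go
package main

import "fmt"

type eventType int

const (
	fileAdded eventType = iota
	fileModified
	fileDeleted
)

type watchEvent struct {
	fileName string
	kind     eventType
}

// fileSource keeps one pod per key and remembers which file produced it.
type fileSource struct {
	pods           map[string]string // key -> pod (simplified to a string)
	fileKeyMapping map[string]string // file name -> key
	read           func(file string) (key, pod string, err error)
}

// consume applies a single watch event to the in-memory state.
func (s *fileSource) consume(e watchEvent) error {
	switch e.kind {
	case fileAdded, fileModified:
		key, pod, err := s.read(e.fileName)
		if err != nil {
			return err
		}
		s.fileKeyMapping[e.fileName] = key
		s.pods[key] = pod
	case fileDeleted:
		if key, ok := s.fileKeyMapping[e.fileName]; ok {
			delete(s.pods, key)
			delete(s.fileKeyMapping, e.fileName)
		}
	}
	return nil
}

// demo adds a file and deletes it again, returning the number of pods
// known after each step.
func demo() (afterAdd, afterDelete int) {
	s := &fileSource{
		pods:           map[string]string{},
		fileKeyMapping: map[string]string{},
		read: func(file string) (string, string, error) {
			return "default/" + file, "pod from " + file, nil
		},
	}
	s.consume(watchEvent{fileName: "nginx.yaml", kind: fileAdded})
	afterAdd = len(s.pods)
	s.consume(watchEvent{fileName: "nginx.yaml", kind: fileDeleted})
	return afterAdd, len(s.pods)
}

func main() {
	fmt.Println(demo())
}
```

The file-to-key mapping is what makes deletion possible: once the file is gone it can no longer be parsed, so the key has to be remembered from the last successful read.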

The extractFromFile function is responsible for reading pod configuration files and converting the file contents into a pod object. It also applies default values and modifications to the pod. It’s worth noting that each file is expected to contain the configuration for a single pod.

Here are some of the modifications and default values applied by extractFromFile:

  1. Generating a unique UID for the pod.
  2. Setting the pod’s name.
  3. Converting an empty namespace to “default” if it’s not specified.
  4. Setting Spec.NodeName to the name of the current node.
  5. Setting ObjectMeta.SelfLink.
  6. Adding tolerations for all NoExecute taints.
  7. Setting Status.Phase to “Pending.”
  8. Adding annotations to the pod.

pkg\kubelet\config\file.go

go

// extractFromFile parses a file for Pod configuration information.
func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
	klog.V(3).Infof("Reading config file %q", filename)
	defer func() {
		if err == nil && pod != nil {
			objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
			if keyErr != nil {
				err = keyErr
				return
			}
			s.fileKeyMapping[filename] = objKey
		}
	}()

	file, err := os.Open(filename)
	if err != nil {
		return pod, err
	}
	defer file.Close()

	data, err := utilio.ReadAtMost(file, maxConfigLength)
	if err != nil {
		return pod, err
	}

	defaultFn := func(pod *api.Pod) error {
		return s.applyDefaults(pod, filename)
	}

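	// Decode the byte stream into a pod.
	// The default function generates the pod's UID and name, converts an empty namespace to default, sets Spec.NodeName and ObjectMeta.SelfLink, adds a toleration for all NoExecute taints, and sets Status.Phase to Pending.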
	parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
	if parsed {
		if podErr != nil {
			return pod, podErr
		}
		return pod, nil
	}

	return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file", filename, podErr)
}

pkg\kubelet\config\common.go

go

// Generates the pod's UID and name, converts an empty namespace to default, sets Spec.NodeName and ObjectMeta.SelfLink, adds a toleration for all NoExecute taints, and sets Status.Phase to Pending.
func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.NodeName) error {
	if len(pod.UID) == 0 {
		hasher := md5.New()
		if isFile {
			fmt.Fprintf(hasher, "host:%s", nodeName)
			fmt.Fprintf(hasher, "file:%s", source)
		} else {
			fmt.Fprintf(hasher, "url:%s", source)
		}
		hash.DeepHashObject(hasher, pod)
		pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))
		klog.V(5).Infof("Generated UID %q pod %q from %s", pod.UID, pod.Name, source)
	}

	pod.Name = generatePodName(pod.Name, nodeName)
	klog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, source)

	if pod.Namespace == "" {
		pod.Namespace = metav1.NamespaceDefault
	}
	klog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, pod.Name, source)

	// Set the Host field to indicate this pod is scheduled on the current node.
	pod.Spec.NodeName = string(nodeName)

	pod.ObjectMeta.SelfLink = getSelfLink(pod.Name, pod.Namespace)

	if pod.Annotations == nil {
		pod.Annotations = make(map[string]string)
	}
	// The generated UID is the hash of the file.
	pod.Annotations[kubetypes.ConfigHashAnnotationKey] = string(pod.UID)

	if isFile {
		// Applying the default Taint tolerations to static pods,
		// so they are not evicted when there are node problems.
		// Tolerate all NoExecute taints.
		helper.AddOrUpdateTolerationInPod(pod, &api.Toleration{
			Operator: "Exists",
			Effect:   api.TaintEffectNoExecute,
		})
	}

	// Set the default status to pending.
	pod.Status.Phase = api.PodPending
	return nil
}
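
The UID derivation can be reproduced with the standard library alone. In this sketch the manifest bytes are hashed directly, which is a simplification: the function above hashes the decoded pod object. The naming rule in `staticPodName` (pod name plus the lower-cased node name) is an assumption about generatePodName, whose body is not shown here.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"strings"
)

// staticPodUID hashes the node name, the manifest path and the manifest
// bytes into a hex string. Hashing raw bytes instead of the decoded pod
// object is a simplification made for this sketch.
func staticPodUID(nodeName, path string, manifest []byte) string {
	h := md5.New()
	fmt.Fprintf(h, "host:%s", nodeName)
	fmt.Fprintf(h, "file:%s", path)
	h.Write(manifest)
	return hex.EncodeToString(h.Sum(nil))
}

// staticPodName appends the lower-cased node name to the pod name
// (assumed behaviour of generatePodName).
func staticPodName(name, nodeName string) string {
	return name + "-" + strings.ToLower(nodeName)
}

func main() {
	manifest := []byte("kind: Pod\nmetadata:\n  name: nginx\n")
	fmt.Println(staticPodUID("Node1", "/etc/kubernetes/manifests/nginx.yaml", manifest))
	fmt.Println(staticPodName("nginx", "Node1"))
}
```

Because the UID depends on the content, editing a static pod manifest yields a different UID, so the changed pod appears to the rest of the kubelet as a new pod rather than an in-place update.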

The startWatch function launches a goroutine that watches for file changes and produces watch events. The watch is restarted whenever it returns. If it fails with an error other than the path not existing, exponential backoff is applied before the next attempt.

Here is a high-level overview of what startWatch does:

  1. It starts a goroutine that keeps watching for file changes.
  2. If the watch fails because the path does not exist, it retries after the retry period. For any other error, it backs off exponentially before retrying.
  3. When a file change is detected, it generates a watch event and sends it to the watch channel for further processing.

This lets the kubelet keep detecting changes to pod configuration files even after transient watch errors.

pkg\kubelet\config\file_linux.go

go

func (s *sourceFile) startWatch() {
	backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
	backOffID := "watch"

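	// When func() returns, wait one second and then run it again.
	// If watching for file changes fails, wait 1 second and check whether the backoff period is still active; if it is not, start the watch again.
	go wait.Forever(func() {
		// Still inside the backoff period.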
		if backOff.IsInBackOffSinceUpdate(backOffID, time.Now()) {
			return
		}

		if err := s.doWatch(); err != nil {
			klog.Errorf("Unable to read config path %q: %v", s.path, err)
			// For errors other than the path not existing, back off.
			if _, retryable := err.(*retryableError); !retryable {
				backOff.Next(backOffID, time.Now())
			}
		}
	}, retryPeriod)
}
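
To see how the backoff window grows, here is a minimal single-key sketch. It is a hypothetical analogue of the backoff helper used above, not its real implementation, and the 1s initial and 20s maximum periods are illustrative values chosen for this example.

```go
package main

import (
	"fmt"
	"time"
)

// backoff doubles the wait period on every failure, up to a maximum.
type backoff struct {
	initial, max time.Duration
	current      time.Duration
	lastUpdate   time.Time
}

// next records a failure at time now and grows the wait period.
func (b *backoff) next(now time.Time) {
	if b.current == 0 {
		b.current = b.initial
	} else {
		b.current *= 2
		if b.current > b.max {
			b.current = b.max
		}
	}
	b.lastUpdate = now
}

// inBackoff reports whether now still falls inside the wait period.
func (b *backoff) inBackoff(now time.Time) bool {
	return b.current > 0 && now.Sub(b.lastUpdate) < b.current
}

// waitsAfterFailures returns the wait period in seconds after each of
// n consecutive failures.
func waitsAfterFailures(n int) []int {
	b := &backoff{initial: time.Second, max: 20 * time.Second}
	now := time.Unix(0, 0)
	waits := make([]int, 0, n)
	for i := 0; i < n; i++ {
		b.next(now)
		waits = append(waits, int(b.current/time.Second))
	}
	return waits
}

func main() {
	fmt.Println(waitsAfterFailures(6))

	b := &backoff{initial: time.Second, max: 20 * time.Second}
	start := time.Unix(0, 0)
	b.next(start)
	fmt.Println(b.inBackoff(start.Add(500*time.Millisecond)), b.inBackoff(start.Add(2*time.Second)))
}
```

In startWatch the outer loop still fires every retry period; the backoff check simply makes most of those iterations return immediately while the window is active.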

The HTTP source is responsible for launching a goroutine that periodically sends HTTP requests to a specified URL in order to retrieve pod or pod list information. It then generates a PodUpdate message with the operation (Op) set to SET, which includes all the pods obtained from the HTTP request, and sends this message to the source’s receiving channel.

Here is a breakdown of how the HTTP source works:

  1. It starts a goroutine to periodically send HTTP requests to a specified URL.
  2. The HTTP requests are made at regular intervals to fetch pod information or a list of pods.
  3. Once the HTTP response is received, it extracts the pod information.
  4. It generates a PodUpdate message with the operation set to SET, which includes all the pods obtained from the HTTP response.
  5. This PodUpdate message is sent to the source’s receiving channel for further processing.

The HTTP source allows the kubelet to retrieve pod information from an external source via HTTP requests and keep the pod configurations up to date based on the information obtained from the URL.

/pkg/kubelet/kubelet.go

go

	// define url config source
	// StaticPodURL is empty by default; HTTPCheckFrequency defaults to 20s
	if kubeCfg.StaticPodURL != "" {
		klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
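		// Start a goroutine that periodically requests the URL to fetch a pod or pod list, generating PodUpdate messages (Op SET, containing all pods) that are sent to the updates channel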
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

pkg\kubelet\config\http.go

go

// NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes.
//
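// Starts a goroutine that periodically requests the URL to fetch a pod or pod list, generating PodUpdate messages (Op SET, containing all pods) that are sent to the updates channel.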
func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
	config := &sourceURL{
		url:      url,
		header:   header,
		nodeName: nodeName,
		updates:  updates,
		data:     nil,
		// Timing out requests leads to retries. This client is only used to
		// read the manifest URL passed to kubelet.
		client: &http.Client{Timeout: 10 * time.Second},
	}
	klog.V(1).Infof("Watching URL %s", url)
	go wait.Until(config.run, period, wait.NeverStop)
}

func (s *sourceURL) run() {
	if err := s.extractFromURL(); err != nil {
		// Don't log this multiple times per minute. The first few entries should be
		// enough to get the point across.
		if s.failureLogs < 3 {
			klog.Warningf("Failed to read pods from URL: %v", err)
		} else if s.failureLogs == 3 {
			klog.Warningf("Failed to read pods from URL. Dropping verbosity of this message to V(4): %v", err)
		} else {
			klog.V(4).Infof("Failed to read pods from URL: %v", err)
		}
		s.failureLogs++
	} else {
		if s.failureLogs > 0 {
			klog.Info("Successfully read pods from URL.")
			s.failureLogs = 0
		}
	}
}

The extractFromURL function is responsible for making an HTTP request to a specified URL to retrieve pod or pod list configurations. Similar to the file source, it also applies modifications to the pods read from the URL and sends a PodUpdate message with the operation (Op) set to SET to the source’s receiving channel.

Here’s an overview of what extractFromURL does:

  1. It makes an HTTP request to the specified URL to retrieve pod or pod list configurations.
  2. Once the HTTP response is received, it extracts the pod configurations.
  3. It applies modifications to the pods read from the URL, which may include generating a unique UID for each pod, setting pod names, converting empty namespaces to “default,” and making other necessary adjustments.
  4. Finally, it sends a PodUpdate message with the operation set to SET, including all the modified pods, to the source’s receiving channel.

This process ensures that pod configurations retrieved from the URL are properly processed and sent to the kubelet for further handling.

pkg\kubelet\config\http.go

go

func (s *sourceURL) extractFromURL() error {
	req, err := http.NewRequest("GET", s.url, nil)
	if err != nil {
		return err
	}
	req.Header = s.header
	resp, err := s.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	data, err := utilio.ReadAtMost(resp.Body, maxConfigLength)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("%v: %v", s.url, resp.Status)
	}
	if len(data) == 0 {
		// Emit an update with an empty PodList to allow HTTPSource to be marked as seen
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return fmt.Errorf("zero-length data received from %v", s.url)
	}
	// Short circuit if the data has not changed since the last time it was read.
	if bytes.Equal(data, s.data) {
		return nil
	}
	s.data = data

	// First try as it is a single pod.
	parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
	if parsed {
		if singlePodErr != nil {
			// It parsed but could not be used.
			return singlePodErr
		}
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return nil
	}

	// That didn't work, so try a list of pods.
	parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
	if parsed {
		if multiPodErr != nil {
			// It parsed but could not be used.
			return multiPodErr
		}
		pods := make([]*v1.Pod, 0)
		for i := range podList.Items {
			pods = append(pods, &podList.Items[i])
		}
		s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return nil
	}

	return fmt.Errorf("%v: received '%v', but couldn't parse as "+
		"single (%v) or multiple pods (%v)",
		s.url, string(data), singlePodErr, multiPodErr)
}
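
Two ideas in extractFromURL are worth isolating: skipping payloads that have not changed, and trying a single pod before a pod list. The sketch below illustrates both with plain JSON and invented types (`pod`, `podList`, `poller`); distinguishing the two shapes by a `kind` field is an assumption of this example, not how tryDecodeSinglePod and tryDecodePodList work internally.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

type pod struct {
	Kind string `json:"kind"`
	Name string `json:"name"`
}

type podList struct {
	Kind  string `json:"kind"`
	Items []pod  `json:"items"`
}

// poller remembers the last payload so unchanged responses are skipped.
type poller struct{ last []byte }

// handle returns the pods to publish as a SET and whether the payload
// differed from the previous one.
func (p *poller) handle(data []byte) (pods []pod, changed bool, err error) {
	if bytes.Equal(data, p.last) {
		return nil, false, nil // nothing new, nothing to send
	}
	p.last = data

	// First try the payload as a single pod.
	var single pod
	if json.Unmarshal(data, &single) == nil && single.Kind == "Pod" {
		return []pod{single}, true, nil
	}
	// That did not match, so try a list of pods.
	var list podList
	if json.Unmarshal(data, &list) == nil && list.Kind == "PodList" {
		return list.Items, true, nil
	}
	return nil, true, errors.New("payload is neither a Pod nor a PodList")
}

func main() {
	p := &poller{}
	one := []byte(`{"kind":"Pod","name":"a"}`)
	pods, changed, err := p.handle(one)
	fmt.Println(len(pods), changed, err)
	pods, changed, err = p.handle(one)
	fmt.Println(len(pods), changed, err)
}
```

Comparing raw bytes before decoding keeps the periodic poll cheap when the manifest served by the URL is stable, which is the common case.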

Restoring pod information from the bootstrapCheckpointPath is a process that belongs to the API source. The bootstrapCheckpointPath is typically provided as a command-line argument (--bootstrap-checkpoint-path) when starting the kubelet.

Here’s what happens during the restoration process:

  1. The kubelet checks whether bootstrapCheckpointPath is set (it is empty by default).
  2. If it is set, the kubelet obtains the channel of the API server source (kubetypes.ApiserverSource).
  3. Restore creates a checkpoint manager for the path and loads all pod checkpoints stored there.
  4. The loaded pods are sent to that channel in a PodUpdate message with Op set to RESTORE.
  5. podStorage merges this message like any other message from that source, so the kubelet continues managing the restored pods.

The purpose of restoring pod information from the bootstrapCheckpointPath is to let the kubelet pick up previously checkpointed pods after an interruption or restart and continue managing them.

pkg/kubelet/kubelet.go

go

	var updatechannel chan<- interface{}
	// bootstrapCheckpointPath is empty by default
	if bootstrapCheckpointPath != "" {
		klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
		updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		// Read all checkpoint files in the directory to get all pods, then send a PodUpdate (Op RESTORE) to updatechannel
		err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
		if err != nil {
			return nil, err
		}
	}

The Restore function in the PodConfig is responsible for restoring pods from the specified checkpoint path. It does this by reading all the pod checkpoint files from the directory and sending a PodUpdate message with the operation (Op) set to RESTORE to the source’s receiving channel.

pkg\kubelet\config\config.go

go

// Restore restores pods from the checkpoint path, *once*
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
	if c.checkpointManager != nil {
		return nil
	}
	var err error
	c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
	if err != nil {
		return err
	}
	pods, err := checkpoint.LoadPods(c.checkpointManager)
	if err != nil {
		return err
	}
	updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
	return nil
}

LoadPods retrieves the information of all pods stored under the checkpoint path.

pkg\kubelet\checkpoint\checkpoint.go

go

type Data struct {
	Pod      *v1.Pod
	Checksum checksum.Checksum
}

// NewPodCheckpoint returns new pod checkpoint
func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
	return &Data{Pod: pod}
}

// LoadPods Loads All Checkpoints from disk
func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
	pods := make([]*v1.Pod, 0)

	// List all checkpoint files in the directory.
	checkpointKeys, err := cpm.ListCheckpoints()
	if err != nil {
		klog.Errorf("Failed to list checkpoints: %v", err)
	}

	for _, key := range checkpointKeys {
		checkpoint := NewPodCheckpoint(nil)
		// Convert the file contents into a checkpoint, which contains the pod and its checksum.
		err := cpm.GetCheckpoint(key, checkpoint)
		if err != nil {
			klog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
			continue
		}
		pods = append(pods, checkpoint.GetPod())
	}
	return pods, nil
}

The GetCheckpoint function in the CheckpointManager is responsible for retrieving a checkpoint from the CheckpointStore, reading the file’s content, converting it into a Checkpoint type, and then verifying the checksum.

pkg\kubelet\checkpointmanager\checkpoint_manager.go

go

// GetCheckpoint retrieves checkpoint from CheckpointStore.
func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
	manager.mutex.Lock()
	defer manager.mutex.Unlock()
	// Read the checkpoint file.
	blob, err := manager.store.Read(checkpointKey)
	if err != nil {
		if err == utilstore.ErrKeyNotFound {
			return errors.ErrCheckpointNotFound
		}
		return err
	}
	err = checkpoint.UnmarshalCheckpoint(blob)
	if err == nil {
		err = checkpoint.VerifyChecksum()
	}
	return err
}

pkg\kubelet\checkpoint\checkpoint.go

go

// UnmarshalCheckpoint returns unmarshalled data
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
	return json.Unmarshal(blob, cp)
}

// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
	return cp.Checksum.Verify(*cp.Pod)
}

pkg\kubelet\checkpointmanager\checksum\checksum.go

go

// Checksum is the data to be stored as checkpoint
type Checksum uint64

// Verify verifies that passed checksum is same as calculated checksum
func (cs Checksum) Verify(data interface{}) error {
	if cs != New(data) {
		return errors.ErrCorruptCheckpoint
	}
	return nil
}

// New returns the Checksum of checkpoint data
func New(data interface{}) Checksum {
	return Checksum(getChecksum(data))
}

// Get returns calculated checksum of checkpoint data
func getChecksum(data interface{}) uint64 {
	hash := fnv.New32a()
	hashutil.DeepHashObject(hash, data)
	return uint64(hash.Sum32())
}

The API source in the kubelet is responsible for retrieving pod information from the Kubernetes API server for a specific node. It uses a mechanism involving the Reflector and UndeltaStore to watch for changes in pod resources on the API server and send PodUpdate messages to the source’s receiving channel.

Here’s how the process works:

  1. The Reflector initially performs a list operation on the API server to fetch all pods that are scheduled to run on the node. It filters pods based on the spec.nodeName field to include only those pods that should run on the specific node. This list of pods is obtained through an API call to the Kubernetes API server.
  2. After obtaining the list of pods from the API server, the Reflector invokes the UndeltaStore.Replace method. This method replaces the pod list stored in the UndeltaStore with the newly fetched list and then executes the PushFunc, which refreshes the kubelet's view of the pods for this node.
  3. Subsequently, the Reflector sets up a watch on pod resources on the API server. This watch is filtered based on the spec.nodeName, ensuring that only events related to pods scheduled on the specific node are received.
  4. When the Reflector receives watch events from the API server, it processes these events based on their type:
    • If the event type is Added, a new pod has been scheduled to run on the node. The Reflector invokes the UndeltaStore.Add method to add the pod to the stored pod list in the UndeltaStore. After adding the pod, it executes the PushFunc (the send function in newSourceApiserverFromLW).
    • If the event type is Modified, it means an existing pod’s resource has been updated. The Reflector invokes the UndeltaStore.Update method to update the corresponding pod in the stored pod list in the UndeltaStore. After updating the pod, it executes the PushFunc.
    • If the event type is Deleted, it means a pod has been deleted. The Reflector invokes the UndeltaStore.Delete method to remove the pod from the stored pod list in the UndeltaStore. After deleting the pod, it executes the PushFunc.
  5. For each of these events (Added, Modified, Deleted), the PushFunc receives the complete pod list currently held in the UndeltaStore, not just the pod that changed. It wraps that list in a PodUpdate whose Op is always SET and whose Source is ApiserverSource, and sends it to the source's receiving channel. Working out which pods were added, updated, or deleted is left to podConfig when it merges the update.

This mechanism keeps the kubelet's view of the pods scheduled on the node in sync with the API server, so that it can react to pod additions, modifications, and deletions.

pkg/kubelet/kubelet.go

go

	if kubeDeps.KubeClient != nil {
		klog.Infof("Watching apiserver")
		if updatechannel == nil {
			updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		}
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
	}

pkg\kubelet\config\apiserver.go

go

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}
