kubelet podConfig - Providing Pods Running on kubelet
The podConfig within the kubelet aggregates pod information from multiple sources. It merges the pods reported by the sources, filters out duplicates and redundant changes, and analyzes how the pods changed. When the pods from a source change, podConfig works out the changes, generates PodUpdate messages of the appropriate types, and sends them to the PodUpdate channel.
This article is based on Kubernetes version 1.18.6. Please visit the source code reading repository.
There are three types of pod sources:
- file: reads static pod definitions from local files.
- api: retrieves all pods bound to this node from the API server.
- http: retrieves a pod or a pod list from a URL over HTTP; these are also static pods.
podConfig
podConfig contains a podStorage, a mux, and an updates channel that carries the resulting PodUpdate messages.
A PodUpdate contains a list of pods, an operation type, and the name of the source.
The podStorage keeps the pods of every source as they stand at a given moment.
The mux aggregates the pods from all sources: it hands every message to podStorage, which analyzes how the pods changed, composes PodUpdate messages of the appropriate types, and sends them to the updates channel.
Each source writes PodUpdate messages into its own source channel. For every source, podConfig starts a goroutine that consumes that source channel. The messages it reads are analyzed, and the resulting PodUpdate messages are ultimately sent to the updates channel.
pkg\kubelet\config\config.go
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
pods *podStorage
// aggregator (multiplexer) that merges all sources
mux *config.Mux
// the channel of denormalized changes passed to listeners
updates chan kubetypes.PodUpdate
// contains the list of all configured sources
sourcesLock sync.Mutex
sources sets.String
checkpointManager checkpointmanager.CheckpointManager
}
// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order. Note that this object is an in-memory source of
// "truth" and on creation contains zero entries. Once all previously read sources are
// available, then this object should be considered authoritative.
type podStorage struct {
podLock sync.RWMutex
// map of source name to pod uid to pod reference
pods map[string]map[types.UID]*v1.Pod
mode PodConfigNotificationMode
// ensures that updates are delivered in strict order
// on the updates channel
updateLock sync.Mutex
updates chan<- kubetypes.PodUpdate
// contains the set of all sources that have sent at least one SET
sourcesSeenLock sync.RWMutex
sourcesSeen sets.String
// the EventRecorder to use
recorder record.EventRecorder
}
podUpdate definition
pkg\kubelet\types\pod_update.go
// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
//
// Additionally, Pods should never be nil - it should always point to an empty slice. While
// functionally similar, this helps our unit tests properly check that the correct PodUpdates
// are generated.
type PodUpdate struct {
Pods []*v1.Pod
Op PodOperation
Source string
}
1 Initializing PodConfig
The initialization of PodConfig in the kubelet can be found in pkg\kubelet\kubelet.go:
if kubeDeps.PodConfig == nil {
var err error
// bootstrapCheckpointPath is empty by default
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
if err != nil {
return nil, err
}
}
In pkg\kubelet\kubelet.go, makePodSourceConfig is called to create the PodConfig.
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
for i := range v {
manifestURLHeader.Add(k, v[i])
}
}
}
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
1.1 Creating a PodConfig
pkg\kubelet\config\config.go
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
updates := make(chan kubetypes.PodUpdate, 50)
storage := newPodStorage(updates, mode, recorder)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
updates: updates,
sources: sets.String{},
}
return podConfig
}
podStorage stores the pods of all sources at a given moment and sends pod changes to the updates channel.
pkg\kubelet\config\config.go
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
return &podStorage{
pods: make(map[string]map[types.UID]*v1.Pod),
mode: mode,
updates: updates,
sourcesSeen: sets.String{},
recorder: recorder,
}
}
The mux (multiplexer) gives every source its own channel and passes the messages from all sources to a single merger (podStorage), which analyzes the changes.
pkg\util\config\config.go
type Merger interface {
// Invoked when a change from a source is received. May also function as an incremental
// merger if you wish to consume changes incrementally. Must be reentrant when more than
// one source is defined.
Merge(source string, update interface{}) error
}
// Mux is a class for merging configuration from multiple sources. Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
// Invoked when an update is sent to a source.
merger Merger
// Sources and their lock.
sourceLock sync.RWMutex
// Maps source names to channels
sources map[string]chan interface{}
}
// NewMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux {
mux := &Mux{
sources: make(map[string]chan interface{}),
merger: merger,
}
return mux
}
2 Adding a Source
To add a source to podConfig, call its Channel function, which returns a channel dedicated to that source. The source sends the PodUpdate messages it generates to this channel, which is referred to as the “source channel.”
pkg\kubelet\config\config.go
func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source)
}
PodConfig.Channel calls the Channel function of the mux. If a channel for the source already exists, that channel is returned. Otherwise a new channel is created for the source, and a goroutine is started that consumes its messages and passes each one to the merger, podStorage.Merge. Merge classifies the changes and sends the resulting PodUpdate messages to podStorage's updates channel.
pkg\util\config\config.go
// Channel returns a channel where a configuration source
// can send updates of new configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
//
// If a channel for the source already exists, it is returned.
// Otherwise a new channel is created for the source and a goroutine is started to consume it.
// The goroutine hands every message to the merger (podStorage.Merge), which sends the resulting PodUpdate messages to the updates channel.
func (m *Mux) Channel(source string) chan interface{} {
if len(source) == 0 {
panic("Channel given an empty name")
}
m.sourceLock.Lock()
defer m.sourceLock.Unlock()
channel, exists := m.sources[source]
if exists {
return channel
}
newChannel := make(chan interface{})
m.sources[source] = newChannel
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
return newChannel
}
The goroutine started by the mux does no processing of its own: listen ranges over the source channel and passes each message to podStorage's Merge function, which performs the analysis and aggregation.
pkg\util\config\config.go
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
for update := range listenChannel {
// The merger here is the podStorage
m.merger.Merge(source, update)
}
}
podStorage Merge
- If the Op is ADD, UPDATE, or DELETE, it compares the pods in podStorage with the pod list in the PodUpdate message, identifies addPods, updatePods, deletePods, and reconcilePods, and updates the pods in podStorage.
- If the Op is REMOVE, the pod list in the PodUpdate message becomes removePods.
- If the Op is SET, it compares the pods in podStorage with the pod list in the PodUpdate message and identifies addPods, updatePods, deletePods, and reconcilePods. The new pod list (built from the pod list in the PodUpdate message) replaces the source's pods in podStorage, and stored pods that are missing from the new list are collected as removePods.
- If the Op is RESTORE, the pod list in the PodUpdate message becomes restorePods.
- Based on the analysis results, PodUpdate messages with the corresponding Ops are generated.
- Depending on the mode (the kubelet uses PodConfigNotificationIncremental), different PodUpdate messages are sent to the updates channel:
  - If the mode is PodConfigNotificationIncremental, an ADD, UPDATE, DELETE, REMOVE, RESTORE, or RECONCILE PodUpdate is sent for each non-empty category of changed pods. When a source sends its first SET and there are no added, updated, or deleted pods, an empty ADD PodUpdate is sent to mark the source as ready. This is the mode used by the kubelet.
  - If the mode is PodConfigNotificationSnapshotAndUpdates, a SET PodUpdate with the full merged state is sent when pods are removed or added or a new source is seen; UPDATE and DELETE PodUpdates are sent when pods are updated or deleted.
  - If the mode is PodConfigNotificationSnapshot, a SET PodUpdate is sent whenever pods are updated, deleted, added, or removed, or a new source is seen.
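The incremental branch of this logic can be sketched as a pure function. This is a simplification under stated assumptions: changeSet is a hypothetical summary that holds only the size of each category, and the emission order (REMOVE, ADD, UPDATE, DELETE, RESTORE, the ready-marking empty ADD, then RECONCILE) reflects our reading of the 1.18 Merge function rather than a documented contract.

```go
package main

import "fmt"

// changeSet is a hypothetical summary of what podStorage.merge found:
// the number of pods in each category.
type changeSet struct {
	adds, updates, deletes, removes, restores, reconciles int
}

// incrementalOps returns the Ops the incremental notification mode would emit
// for one Merge call. firstSet is true when this call carried the first SET
// ever received from the source.
func incrementalOps(c changeSet, firstSet bool) []string {
	var ops []string
	if c.removes > 0 {
		ops = append(ops, "REMOVE")
	}
	if c.adds > 0 {
		ops = append(ops, "ADD")
	}
	if c.updates > 0 {
		ops = append(ops, "UPDATE")
	}
	if c.deletes > 0 {
		ops = append(ops, "DELETE")
	}
	if c.restores > 0 {
		ops = append(ops, "RESTORE")
	}
	// An empty ADD tells the kubelet that a newly seen source is ready.
	if firstSet && c.adds == 0 && c.updates == 0 && c.deletes == 0 {
		ops = append(ops, "ADD(empty)")
	}
	if c.reconciles > 0 {
		ops = append(ops, "RECONCILE")
	}
	return ops
}

func main() {
	fmt.Println(incrementalOps(changeSet{}, true))                    // [ADD(empty)]
	fmt.Println(incrementalOps(changeSet{adds: 2, removes: 1}, false)) // [REMOVE ADD]
	fmt.Println(incrementalOps(changeSet{}, false))                   // []
}
```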
The algorithm for processing PodUpdate
messages is responsible for analyzing the operations and generating corresponding PodUpdate
messages based on the changes in pods.
pkg\kubelet\config\config.go
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
//
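// Deduplicate the pod list of the PodUpdate from the input source
// Compare the PodUpdate with the pods in s.pods[source], derive PodUpdates for each Op type, and send them to the s.updates channel
// The Op of the input PodUpdate may differ from the Ops of the output PodUpdates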
func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
......
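The SET branch of podStorage.merge is the core of the analysis. The sketch below approximates it under simplifying assumptions: pod is a hypothetical stand-in for v1.Pod in which a single Spec string represents everything the real code compares semantically, Deleting stands for a non-nil DeletionTimestamp, invalid and duplicate pods are not filtered, and the real code keeps and updates the existing pod object instead of storing the incoming one.

```go
package main

import "fmt"

// pod is a hypothetical, trimmed-down stand-in for v1.Pod.
type pod struct {
	UID      string
	Spec     string // stands in for the spec, labels, and annotations
	Status   string
	Deleting bool // stands in for DeletionTimestamp != nil
}

// result groups pod UIDs by the category podStorage.merge would assign.
type result struct {
	adds, updates, deletes, reconciles, removes []string
}

// mergeSet sketches the SET branch: every incoming pod is compared with the
// stored copy, and stored pods missing from the new list become removes.
// It returns the new per-source map that replaces the old one.
func mergeSet(old map[string]pod, incoming []pod) (map[string]pod, result) {
	var r result
	next := make(map[string]pod, len(incoming))
	for _, ref := range incoming {
		existing, found := old[ref.UID]
		if !found {
			next[ref.UID] = ref
			r.adds = append(r.adds, ref.UID)
			continue
		}
		semanticChange := existing.Spec != ref.Spec || existing.Deleting != ref.Deleting
		switch {
		case !semanticChange && existing.Status != ref.Status:
			r.reconciles = append(r.reconciles, ref.UID) // status-only change
		case semanticChange && ref.Deleting:
			r.deletes = append(r.deletes, ref.UID) // graceful deletion requested
		case semanticChange:
			r.updates = append(r.updates, ref.UID)
		}
		next[ref.UID] = ref
	}
	for uid := range old {
		if _, found := next[uid]; !found {
			r.removes = append(r.removes, uid)
		}
	}
	return next, r
}

func main() {
	old := map[string]pod{
		"a": {UID: "a", Spec: "v1", Status: "Running"},
		"b": {UID: "b", Spec: "v1", Status: "Running"},
		"c": {UID: "c", Spec: "v1", Status: "Running"},
		"d": {UID: "d", Spec: "v1", Status: "Running"},
	}
	incoming := []pod{
		{UID: "a", Spec: "v2", Status: "Running"},                 // spec changed
		{UID: "b", Spec: "v1", Status: "Failed"},                  // status only
		{UID: "c", Spec: "v1", Status: "Running", Deleting: true}, // deletion requested
		{UID: "e", Spec: "v1"},                                    // new pod
	} // "d" is absent
	_, r := mergeSet(old, incoming)
	fmt.Printf("%+v\n", r) // {adds:[e] updates:[a] deletes:[c] reconciles:[b] removes:[d]}
}
```

The same classification applies to ADD, UPDATE, and DELETE operations; only SET additionally turns stored pods that are missing from the new list into removes, which is how a full-state source reports that a pod has disappeared.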
3 file source
The file source reads pod configurations from files, stores them in an UndeltaStore, and watches the files for changes. Whenever its contents change, the UndeltaStore causes a PodUpdate message with the operation (Op) set to SET, containing all pods, to be sent to the file source's channel.
The UndeltaStore is a thread-safe store that holds all pods from the source. It provides Add, Update, Replace, and Delete methods for adding, updating, replacing, and deleting pods. Whichever method is executed, it then calls PushFunc with the complete list of stored pods; here, PushFunc packages that list into a PodUpdate message with Op set to SET and sends it to the source channel. The UndeltaStore is defined in /staging/src/k8s.io/client-go/tools/cache/undelta_store.go.
The PushFunc is the send closure created in newSourceFile in pkg/kubelet/config/file.go (the full function is listed later in this section).
In makePodSourceConfig, the file source is registered as follows:
// define file config source
// StaticPodPath is empty by default; FileCheckFrequency defaults to 20s
if kubeCfg.StaticPodPath != "" {
klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
// Start a goroutine to watch file changes and periodically read files, generating PodUpdate messages (with Op set to SET, including all pods) and sending them to the updates channel
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
The code performs the following actions:
- Creates a sourceFile and initializes its UndeltaStore, fileKeyMapping, and watchEvents channel.
- Starts a goroutine that calls listConfig immediately and then once every period. listConfig reads the configurations of all pods, loads them into the UndeltaStore, and records each file it parses in fileKeyMapping. The same goroutine consumes events from the watchEvents channel: if a file is added or modified, it re-reads that pod's configuration file and adds the pod to the UndeltaStore; if a file is deleted, it removes the file from fileKeyMapping and deletes the corresponding pod from the UndeltaStore.
- Calls startWatch, which starts another goroutine that watches for file changes, generates file change events, and sends them to the watchEvents channel.
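The watch-event handling described above can be sketched as follows. The fileSource type, the watchEventType constants, and the readFile callback are hypothetical simplifications (plain maps instead of an UndeltaStore, strings instead of pods). The sketch shows why fileKeyMapping exists: once a manifest file is deleted it can no longer be parsed, so the pod key recorded when the file was last read is the only way to find the pod to remove.

```go
package main

import "fmt"

// watchEventType and its values are hypothetical names for this sketch.
type watchEventType int

const (
	podAdd watchEventType = iota
	podModify
	podDelete
)

// fileSource is a hypothetical stand-in for sourceFile.
type fileSource struct {
	store          map[string]string // pod key ("namespace/name") -> pod
	fileKeyMapping map[string]string // file name -> pod key
	readFile       func(name string) (key, pod string, err error)
}

func (s *fileSource) consume(fileName string, t watchEventType) error {
	switch t {
	case podAdd, podModify:
		key, pod, err := s.readFile(fileName)
		if err != nil {
			return fmt.Errorf("can't process config file %q: %v", fileName, err)
		}
		s.fileKeyMapping[fileName] = key // recorded by extractFromFile in the real code
		s.store[key] = pod
	case podDelete:
		// The file is gone and cannot be parsed again, so the pod key must come
		// from the mapping recorded when the file was last read.
		key, ok := s.fileKeyMapping[fileName]
		if !ok {
			return nil // this file never produced a pod
		}
		delete(s.store, key)
		delete(s.fileKeyMapping, fileName)
	}
	return nil
}

func main() {
	s := &fileSource{
		store:          map[string]string{},
		fileKeyMapping: map[string]string{},
		readFile: func(name string) (string, string, error) {
			return "kube-system/etcd", "etcd pod from " + name, nil
		},
	}
	manifest := "/etc/kubernetes/manifests/etcd.yaml" // example path
	_ = s.consume(manifest, podAdd)
	fmt.Println(len(s.store), s.fileKeyMapping[manifest]) // 1 kube-system/etcd
	_ = s.consume(manifest, podDelete)
	fmt.Println(len(s.store), len(s.fileKeyMapping)) // 0 0
}
```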
pkg\kubelet\config\file.go
// NewSourceFile watches a config file for changes.
//
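// Start goroutines that watch for file changes and periodically read the files, generating PodUpdate messages (Op SET, containing all pods) and sending them to the updates channel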
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
// "github.com/sigma/go-inotify" requires a path without trailing "/"
path = strings.TrimRight(path, string(os.PathSeparator))
config := newSourceFile(path, nodeName, period, updates)
klog.V(1).Infof("Watching path %q", path)
config.run()
}
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
}
store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
return &sourceFile{
path: path,
nodeName: nodeName,
period: period,
store: store,
fileKeyMapping: map[string]string{},
updates: updates,
watchEvents: make(chan *watchEvent, eventBufferLen),
}
}
func (s *sourceFile) run() {
listTicker := time.NewTicker(s.period)
go func() {
// Read path immediately to speed up startup.
if err := s.listConfig(); err != nil {
klog.Errorf("Unable to read config path %q: %v", s.path, err)
}
for {
select {
case <-listTicker.C:
if err := s.listConfig(); err != nil {
klog.Errorf("Unable to read config path %q: %v", s.path, err)
}
case e := <-s.watchEvents:
if err := s.consumeWatchEvent(e); err != nil {
klog.Errorf("Unable to process watch event: %v", err)
}
}
}
}()
// Start a goroutine that watches for file changes and produces watchEvents
s.startWatch()
}
3.1 Reading a pod config file: extractFromFile
The extractFromFile function reads a pod configuration file and converts its contents into a pod object, applying default values and modifications along the way. Each file is expected to contain the configuration of a single pod.
The modifications and defaults applied by extractFromFile (through applyDefaults) are:
- Generating a UID for the pod, if it has none, from an MD5 hash of the node name, the file path, and the pod content.
- Setting the pod's name, derived from the original name and the node name.
- Converting an empty namespace to “default”.
- Setting Spec.NodeName to the current node's name.
- Setting ObjectMeta.SelfLink.
- Adding a config-hash annotation that holds the generated UID.
- Adding a toleration for all NoExecute taints.
- Setting Status.Phase to “Pending”.
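The UID step can be illustrated with a simplified sketch. It follows the shape of applyDefaults for file sources (an MD5 over "host:<node>", "file:<path>", and the pod), but the manifest type is hypothetical and the pod is hashed through its JSON encoding instead of hash.DeepHashObject, so the resulting UIDs will not match the kubelet's.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// manifest is a hypothetical, trimmed-down pod manifest.
type manifest struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
	Image     string `json:"image"`
}

// staticPodUID sketches the UID derivation for file sources: the node name,
// the file path, and the pod content all feed one MD5 hash.
func staticPodUID(nodeName, path string, m manifest) (string, error) {
	hasher := md5.New()
	fmt.Fprintf(hasher, "host:%s", nodeName)
	fmt.Fprintf(hasher, "file:%s", path)
	data, err := json.Marshal(m) // stand-in for hash.DeepHashObject
	if err != nil {
		return "", err
	}
	hasher.Write(data)
	return hex.EncodeToString(hasher.Sum(nil)), nil
}

func main() {
	path := "/etc/kubernetes/manifests/etcd.yaml" // example path
	m := manifest{Name: "etcd", Namespace: "kube-system", Image: "etcd:3.4"}
	uid1, _ := staticPodUID("node-1", path, m)
	m.Image = "etcd:3.5"
	uid2, _ := staticPodUID("node-1", path, m)
	fmt.Println(len(uid1), uid1 != uid2) // 32 true
}
```

Because the content is part of the hash, editing a manifest yields a different UID. As we read the merge logic, the next SET update from the file source then reports the old pod as removed and the new one as added, rather than updating the pod in place.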
pkg\kubelet\config\file.go
// extractFromFile parses a file for Pod configuration information.
func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
klog.V(3).Infof("Reading config file %q", filename)
defer func() {
if err == nil && pod != nil {
objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
if keyErr != nil {
err = keyErr
return
}
s.fileKeyMapping[filename] = objKey
}
}()
file, err := os.Open(filename)
if err != nil {
return pod, err
}
defer file.Close()
data, err := utilio.ReadAtMost(file, maxConfigLength)
if err != nil {
return pod, err
}
defaultFn := func(pod *api.Pod) error {
return s.applyDefaults(pod, filename)
}
// Decode the byte stream into a pod
// Generate the pod UID and name, convert an empty namespace to default, set Spec.NodeName and ObjectMeta.SelfLink, add a toleration for all NoExecute taints, and set Status.Phase to Pending
parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
if parsed {
if podErr != nil {
return pod, podErr
}
return pod, nil
}
return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file", filename, podErr)
}
pkg\kubelet\config\common.go
// Generate the pod UID and name, convert an empty namespace to default, set Spec.NodeName and ObjectMeta.SelfLink, add a toleration for all NoExecute taints, and set Status.Phase to Pending
func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.NodeName) error {
if len(pod.UID) == 0 {
hasher := md5.New()
if isFile {
fmt.Fprintf(hasher, "host:%s", nodeName)
fmt.Fprintf(hasher, "file:%s", source)
} else {
fmt.Fprintf(hasher, "url:%s", source)
}
hash.DeepHashObject(hasher, pod)
pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))
klog.V(5).Infof("Generated UID %q pod %q from %s", pod.UID, pod.Name, source)
}
pod.Name = generatePodName(pod.Name, nodeName)
klog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, source)
if pod.Namespace == "" {
pod.Namespace = metav1.NamespaceDefault
}
klog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, pod.Name, source)
// Set the Host field to indicate this pod is scheduled on the current node.
pod.Spec.NodeName = string(nodeName)
pod.ObjectMeta.SelfLink = getSelfLink(pod.Name, pod.Namespace)
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
// The generated UID is the hash of the file.
pod.Annotations[kubetypes.ConfigHashAnnotationKey] = string(pod.UID)
if isFile {
// Applying the default Taint tolerations to static pods,
// so they are not evicted when there are node problems.
// Tolerate all NoExecute taints
helper.AddOrUpdateTolerationInPod(pod, &api.Toleration{
Operator: "Exists",
Effect: api.TaintEffectNoExecute,
})
}
// Set the default status to pending.
pod.Status.Phase = api.PodPending
return nil
}
3.2 startWatch
The startWatch function launches a goroutine that watches for file changes and turns them into watch events, which are sent to the watchEvents channel for further processing. wait.Forever runs the watch again one retryPeriod after it returns. If watching fails with an error other than the path not existing, the failure is recorded in an exponential backoff, and new watch attempts are skipped until the backoff period has elapsed.
This lets the kubelet detect changes to pod configuration files and keep reacting to them even after transient errors.
pkg\kubelet\config\file_linux.go
func (s *sourceFile) startWatch() {
backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
backOffID := "watch"
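// When func() returns, wait one second (retryPeriod) and run it again
// If watching for file changes fails, wait 1 second, check whether we are still in the backoff period, and if not, run the watch again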
go wait.Forever(func() {
// still within the backoff period
if backOff.IsInBackOffSinceUpdate(backOffID, time.Now()) {
return
}
if err := s.doWatch(); err != nil {
klog.Errorf("Unable to read config path %q: %v", s.path, err)
// back off on any error other than the path not existing (non-retryable errors)
if _, retryable := err.(*retryableError); !retryable {
backOff.Next(backOffID, time.Now())
}
}
}, retryPeriod)
}
4 HTTP source
The HTTP source starts a goroutine that periodically sends an HTTP request to a specified URL to retrieve a pod or a pod list. It then generates a PodUpdate message with the operation (Op) set to SET that contains all the pods from the response, and sends it to the source channel.
Here is a breakdown of how the HTTP source works:
- A goroutine sends a GET request to the URL once every period (HTTPCheckFrequency).
- When the response arrives, the pod information is extracted from its body.
- A PodUpdate message with Op set to SET, containing all the pods from the response, is generated.
- The PodUpdate message is sent to the source channel for further processing.
The HTTP source lets the kubelet fetch static pod definitions from an external server over HTTP and keep them in step with what the URL serves.
// define url config source
// StaticPodURL is empty by default; HTTPCheckFrequency defaults to 20s
if kubeCfg.StaticPodURL != "" {
klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
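// Start a goroutine that periodically requests the URL for a pod or pod list, generating PodUpdate messages (Op SET, containing all pods) and sending them to the updates channel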
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
pkg\kubelet\config\http.go
// NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes.
//
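// Start a goroutine that periodically requests the URL for a pod or pod list, generating PodUpdate messages (Op SET, containing all pods) and sending them to the updates channel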
func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
config := &sourceURL{
url: url,
header: header,
nodeName: nodeName,
updates: updates,
data: nil,
// Timing out requests leads to retries. This client is only used to
// read the manifest URL passed to kubelet.
client: &http.Client{Timeout: 10 * time.Second},
}
klog.V(1).Infof("Watching URL %s", url)
go wait.Until(config.run, period, wait.NeverStop)
}
func (s *sourceURL) run() {
if err := s.extractFromURL(); err != nil {
// Don't log this multiple times per minute. The first few entries should be
// enough to get the point across.
if s.failureLogs < 3 {
klog.Warningf("Failed to read pods from URL: %v", err)
} else if s.failureLogs == 3 {
klog.Warningf("Failed to read pods from URL. Dropping verbosity of this message to V(4): %v", err)
} else {
klog.V(4).Infof("Failed to read pods from URL: %v", err)
}
s.failureLogs++
} else {
if s.failureLogs > 0 {
klog.Info("Successfully read pods from URL.")
s.failureLogs = 0
}
}
}
4.1 extractFromURL
The extractFromURL function sends an HTTP GET request to the configured URL to retrieve a pod or pod list. Like the file source, it applies defaults to the pods it reads, and it then sends a PodUpdate message with the operation (Op) set to SET to the source channel.
Here's an overview of what extractFromURL does:
- It sends a GET request to the URL with the configured headers and reads the response body.
- If the body is empty, it sends an empty SET PodUpdate so that the HTTP source is marked as seen; if the body is identical to the previous one, it returns without sending anything.
- It tries to decode the body as a single pod first and, if that fails, as a pod list.
- While decoding, it applies the same defaults as the file source (UID, name, namespace, Spec.NodeName, SelfLink, annotation, Pending phase), except that the UID hash is based on the URL and no NoExecute toleration is added.
- Finally, it sends a PodUpdate message with Op set to SET, containing all the pods, to the source channel.
This ensures that pod configurations retrieved from the URL are processed consistently before the kubelet acts on them.
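The decoding order and the unchanged-body short circuit can be sketched with encoding/json alone. The object and urlSource types are hypothetical; the real code decodes manifests through the Kubernetes codecs and validates each pod, whereas this sketch only switches on the kind field.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// object is a hypothetical minimal view of a manifest.
type object struct {
	Kind     string `json:"kind"`
	Metadata struct {
		Name string `json:"name"`
	} `json:"metadata"`
	Items []object `json:"items"`
}

// urlSource remembers the last body so unchanged data is skipped,
// mirroring the bytes.Equal short circuit in extractFromURL.
type urlSource struct {
	last []byte
}

// extract returns the pod names in data; changed is false when the body is
// identical to the previous one.
func (s *urlSource) extract(data []byte) (names []string, changed bool, err error) {
	if len(data) == 0 {
		return []string{}, true, errors.New("zero-length data received")
	}
	if bytes.Equal(data, s.last) {
		return nil, false, nil
	}
	s.last = data
	var obj object
	if err = json.Unmarshal(data, &obj); err != nil {
		return nil, true, err
	}
	switch obj.Kind {
	case "Pod": // first try: a single pod
		return []string{obj.Metadata.Name}, true, nil
	case "PodList": // second try: a list of pods
		names = make([]string, 0, len(obj.Items))
		for _, item := range obj.Items {
			names = append(names, item.Metadata.Name)
		}
		return names, true, nil
	}
	return nil, true, errors.New("couldn't parse as single or multiple pods")
}

func main() {
	s := &urlSource{}
	body := []byte(`{"kind":"PodList","items":[{"kind":"Pod","metadata":{"name":"a"}},{"kind":"Pod","metadata":{"name":"b"}}]}`)
	names, changed, err := s.extract(body)
	fmt.Println(names, changed, err) // [a b] true <nil>
	_, changed, _ = s.extract(body)
	fmt.Println(changed) // false
}
```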
pkg\kubelet\config\http.go
func (s *sourceURL) extractFromURL() error {
req, err := http.NewRequest("GET", s.url, nil)
if err != nil {
return err
}
req.Header = s.header
resp, err := s.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
data, err := utilio.ReadAtMost(resp.Body, maxConfigLength)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%v: %v", s.url, resp.Status)
}
if len(data) == 0 {
// Emit an update with an empty PodList to allow HTTPSource to be marked as seen
s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
return fmt.Errorf("zero-length data received from %v", s.url)
}
// Short circuit if the data has not changed since the last time it was read.
if bytes.Equal(data, s.data) {
return nil
}
s.data = data
// First try as it is a single pod.
parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
if parsed {
if singlePodErr != nil {
// It parsed but could not be used.
return singlePodErr
}
s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
return nil
}
// That didn't work, so try a list of pods.
parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
if parsed {
if multiPodErr != nil {
// It parsed but could not be used.
return multiPodErr
}
pods := make([]*v1.Pod, 0)
for i := range podList.Items {
pods = append(pods, &podList.Items[i])
}
s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
return nil
}
return fmt.Errorf("%v: received '%v', but couldn't parse as "+
"single (%v) or multiple pods (%v)",
s.url, string(data), singlePodErr, multiPodErr)
}
5 Restore from bootstrapCheckpointPath
Restoring pods from the bootstrapCheckpointPath belongs to the API source. The path is set with the --bootstrap-checkpoint-path command-line flag when starting the kubelet and is empty by default.
Here's what happens during restoration:
- makePodSourceConfig checks whether bootstrapCheckpointPath is set; if it is empty, nothing is restored.
- If it is set, the apiserver source channel is obtained, and PodConfig.Restore creates a checkpoint manager for the directory.
- checkpoint.LoadPods reads every checkpoint file in the directory. Each file holds one pod and its checksum; files that cannot be read or that fail checksum verification are logged and skipped.
- The restored pods are sent as a single PodUpdate with Op set to RESTORE on the apiserver source channel, and podStorage.Merge turns them into a RESTORE update for the kubelet.
Restoring pods from the bootstrapCheckpointPath lets the kubelet bring back pods it had checkpointed before a restart without waiting for the API source to deliver them. Restore runs at most once: if a checkpoint manager already exists, it returns immediately.
var updatechannel chan<- interface{}
// bootstrapCheckpointPath is empty by default
if bootstrapCheckpointPath != "" {
klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
// Read all checkpoint files in the directory to get all pods, then send a PodUpdate (Op RESTORE) to updatechannel
err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
if err != nil {
return nil, err
}
}
The Restore function of PodConfig restores pods from the specified checkpoint path. It reads all pod checkpoint files in the directory and sends a PodUpdate message with the operation (Op) set to RESTORE to the source channel.
pkg\kubelet\config\config.go
// Restore restores pods from the checkpoint path, *once*
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
if c.checkpointManager != nil {
return nil
}
var err error
c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
if err != nil {
return err
}
pods, err := checkpoint.LoadPods(c.checkpointManager)
if err != nil {
return err
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
return nil
}
LoadPods reads the pod information from all checkpoint files in the directory:
pkg\kubelet\checkpoint\checkpoint.go
type Data struct {
Pod *v1.Pod
Checksum checksum.Checksum
}
// NewPodCheckpoint returns new pod checkpoint
func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
return &Data{Pod: pod}
}
// LoadPods Loads All Checkpoints from disk
func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
pods := make([]*v1.Pod, 0)
// Get the list of all files in the directory
checkpointKeys, err := cpm.ListCheckpoints()
if err != nil {
klog.Errorf("Failed to list checkpoints: %v", err)
}
for _, key := range checkpointKeys {
checkpoint := NewPodCheckpoint(nil)
// Convert the file content into a checkpoint (pod plus checksum)
err := cpm.GetCheckpoint(key, checkpoint)
if err != nil {
klog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
continue
}
pods = append(pods, checkpoint.GetPod())
}
return pods, nil
}
The GetCheckpoint function of the CheckpointManager retrieves a checkpoint from the CheckpointStore: it reads the file's content, unmarshals it into the Checkpoint, and then verifies the checksum.
pkg\kubelet\checkpointmanager\checkpoint_manager.go
// GetCheckpoint retrieves checkpoint from CheckpointStore.
func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
manager.mutex.Lock()
defer manager.mutex.Unlock()
// Read the file
blob, err := manager.store.Read(checkpointKey)
if err != nil {
if err == utilstore.ErrKeyNotFound {
return errors.ErrCheckpointNotFound
}
return err
}
err = checkpoint.UnmarshalCheckpoint(blob)
if err == nil {
err = checkpoint.VerifyChecksum()
}
return err
}
pkg\kubelet\checkpoint\checkpoint.go
// UnmarshalCheckpoint returns unmarshalled data
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Pod)
}
pkg\kubelet\checkpointmanager\checksum\checksum.go
// Checksum is the data to be stored as checkpoint
type Checksum uint64
// Verify verifies that passed checksum is same as calculated checksum
func (cs Checksum) Verify(data interface{}) error {
if cs != New(data) {
return errors.ErrCorruptCheckpoint
}
return nil
}
// New returns the Checksum of checkpoint data
func New(data interface{}) Checksum {
return Checksum(getChecksum(data))
}
// Get returns calculated checksum of checkpoint data
func getChecksum(data interface{}) uint64 {
hash := fnv.New32a()
hashutil.DeepHashObject(hash, data)
return uint64(hash.Sum32())
}
6 api source
The API source retrieves the pods bound to this node from the Kubernetes API server. It uses a Reflector and an UndeltaStore to watch pod resources on the API server and sends PodUpdate messages to the source channel.
Here's how the process works:
- The Reflector first lists the pods on the API server, filtering on the spec.nodeName field so that only pods scheduled to this node are returned.
- With the result, the Reflector calls UndeltaStore.Replace, which replaces the stored pod list with the newly fetched one. This refreshes the kubelet's view of the pods on this node.
- The Reflector then watches pod resources on the API server with the same spec.nodeName filter, so only events for pods on this node are received.
- The Reflector processes each watch event according to its type:
  - Added: a new pod has been scheduled to the node. The Reflector calls UndeltaStore.Add to add it to the store.
  - Modified: an existing pod has been updated. The Reflector calls UndeltaStore.Update to update it in the store.
  - Deleted: a pod has been deleted. The Reflector calls UndeltaStore.Delete to remove it from the store.
- After each of these store operations (Replace, Add, Update, Delete), the UndeltaStore calls its PushFunc, the send closure in newSourceApiserverFromLW. send always emits a PodUpdate with Op set to SET that contains the complete current pod list, never a per-event ADD, UPDATE, or DELETE. podStorage.Merge later compares this list with the stored pods and derives the ADD, UPDATE, DELETE, REMOVE, and RECONCILE messages.
This keeps the kubelet's view of the pods scheduled to its node in step with the API server, so it can react to pods being added, modified, and deleted.
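On the wire, the ListWatch built in NewSourceApiserver amounts to list and watch requests for pods in all namespaces, filtered on the server by spec.nodeName. The sketch below only builds such URLs with net/url; podListWatchURL and the server address are hypothetical, and real requests carry further parameters (such as resourceVersion) that are omitted here.

```go
package main

import (
	"fmt"
	"net/url"
)

// podListWatchURL builds a list (watch=false) or watch (watch=true) URL for
// the pods bound to nodeName, across all namespaces.
func podListWatchURL(server, nodeName string, watch bool) string {
	q := url.Values{}
	q.Set("fieldSelector", "spec.nodeName="+nodeName)
	if watch {
		q.Set("watch", "true")
	}
	u := url.URL{Scheme: "https", Host: server, Path: "/api/v1/pods", RawQuery: q.Encode()}
	return u.String()
}

func main() {
	fmt.Println(podListWatchURL("10.0.0.1:6443", "node-1", false))
	// https://10.0.0.1:6443/api/v1/pods?fieldSelector=spec.nodeName%3Dnode-1
	fmt.Println(podListWatchURL("10.0.0.1:6443", "node-1", true))
	// https://10.0.0.1:6443/api/v1/pods?fieldSelector=spec.nodeName%3Dnode-1&watch=true
}
```

Filtering on the server keeps each kubelet's list and watch traffic proportional to the pods on its own node rather than to the whole cluster.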
if kubeDeps.KubeClient != nil {
klog.Infof("Watching apiserver")
if updatechannel == nil {
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
}
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
}
pkg\kubelet\config\apiserver.go
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}