kubelet podConfig--提供kubelet运行pod

kubelet中的podConfig是用来汇聚多个pod来源,对多个pod源中的pod信息进行聚合处理(去重、分析出pod发生的变化)。如果源里的pod发生变化,podConfig会将变化更新聚合,生成各类型的PodUpdate消息发送到PodUpdate通道。

本文基于kubernetes 1.18.6版本,请访问源代码阅读仓库

其中pod源有三种:

  • file–从文件中读取pod的信息,即static pod
  • api–从apiserver中获取这个node节点的所有pod
  • http–通过http获取pod或pod列表信息,即static pod

/post/kubelet-podconfig/podconfig-summary.png

podConfig

podconfig里包括podStorage和mux和接收podUpdate类型消息通道。

podUpdate里包含pod列表、操作类型、源的名称。

podStorage里保存了某一时刻所有源的pod。

mux负责聚合所有源的pod,分析出所有pod的变化,组合成各类podUpdate消息发送接收podUpdate类型消息通道。

podconfig里的聚合器通过source chan,接收PodUpdate消息。聚合器为每个source会启动一个goroutine消费source chan里的消息,然后分析出各种PodUpdate消息,最后将各种PodUpdate发送给updates通道。

pkg\kubelet\config\config.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
	pods *podStorage
	// 聚合器
	mux  *config.Mux

	// the channel of denormalized changes passed to listeners
	updates chan kubetypes.PodUpdate

	// contains the list of all configured sources
	sourcesLock       sync.Mutex
	sources           sets.String
	checkpointManager checkpointmanager.CheckpointManager
}

// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order.  Note that this object is an in-memory source of
// "truth" and on creation contains zero entries.  Once all previously read sources are
// available, then this object should be considered authoritative.
type podStorage struct {
	podLock sync.RWMutex
	// map of source name to pod uid to pod reference
	pods map[string]map[types.UID]*v1.Pod
	mode PodConfigNotificationMode

	// ensures that updates are delivered in strict order
	// on the updates channel
	updateLock sync.Mutex
	updates    chan<- kubetypes.PodUpdate

	// contains the set of all sources that have sent at least one SET
	sourcesSeenLock sync.RWMutex
	sourcesSeen     sets.String

	// the EventRecorder to use
	recorder record.EventRecorder
}

podUpdate定义

pkg\kubelet\types\pod_update.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
//
// Additionally, Pods should never be nil - it should always point to an empty slice. While
// functionally similar, this helps our unit tests properly check that the correct PodUpdates
// are generated.
type PodUpdate struct {
    Pods   []*v1.Pod
    Op     PodOperation
    Source string
}

在kubelet里PodConfig的初始化在pkg\kubelet\kubelet.go

1
2
3
4
5
6
7
8
	if kubeDeps.PodConfig == nil {
		var err error
		// bootstrapCheckpointPath默认为空
		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
		if err != nil {
			return nil, err
		}
	}

在pkg\kubelet\kubelet.go里makePodSourceConfig创建PodConfig

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

pkg\kubelet\config\config.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
	updates := make(chan kubetypes.PodUpdate, 50)
	storage := newPodStorage(updates, mode, recorder)
	podConfig := &PodConfig{
		pods:    storage,
		mux:     config.NewMux(storage),
		updates: updates,
		sources: sets.String{},
	}
	return podConfig
}

podStorage保存某个时间点所有源里的pod,将pod的相关变化发送给update通道

pkg\kubelet\config\config.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
	return &podStorage{
		pods:        make(map[string]map[types.UID]*v1.Pod),
		mode:        mode,
		updates:     updates,
		sourcesSeen: sets.String{},
		recorder:    recorder,
	}
}

mux–多路聚合器,从多个源的chan接收pod,然后进行聚合

pkg\util\config\config.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type Merger interface {
	// Invoked when a change from a source is received.  May also function as an incremental
	// merger if you wish to consume changes incrementally.  Must be reentrant when more than
	// one source is defined.
	Merge(source string, update interface{}) error
}

// Mux is a class for merging configuration from multiple sources.  Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
	// Invoked when an update is sent to a source.
	merger Merger

	// Sources and their lock.
	sourceLock sync.RWMutex
	// Maps source names to channels
	sources map[string]chan interface{}
}

// NewMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux {
	mux := &Mux{
		sources: make(map[string]chan interface{}),
		merger:  merger,
	}
	return mux
}

podConfig添加一个源,它通过调用Channel返回专属这个源的chan–用于接收源生成的PodUpdates消息,这里统称源接收通道(source chan)。

pkg\kubelet\config\config.go

1
2
3
4
5
6
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	return c.mux.Channel(source)
}

调用多路聚合器Channel

source已经有chan通道,就返回这个chan。否则创建一个source的chan,然后启动一个goroutine消费这个chan(对消息进行加工分类,然后将最终的podUpdate消息发送给merger–PodStorage里的updates–chan通道)

pkg\util\config\config.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Channel returns a channel where a configuration source
// can send updates of new configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
// 
// source已经有chan通道,就返回这个chan
// 否则创建一个source的chan,然后启动一个goroutine消费这个chan(对消息进行加工分类,然后将最终的podUpdate消息发送给merger--PodStorage里的updates--chan通道)
func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}

聚合器启动一个goroutine消费这个chan里的消息,进行聚合操作–这里会调用podStorage的Merge进行聚合。

pkg\util\config\config.go

1
2
3
4
5
6
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
        //这里的merger是podStorage 
		m.merger.Merge(source, update)
	}
}

podStorage的Merge

  • 首先根据PodUpdate的Op类型,对PodUpdate消息里的pod进行分析,分析出addPods、updatePods、deletePods、removePods、reconcilePods、restorePods
    • addPods代表增加的pod
    • updatePods代表pod的status之外的字段发生修改,需要进行update操作
    • deletePods代表pod有DeletionTimestamp字段–只发生在api源里pod被删除
    • removePods代表pod从源里移除了–只发生在file或http源
    • reconcilePods代表pod的status字段更新了
    • restorePods代表pod从源里恢复–在kubelet中设置了bootstrapCheckpointPath
  • 根据分析出结果,生成对应的Op类型PodUpdate消息
  • 根据mode(kubelet为PodConfigNotificationIncremental),将不同的PodUpdate消息发送到updates通道
    • mode为PodConfigNotificationIncremental,发生add、update、delete、remove、restore、reconcile pod,则发送相应ADD、UPDATE、DELETE、REMOVE、RESTORE、RECONCILE的PodUpdate事件。如果是刚添加的源,且没有add、update、delete pod,则发送空的ADD的PodUpdate事件–标记这个源已经ready。kubelet使用这个mode。
    • mode为PodConfigNotificationSnapshotAndUpdates,发生remove或add pod或刚添加的源,则发送SET的PodUpdate事件(pod列表为所有源的pod)。发生update、delete pod,则发送UPDATE、DELETE的PodUpdate事件。
    • mode为PodConfigNotificationSnapshot,发生update或delete或add或remove pod或刚添加的源,则发送SET的PodUpdate事件(pod列表为所有源的pod)

PodUpdate消息分析算法

  • Op为ADD或UPDATE或DELETE,比对podStorage的pods与PodUpdate消息中的pod列表,去重并分析出addPods、updatePods、deletePods、reconcilePods,并更新保存在podStorage里的pods。
  • Op为REMOVE,则PodUpdate消息中的pod列表为removePods
  • Op为SET,比对podStorage的pods与PodUpdate消息中的pod列表,去重并分析出addPods、updatePods、deletePods、reconcilePods,最新的pod列表(更新之后PodUpdate消息中的pod列表)替换podStorage里的pods,并计算出removePods被移除的pod列表。
  • Op为RESTORE,则PodUpdate消息中的pod列表为restorePods
  • 根据addPods生成Op为ADD的PodUpdate消息
  • 根据updatePods生成Op为UPDATE的PodUpdate消息
  • 根据deletePods生成Op为DELETE的PodUpdate消息
  • 根据reconcilePods生成Op为RECONCILE的PodUpdate消息
  • 根据restorePods生成Op为RESTORE的PodUpdate消息

/post/kubelet-podconfig/podconfig-merger.png

pkg\kubelet\config\config.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel.  Ensures that updates are delivered in order.
// 
// 将输入source的PodUpdate的pod列表去重
// 将PodUpdate与s.pods[source]的pod进行比对,加工出各个OP类型PodUpdate,发送到s.updates通道中
// 输入PodUpdate的OP可能与输出PodUpdate的OP不一样
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
    ......

从文件中读取pod配置保存在UndeletaStore中,并监听文件变化。UndeletaStore会生成包含所有pod的且OP为SET的PodUpdate消息,发送PodUpdate消息到file源接收通道。

UndeltaStore它是一个线程安全的存储器–里面保存了源的所有的pod。它包含了Add(添加pod到UndeltaStore)、Update(更新pod到UndeltaStore)、Replace(替换pod到UndeltaStore)、Delete(删除UndeltaStore中的pod)方法,无论执行那个方法并会执行PushFunc(将pod列表封装成Op为SET的PodUpdate消息发送给源接收通道)。它定义在/staging/src/k8s.io/client-go/tools/cache/undelta_store.go

PushFunc为newSourceFile函数中send pkg/kubelet/config/file.go

1
2
3
4
5
6
7
8
9
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
	}
	store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)

pkg\kubelet\kubelet.go

1
2
3
4
5
6
7
    // define file config source
	// 默认StaticPodPath为空,FileCheckFrequency为20s
	if kubeCfg.StaticPodPath != "" {
		klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
		// 启动gorotine watch文件变化和周期性读取文件,生成PodUpdate(op为set,包含所有pod)消息发送给updates通道
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

它会做这几件事情:

  1. 新建sourceFile–初始化UndeltaStore、fileKeyMapping、watch chan。
  2. 启动一个goroutine,listConfig–周期性读取所有pod的配置文件,添加pod到UndeltaStore中,将发现的文件记录到fileKeyMapping。consumeWatchEvent–消费watch chan通道(文件变化事件)–如果是文件添加或修改,则重新读取pod的配置文件,添加pod到UndeltaStore;如果是文件删除,则从fileKeyMapping记录中删除这个文件并从UndeltaStore中删除该pod。
  3. 启动一个goroutine,startWatch–watch文件的变化,生成文件变化事件发送到watch chan中。

/post/kubelet-podconfig/podconfig-file.png

pkg\kubelet\config\file.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// NewSourceFile watches a config file for changes.
//
// 启动gorotine watch文件变化和周期性读取文件,生成PodUpdate(op为set,包含所有pod)消息发送给updates通道
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
	// "github.com/sigma/go-inotify" requires a path without trailing "/"
	path = strings.TrimRight(path, string(os.PathSeparator))

	config := newSourceFile(path, nodeName, period, updates)
	klog.V(1).Infof("Watching path %q", path)
	config.run()
}

func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
	}
	store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
	return &sourceFile{
		path:           path,
		nodeName:       nodeName,
		period:         period,
		store:          store,
		fileKeyMapping: map[string]string{},
		updates:        updates,
		watchEvents:    make(chan *watchEvent, eventBufferLen),
	}
}

func (s *sourceFile) run() {
	listTicker := time.NewTicker(s.period)

	go func() {
		// Read path immediately to speed up startup.
		if err := s.listConfig(); err != nil {
			klog.Errorf("Unable to read config path %q: %v", s.path, err)
		}
		for {
			select {
			case <-listTicker.C:
				if err := s.listConfig(); err != nil {
					klog.Errorf("Unable to read config path %q: %v", s.path, err)
				}
			case e := <-s.watchEvents:
				if err := s.consumeWatchEvent(e); err != nil {
					klog.Errorf("Unable to process watch event: %v", err)
				}
			}
		}
	}()

	// 启动goroutine,watch文件变化,生产watchEvents
	s.startWatch()
}

无论是周期性的读取文件或消费watch事件,都会涉及到读取文件操作extractFromFile。extractFromFile将读取到的文件输出为pod,并且会调用applyDefaults对pod进行修改。这里只支持一个文件包含一个pod。

这些修改包含了生成pod的UID、pod的name、转换空的namespace为default、设置Spec.NodeName、ObjectMeta.SelfLink、添加容忍所有NoExecute taint、设置Status.Phase为pending,设置annotation。

pkg\kubelet\config\file.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// extractFromFile parses a file for Pod configuration information.
func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
	klog.V(3).Infof("Reading config file %q", filename)
	defer func() {
		if err == nil && pod != nil {
			objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
			if keyErr != nil {
				err = keyErr
				return
			}
			s.fileKeyMapping[filename] = objKey
		}
	}()

	file, err := os.Open(filename)
	if err != nil {
		return pod, err
	}
	defer file.Close()

	data, err := utilio.ReadAtMost(file, maxConfigLength)
	if err != nil {
		return pod, err
	}

	defaultFn := func(pod *api.Pod) error {
		return s.applyDefaults(pod, filename)
	}

	// 解析字节流为pod
	// 生成pod的UID、pod的name、转换空的namespace为default、设置Spec.NodeName、ObjectMeta.SelfLink、添加容忍所有NoExecute taint、设置Status.Phase为pending
	parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
	if parsed {
		if podErr != nil {
			return pod, podErr
		}
		return pod, nil
	}

	return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file", filename, podErr)
}

pkg\kubelet\config\common.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 生成pod的UID、pod的name、转换空的namespace为default、设置Spec.NodeName、ObjectMeta.SelfLink、添加容忍所有NoExecute taint、设置Status.Phase为pending
func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.NodeName) error {
	if len(pod.UID) == 0 {
		hasher := md5.New()
		if isFile {
			fmt.Fprintf(hasher, "host:%s", nodeName)
			fmt.Fprintf(hasher, "file:%s", source)
		} else {
			fmt.Fprintf(hasher, "url:%s", source)
		}
		hash.DeepHashObject(hasher, pod)
		pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))
		klog.V(5).Infof("Generated UID %q pod %q from %s", pod.UID, pod.Name, source)
	}

	pod.Name = generatePodName(pod.Name, nodeName)
	klog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, source)

	if pod.Namespace == "" {
		pod.Namespace = metav1.NamespaceDefault
	}
	klog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, pod.Name, source)

	// Set the Host field to indicate this pod is scheduled on the current node.
	pod.Spec.NodeName = string(nodeName)

	pod.ObjectMeta.SelfLink = getSelfLink(pod.Name, pod.Namespace)

	if pod.Annotations == nil {
		pod.Annotations = make(map[string]string)
	}
	// The generated UID is the hash of the file.
	pod.Annotations[kubetypes.ConfigHashAnnotationKey] = string(pod.UID)

	if isFile {
		// Applying the default Taint tolerations to static pods,
		// so they are not evicted when there are node problems.
		// 容忍所有NoExecute taint
		helper.AddOrUpdateTolerationInPod(pod, &api.Toleration{
			Operator: "Exists",
			Effect:   api.TaintEffectNoExecute,
		})
	}

	// Set the default status to pending.
	pod.Status.Phase = api.PodPending
	return nil
}

启动goroutine监听文件变化,生成watch事件。如果监听事件发生错误会进行指数回退重试。

pkg\kubelet\config\file_linux.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (s *sourceFile) startWatch() {
	backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
	backOffID := "watch"

	// 如果func()执行完成,等待一秒后重新执行
	// watch文件变化发生错误的时候,等待1秒,查询是否在回退周期内,不在回退周期内,重新执行watch
	go wait.Forever(func() {
		// 在回退周期中
		if backOff.IsInBackOffSinceUpdate(backOffID, time.Now()) {
			return
		}

		if err := s.doWatch(); err != nil {
			klog.Errorf("Unable to read config path %q: %v", s.path, err)
			// 发生不是文件不存在的错误,进行回退
			if _, retryable := err.(*retryableError); !retryable {
				backOff.Next(backOffID, time.Now())
			}
		}
	}, retryPeriod)
}

启动goroutine 周期请求url获取pod或podlist,生成PodUpdate(op为set,包含所有pod)消息发送给源接收通道。

/pkg/kubelet/kubelet.go

1
2
3
4
5
6
7
	// define url config source
	// 默认StaticPodURL为空,HTTPCheckFrequency默认为20s
	if kubeCfg.StaticPodURL != "" {
		klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
		// 启动goroutine 周期请求url获取pod或podlist,生成PodUpdate(op为set,包含所有pod)消息发送给updates通道
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

pkg\kubelet\config\http.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes.
//
// 启动goroutine 周期请求url获取pod或podlist,生成PodUpdate(op为set,包含所有pod)消息发送给updates通道
func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
	config := &sourceURL{
		url:      url,
		header:   header,
		nodeName: nodeName,
		updates:  updates,
		data:     nil,
		// Timing out requests leads to retries. This client is only used to
		// read the manifest URL passed to kubelet.
		client: &http.Client{Timeout: 10 * time.Second},
	}
	klog.V(1).Infof("Watching URL %s", url)
	go wait.Until(config.run, period, wait.NeverStop)
}

func (s *sourceURL) run() {
	if err := s.extractFromURL(); err != nil {
		// Don't log this multiple times per minute. The first few entries should be
		// enough to get the point across.
		if s.failureLogs < 3 {
			klog.Warningf("Failed to read pods from URL: %v", err)
		} else if s.failureLogs == 3 {
			klog.Warningf("Failed to read pods from URL. Dropping verbosity of this message to V(4): %v", err)
		} else {
			klog.V(4).Infof("Failed to read pods from URL: %v", err)
		}
		s.failureLogs++
	} else {
		if s.failureLogs > 0 {
			klog.Info("Successfully read pods from URL.")
			s.failureLogs = 0
		}
	}
}

访问url获取pod或podList配置,同样会对读取到pod进行修改,修改内容跟file源一样。然后发送Op为SET的PodUpdate消息到源接收通道。

pkg\kubelet\config\http.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (s *sourceURL) extractFromURL() error {
	req, err := http.NewRequest("GET", s.url, nil)
	if err != nil {
		return err
	}
	req.Header = s.header
	resp, err := s.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	data, err := utilio.ReadAtMost(resp.Body, maxConfigLength)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("%v: %v", s.url, resp.Status)
	}
	if len(data) == 0 {
		// Emit an update with an empty PodList to allow HTTPSource to be marked as seen
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return fmt.Errorf("zero-length data received from %v", s.url)
	}
	// Short circuit if the data has not changed since the last time it was read.
	if bytes.Equal(data, s.data) {
		return nil
	}
	s.data = data

	// First try as it is a single pod.
	parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
	if parsed {
		if singlePodErr != nil {
			// It parsed but could not be used.
			return singlePodErr
		}
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return nil
	}

	// That didn't work, so try a list of pods.
	parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
	if parsed {
		if multiPodErr != nil {
			// It parsed but could not be used.
			return multiPodErr
		}
		pods := make([]*v1.Pod, 0)
		for i := range podList.Items {
			pods = append(pods, &podList.Items[i])
		}
		s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return nil
	}

	return fmt.Errorf("%v: received '%v', but couldn't parse as "+
		"single (%v) or multiple pods (%v)",
		s.url, string(data), singlePodErr, multiPodErr)
}

从bootstrapCheckpointPath(命令行–bootstrap-checkpoint-path)下恢复pod信息,这个属于api源。

pkg/kubelet/kubelet.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
	var updatechannel chan<- interface{}
	// 默认bootstrapCheckpointPath为空
	if bootstrapCheckpointPath != "" {
		klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
		updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		// 从目录中读取所有checkpoint文件获取所有pod,然后发送PodUpdate(op为restore)到updatechannel
		err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
		if err != nil {
			return nil, err
		}
	}

新建一个checkpointManager用来访问目录和读取文件,读取目录下所有pod,发送Op为RESTORE的PodUpdate消息到源接收通道。

pkg\kubelet\config\config.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Restore restores pods from the checkpoint path, *once*
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
	if c.checkpointManager != nil {
		return nil
	}
	var err error
	c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
	if err != nil {
		return err
	}
	pods, err := checkpoint.LoadPods(c.checkpointManager)
	if err != nil {
		return err
	}
	updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
	return nil
}

获得目录下的所有pod信息

pkg\kubelet\checkpoint\checkpoint.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Data struct {
	Pod      *v1.Pod
	Checksum checksum.Checksum
}

// NewPodCheckpoint returns new pod checkpoint
func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
	return &Data{Pod: pod}
}

// LoadPods Loads All Checkpoints from disk
func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
	pods := make([]*v1.Pod, 0)

	// 获得目录下的所有文件列表
	checkpointKeys, err := cpm.ListCheckpoints()
	if err != nil {
		klog.Errorf("Failed to list checkpoints: %v", err)
	}

	for _, key := range checkpointKeys {
		checkpoint := NewPodCheckpoint(nil)
		// 将文件的内容转化成checkpoint--包含pod和checksum
		err := cpm.GetCheckpoint(key, checkpoint)
		if err != nil {
			klog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
			continue
		}
		pods = append(pods, checkpoint.GetPod())
	}
	return pods, nil
}

读取文件,将文件内容转化成Data类型,验证hash值是否符合。

pkg\kubelet\checkpointmanager\checkpoint_manager.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// GetCheckpoint retrieves checkpoint from CheckpointStore.
func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
	manager.mutex.Lock()
	defer manager.mutex.Unlock()
    //读取文件
	blob, err := manager.store.Read(checkpointKey)
	if err != nil {
		if err == utilstore.ErrKeyNotFound {
			return errors.ErrCheckpointNotFound
		}
		return err
	}
	err = checkpoint.UnmarshalCheckpoint(blob)
	if err == nil {
		err = checkpoint.VerifyChecksum()
	}
	return err
}

pkg\kubelet\checkpoint\checkpoint.go

1
2
3
4
5
6
7
8
9
// UnmarshalCheckpoint returns unmarshalled data
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
	return json.Unmarshal(blob, cp)
}

// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
	return cp.Checksum.Verify(*cp.Pod)
}

pkg\kubelet\checkpointmanager\checksum\checksum.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Checksum is the data to be stored as checkpoint
type Checksum uint64

// Verify verifies that passed checksum is same as calculated checksum
func (cs Checksum) Verify(data interface{}) error {
	if cs != New(data) {
		return errors.ErrCorruptCheckpoint
	}
	return nil
}

// New returns the Checksum of checkpoint data
func New(data interface{}) Checksum {
	return Checksum(getChecksum(data))
}

// Get returns calculated checksum of checkpoint data
func getChecksum(data interface{}) uint64 {
	hash := fnv.New32a()
	hashutil.DeepHashObject(hash, data)
	return uint64(hash.Sum32())
}

从apiserver获取该node上的pod,使用Reflector和UndeltaStore机制,watch并发送PodUpdate消息到源接收通道。

Reflector会先进行list pod(spec.nodeName为该node name,即调度到该节点的pod),然后执行UndeltaStore.Replace替换存储中的pod列表,然后执行PushFunc(为newSourceApiserverFromLW里send)–发送Op为RESTORE的PodUpdate消息到源接收通道。

Reflector list完之后会执行watch pod(spec.nodeName为该node name,即调度到该节点的pod)。收到Added类型的事件,则执行UndeltaStore.Add增加pod到存储中的pod列表,然后执行PushFunc;收到Modified类型的事件,则执行UndeltaStore.Update更新在存储中的pod列表pod,然后执行PushFunc;收到Deleted类型的事件,则执行UndeltaStore.Delete删除在存储中的pod列表pod,然后执行PushFunc。

pkg/kubelet/kubelet.go

1
2
3
4
5
6
7
    if kubeDeps.KubeClient != nil {
		klog.Infof("Watching apiserver")
		if updatechannel == nil {
			updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		}
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
	}

pkg\kubelet\config\apiserver.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}

相关内容