kubelet podConfig--提供kubelet运行pod
kubelet中的podConfig是用来汇聚多个pod来源,对多个pod源中的pod信息进行聚合处理(去重、分析出pod发生的变化)。如果源里的pod发生变化,podConfig会将变化更新聚合,生成各类型的PodUpdate消息发送到PodUpdate通道。
本文基于kubernetes 1.18.6版本,请访问源代码阅读仓库。
其中pod源有三种:
- file–从文件中读取pod的信息,即static pod
- api–从apiserver中获取这个node节点的所有pod
- http–通过http获取pod或pod列表信息,即static pod
podConfig
PodConfig包含podStorage、mux,以及用于接收PodUpdate消息的updates通道。
podUpdate里包含pod列表、操作类型、源的名称。
podStorage里保存了某一时刻所有源的pod。
mux负责聚合所有源的pod:它把各个源发来的消息交给podStorage(Merger),由podStorage分析出pod的变化,组合成各类PodUpdate消息并发送到updates通道。
podconfig里的聚合器通过source chan,接收PodUpdate消息。聚合器为每个source会启动一个goroutine消费source chan里的消息,然后分析出各种PodUpdate消息,最后将各种PodUpdate发送给updates通道。
pkg\kubelet\config\config.go
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
// pods holds the merged pod state of every source; it is also registered as
// the Merger of mux, so every source update ends up in pods.Merge.
pods *podStorage
// mux is the aggregator: it owns one channel per source and forwards each
// message received on those channels to its Merger.
mux *config.Mux
// the channel of denormalized changes passed to listeners
updates chan kubetypes.PodUpdate
// contains the list of all configured sources
// sourcesLock guards sources.
sourcesLock sync.Mutex
sources sets.String
// checkpointManager is set by Restore; once non-nil, Restore becomes a no-op.
checkpointManager checkpointmanager.CheckpointManager
}
// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order. Note that this object is an in-memory source of
// "truth" and on creation contains zero entries. Once all previously read sources are
// available, then this object should be considered authoritative.
type podStorage struct {
// podLock presumably guards pods (its uses are not shown here) — TODO confirm.
podLock sync.RWMutex
// map of source name to pod uid to pod reference
pods map[string]map[types.UID]*v1.Pod
// mode selects which kinds of PodUpdate are emitted on updates
// (see PodConfigNotificationMode).
mode PodConfigNotificationMode
// ensures that updates are delivered in strict order
// on the updates channel (it is held for the whole of Merge)
updateLock sync.Mutex
// updates is the send side of the PodConfig's updates channel.
updates chan<- kubetypes.PodUpdate
// contains the set of all sources that have sent at least one SET
sourcesSeenLock sync.RWMutex
sourcesSeen sets.String
// the EventRecorder to use
recorder record.EventRecorder
}
podUpdate定义
pkg\kubelet\types\pod_update.go
// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
//
// Additionally, Pods should never be nil - it should always point to an empty slice. While
// functionally similar, this helps our unit tests properly check that the correct PodUpdates
// are generated.
type PodUpdate struct {
// Pods affected by the operation; an empty (non-nil) slice when there are none.
Pods []*v1.Pod
// Op is the operation applied to Pods (e.g. ADD, REMOVE, SET, RESTORE).
Op PodOperation
// Source names the config source the update belongs to, e.g.
// kubetypes.FileSource, kubetypes.HTTPSource or kubetypes.ApiserverSource.
Source string
}
1 初始化PodConfig
在kubelet里PodConfig的初始化在pkg\kubelet\kubelet.go
// Build the PodConfig only when the caller has not already supplied one in kubeDeps.
if kubeDeps.PodConfig == nil {
var err error
// bootstrapCheckpointPath is empty by default
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
if err != nil {
return nil, err
}
}
在pkg\kubelet\kubelet.go里makePodSourceConfig创建PodConfig
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
//
// Excerpt: only the beginning of the function body is shown here.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
// Copy every configured static-pod URL header value into an http.Header;
// Add (rather than Set) keeps all values of a repeated header key.
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
for i := range v {
manifestURLHeader.Add(k, v[i])
}
}
}
// source of all configuration
// Incremental mode: each kind of change is reported as its own PodUpdate Op.
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
1.1 创建一个podconfig
pkg\kubelet\config\config.go
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
//
// The PodConfig and its podStorage share one updates channel buffered to 50
// entries, and the storage is installed as the mux's Merger so that every
// message arriving on a source channel is merged into it.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
	const updatesBuffer = 50
	ch := make(chan kubetypes.PodUpdate, updatesBuffer)
	store := newPodStorage(ch, mode, recorder)
	return &PodConfig{
		pods:    store,
		mux:     config.NewMux(store),
		updates: ch,
		sources: sets.String{},
	}
}
podStorage保存某个时间点所有源里的pod,将pod的相关变化发送给update通道
pkg\kubelet\config\config.go
// newPodStorage returns an empty podStorage that reports merged changes on
// updates according to mode, recording events through recorder.
//
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
	s := &podStorage{
		mode:     mode,
		updates:  updates,
		recorder: recorder,
	}
	// Start with no sources and no pods.
	s.pods = make(map[string]map[types.UID]*v1.Pod)
	s.sourcesSeen = sets.String{}
	return s
}
mux–多路聚合器,从多个源的chan接收pod,然后进行聚合
pkg\util\config\config.go
// Merger consumes the updates a Mux receives from its sources. In the kubelet,
// the PodConfig's podStorage is the Merger passed to NewMux.
type Merger interface {
// Invoked when a change from a source is received. May also function as an incremental
// merger if you wish to consume changes incrementally. Must be reentrant when more than
// one source is defined.
Merge(source string, update interface{}) error
}
// Mux is a class for merging configuration from multiple sources. Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
// Invoked when an update is sent to a source.
merger Merger
// Sources and their lock.
// sourceLock guards sources.
sourceLock sync.RWMutex
// Maps source names to channels
sources map[string]chan interface{}
}
// NewMux creates a new mux that can merge changes from multiple sources.
// It starts with no sources; Channel registers them lazily.
func NewMux(merger Merger) *Mux {
	return &Mux{
		merger:  merger,
		sources: map[string]chan interface{}{},
	}
}
2 增加源
podConfig添加一个源,它通过调用Channel返回专属这个源的chan–用于接收源生成的PodUpdates消息,这里统称源接收通道(source chan)。
pkg\kubelet\config\config.go
// Channel records source as a configured source of this PodConfig and returns
// the send-only channel on which that source must publish its updates.
// Calling it again with the same source returns the same channel.
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	ch := c.mux.Channel(source)
	return ch
}
调用多路聚合器Channel
source已经有chan通道,就返回这个chan。否则创建一个source的chan,然后启动一个goroutine消费这个chan(对消息进行加工分类,然后将最终的podUpdate消息发送给merger–PodStorage里的updates–chan通道)
pkg\util\config\config.go
// Channel returns a channel where a configuration source
// can send updates of new configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
//
// The first call for a source creates an unbuffered channel and starts a
// goroutine (via wait.Until with a zero period and wait.NeverStop, so it is
// never stopped) that passes every value received on it to m.merger.Merge.
// Channel panics if source is empty.
func (m *Mux) Channel(source string) chan interface{} {
	if source == "" {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()

	if ch, ok := m.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	m.sources[source] = ch
	go wait.Until(func() { m.listen(source, ch) }, 0, wait.NeverStop)
	return ch
}
聚合器启动一个goroutine消费这个chan里的消息,进行聚合操作–这里会调用podStorage的Merge进行聚合。
pkg\util\config\config.go
// listen hands every update received on listenChannel to m.merger.Merge
// (podStorage.Merge in the kubelet) and returns once the channel is closed.
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for {
		update, ok := <-listenChannel
		if !ok {
			return
		}
		// The error from Merge is deliberately discarded here.
		_ = m.merger.Merge(source, update)
	}
}
podStorage的Merge
- 首先根据PodUpdate的Op类型,对PodUpdate消息里的pod进行分析,分析出addPods、updatePods、deletePods、removePods、reconcilePods、restorePods
- addPods代表增加的pod
- updatePods代表pod的status之外的字段发生修改,需要进行update操作
- deletePods代表pod有DeletionTimestamp字段–只发生在api源里pod被删除
- removePods代表pod从源里移除了–只发生在file或http源
- reconcilePods代表pod的status字段更新了
- restorePods代表pod从源里恢复–在kubelet中设置了bootstrapCheckpointPath
- 根据分析出结果,生成对应的Op类型PodUpdate消息
- 根据mode(kubelet为PodConfigNotificationIncremental),将不同的PodUpdate消息发送到updates通道
- mode为PodConfigNotificationIncremental,发生add、update、delete、remove、restore、reconcile pod,则发送相应ADD、UPDATE、DELETE、REMOVE、RESTORE、RECONCILE的PodUpdate事件。如果是刚添加的源,且没有add、update、delete pod,则发送空的ADD的PodUpdate事件–标记这个源已经ready。kubelet使用这个mode。
- mode为PodConfigNotificationSnapshotAndUpdates,发生remove或add pod或刚添加的源,则发送SET的PodUpdate事件(pod列表为所有源的pod)。发生update、delete pod,则发送UPDATE、DELETE的PodUpdate事件。
- mode为PodConfigNotificationSnapshot,发生update或delete或add或remove pod或刚添加的源,则发送SET的PodUpdate事件(pod列表为所有源的pod)
PodUpdate消息分析算法
- Op为ADD或UPDATE或DELETE,比对podStorage的pods与PodUpdate消息中的pod列表,去重并分析出addPods、updatePods、deletePods、reconcilePods,并更新保存在podStorage里的pods。
- Op为REMOVE,则PodUpdate消息中的pod列表为removePods
- Op为SET,比对podStorage的pods与PodUpdate消息中的pod列表,去重并分析出addPods、updatePods、deletePods、reconcilePods,最新的pod列表(更新之后PodUpdate消息中的pod列表)替换podStorage里的pods,并计算出removePods被移除的pod列表。
- Op为RESTORE,则PodUpdate消息中的pod列表为restorePods
- 根据addPods生成Op为ADD的PodUpdate消息
- 根据updatePods生成Op为UPDATE的PodUpdate消息
- 根据deletePods生成Op为DELETE的PodUpdate消息
- 根据reconcilePods生成Op为RECONCILE的PodUpdate消息
- 根据restorePods生成Op为RESTORE的PodUpdate消息
pkg\kubelet\config\config.go
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
//
// It de-duplicates the pods of the incoming PodUpdate from source, compares them with
// s.pods[source] to derive PodUpdates of each Op type, and sends those to s.updates.
// The Op of an emitted PodUpdate may differ from the Op of the incoming one.
// (Excerpt: the rest of the function body is elided below.)
func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
// Remember whether source was already in sourcesSeen before this change is merged.
seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
......
3 file source
从文件中读取pod配置保存在UndeltaStore中,并监听文件变化。UndeltaStore每次变化都会生成包含所有pod且Op为SET的PodUpdate消息,并将其发送到file源接收通道。
UndeltaStore是一个线程安全的存储器,里面保存了源的所有pod。它提供Add(添加pod到UndeltaStore)、Update(更新UndeltaStore中的pod)、Replace(替换UndeltaStore中的全部pod)、Delete(删除UndeltaStore中的pod)方法,无论执行哪个方法,都会执行PushFunc(将全部pod列表封装成Op为SET的PodUpdate消息,发送给源接收通道)。它定义在/staging/src/k8s.io/client-go/tools/cache/undelta_store.go
PushFunc即newSourceFile函数中的send。下面代码的前半部分摘自file.go中的newSourceFile,后半部分摘自kubelet.go中makePodSourceConfig注册file源的代码:
pkg/kubelet/config/file.go
// Excerpt of newSourceFile (the complete function is listed again further below):
// the send closure is the PushFunc handed to the UndeltaStore.
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
// send converts the store's objects to pods and emits them as one SET update
// for the file source.
// NOTE(review): when objs is empty, pods stays nil, although PodUpdate documents
// that Pods should never be nil.
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
}
store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
// Excerpt from makePodSourceConfig (it uses that function's kubeCfg, nodeName and cfg):
// register the file source.
// define file config source
// StaticPodPath is empty by default; FileCheckFrequency defaults to 20s.
if kubeCfg.StaticPodPath != "" {
klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
// Starts goroutines that watch the path for changes and re-read it periodically,
// sending PodUpdate messages (Op SET, containing all pods) to the file source channel.
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
它会做这几件事情:
- 新建sourceFile–初始化UndeltaStore、fileKeyMapping、watch chan。
- 启动一个goroutine,listConfig–周期性读取所有pod的配置文件,添加pod到UndeltaStore中,将发现的文件记录到fileKeyMapping。consumeWatchEvent–消费watch chan通道(文件变化事件)–如果是文件添加或修改,则重新读取pod的配置文件,添加pod到UndeltaStore;如果是文件删除,则从fileKeyMapping记录中删除这个文件并从UndeltaStore中删除该pod。
- 启动一个goroutine,startWatch–watch文件的变化,生成文件变化事件发送到watch chan中。
pkg\kubelet\config\file.go
// NewSourceFile watches a config file for changes.
//
// It strips trailing path separators from path, builds a sourceFile that
// publishes SET PodUpdates (carrying every pod found) on updates, and starts
// it: an immediate read, a re-read every period, and a filesystem watch.
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
	// "github.com/sigma/go-inotify" requires a path without trailing "/"
	cleaned := strings.TrimRight(path, string(os.PathSeparator))
	src := newSourceFile(cleaned, nodeName, period, updates)
	klog.V(1).Infof("Watching path %q", cleaned)
	src.run()
}
// newSourceFile builds, but does not start, a sourceFile for path.
//
// Its UndeltaStore calls send with the store's complete contents. send turns
// them into a single SET PodUpdate for the file source and writes it to
// updates. Pods is always a non-nil slice (empty when the store is empty), as
// the kubetypes.PodUpdate documentation requires.
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
	send := func(objs []interface{}) {
		// make rather than `var pods []*v1.Pod`: an empty store must still
		// produce an empty, non-nil Pods slice. Pre-sized since len(objs) is known.
		pods := make([]*v1.Pod, 0, len(objs))
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
	}
	store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
	return &sourceFile{
		path:           path,
		nodeName:       nodeName,
		period:         period,
		store:          store,
		fileKeyMapping: map[string]string{},
		updates:        updates,
		watchEvents:    make(chan *watchEvent, eventBufferLen),
	}
}
// run starts the file source. A background goroutine reads the path
// immediately, then re-reads it on every tick of a period-s ticker and
// processes filesystem watch events as they arrive. Errors are logged, not
// returned. Finally, the filesystem watcher itself is started.
func (s *sourceFile) run() {
	ticker := time.NewTicker(s.period)
	relist := func() {
		if err := s.listConfig(); err != nil {
			klog.Errorf("Unable to read config path %q: %v", s.path, err)
		}
	}

	go func() {
		// Read path immediately to speed up startup.
		relist()
		for {
			select {
			case <-ticker.C:
				relist()
			case ev := <-s.watchEvents:
				if err := s.consumeWatchEvent(ev); err != nil {
					klog.Errorf("Unable to process watch event: %v", err)
				}
			}
		}
	}()

	// Start the goroutine that watches the path and produces watchEvents.
	s.startWatch()
}
3.1 读取pod配置文件extractFromFile
无论是周期性的读取文件或消费watch事件,都会涉及到读取文件操作extractFromFile。extractFromFile将读取到的文件输出为pod,并且会调用applyDefaults对pod进行修改。这里只支持一个文件包含一个pod。
这些修改包含了生成pod的UID、pod的name、转换空的namespace为default、设置Spec.NodeName、ObjectMeta.SelfLink、添加容忍所有NoExecute taint、设置Status.Phase为pending,设置annotation。
pkg\kubelet\config\file.go
// extractFromFile parses a file for Pod configuration information.
//
// It reads at most maxConfigLength bytes from filename and decodes them as a
// single pod, running s.applyDefaults on it. On success it also records the
// pod's namespace/name key in s.fileKeyMapping under filename; if computing
// that key fails, the key error is returned instead.
func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
	klog.V(3).Infof("Reading config file %q", filename)
	defer func() {
		// Only successful decodes are recorded in fileKeyMapping.
		if err != nil || pod == nil {
			return
		}
		key, keyErr := cache.MetaNamespaceKeyFunc(pod)
		if keyErr != nil {
			err = keyErr
			return
		}
		s.fileKeyMapping[filename] = key
	}()

	f, err := os.Open(filename)
	if err != nil {
		return pod, err
	}
	defer f.Close()

	data, err := utilio.ReadAtMost(f, maxConfigLength)
	if err != nil {
		return pod, err
	}

	// Decode the bytes as one pod; s.applyDefaults fills in defaults such as the
	// UID, name, namespace and node name.
	parsed, pod, podErr := tryDecodeSinglePod(data, func(p *api.Pod) error {
		return s.applyDefaults(p, filename)
	})
	if !parsed {
		return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file", filename, podErr)
	}
	return pod, podErr
}
pkg\kubelet\config\common.go
// applyDefaults prepares a pod read from a file (isFile) or URL source so the
// kubelet can run it. It:
//   - generates pod.UID when it is empty: an MD5 over "host:<nodeName>" and
//     "file:<source>" (file sources) or "url:<source>" (URL sources), plus a
//     deep hash of the pod as read;
//   - replaces pod.Name with generatePodName(pod.Name, nodeName);
//   - sets an empty namespace to metav1.NamespaceDefault;
//   - sets Spec.NodeName and ObjectMeta.SelfLink;
//   - stores the UID in the kubetypes.ConfigHashAnnotationKey annotation;
//   - for file sources only, adds a toleration for every NoExecute taint;
//   - sets Status.Phase to Pending.
// It always returns nil.
func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.NodeName) error {
	if pod.UID == "" {
		hasher := md5.New()
		if isFile {
			fmt.Fprintf(hasher, "host:%s", nodeName)
			fmt.Fprintf(hasher, "file:%s", source)
		} else {
			fmt.Fprintf(hasher, "url:%s", source)
		}
		// The pod is hashed as read, before any of the defaults below are applied.
		hash.DeepHashObject(hasher, pod)
		pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)))
		klog.V(5).Infof("Generated UID %q pod %q from %s", pod.UID, pod.Name, source)
	}

	pod.Name = generatePodName(pod.Name, nodeName)
	klog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, source)

	if len(pod.Namespace) == 0 {
		pod.Namespace = metav1.NamespaceDefault
	}
	klog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, pod.Name, source)

	// Set the Host field to indicate this pod is scheduled on the current node.
	pod.Spec.NodeName = string(nodeName)
	pod.ObjectMeta.SelfLink = getSelfLink(pod.Name, pod.Namespace)

	if pod.Annotations == nil {
		pod.Annotations = map[string]string{}
	}
	// The config hash is the pod UID, which is the generated hash unless the
	// manifest supplied its own UID.
	pod.Annotations[kubetypes.ConfigHashAnnotationKey] = string(pod.UID)

	if isFile {
		// Applying the default Taint tolerations to static pods,
		// so they are not evicted when there are node problems.
		helper.AddOrUpdateTolerationInPod(pod, &api.Toleration{
			Operator: "Exists",
			Effect:   api.TaintEffectNoExecute,
		})
	}

	// Set the default status to pending.
	pod.Status.Phase = api.PodPending
	return nil
}
3.2 startWatch
启动goroutine监听文件变化,生成watch事件。如果doWatch返回错误:对于路径不存在这类可重试错误(retryableError),按retryPeriod周期直接重试;对于其他错误,先进行指数回退再重试。
pkg\kubelet\config\file_linux.go
// startWatch launches a goroutine that runs s.doWatch through wait.Forever,
// invoking it again retryPeriod after each return. Every failure is logged.
// A *retryableError (e.g. the path does not exist yet) is simply retried on
// the next tick. Any other error puts the "watch" key into exponential
// back-off (from retryPeriod up to maxRetryPeriod), and ticks falling inside
// that window are skipped.
func (s *sourceFile) startWatch() {
	const backOffID = "watch"
	backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)

	attempt := func() {
		if backOff.IsInBackOffSinceUpdate(backOffID, time.Now()) {
			return
		}
		err := s.doWatch()
		if err == nil {
			return
		}
		klog.Errorf("Unable to read config path %q: %v", s.path, err)
		if _, retryable := err.(*retryableError); retryable {
			return
		}
		backOff.Next(backOffID, time.Now())
	}
	go wait.Forever(attempt, retryPeriod)
}
4 HTTP source
启动goroutine 周期请求url获取pod或podlist,生成PodUpdate(op为set,包含所有pod)消息发送给源接收通道。
// Excerpt from makePodSourceConfig (it uses that function's kubeCfg,
// manifestURLHeader, nodeName and cfg): register the HTTP source.
// define url config source
// StaticPodURL is empty by default; HTTPCheckFrequency defaults to 20s.
if kubeCfg.StaticPodURL != "" {
klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
// Starts a goroutine that polls the URL for a pod or pod list every HTTPCheckFrequency
// and sends PodUpdate messages (Op SET, containing all pods) to the HTTP source channel.
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
pkg\kubelet\config\http.go
// NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes.
//
// A background goroutine polls url every period with the given request
// header. Each poll that yields pods sends a SET PodUpdate for the HTTP
// source on updates.
func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
	// Timing out requests leads to retries. This client is only used to
	// read the manifest URL passed to kubelet.
	client := &http.Client{Timeout: 10 * time.Second}
	src := &sourceURL{
		url:      url,
		header:   header,
		nodeName: nodeName,
		updates:  updates,
		client:   client,
		// data is left nil, so the first non-empty response always counts as a change.
	}
	klog.V(1).Infof("Watching URL %s", url)
	go wait.Until(src.run, period, wait.NeverStop)
}
// run performs one poll of the URL and rate-limits failure logging. The first
// three consecutive failures are logged as warnings, the fourth announces the
// drop to V(4), and later ones are logged only at V(4). After any failures, a
// success logs once and resets the counter.
func (s *sourceURL) run() {
	err := s.extractFromURL()
	if err == nil {
		if s.failureLogs > 0 {
			klog.Info("Successfully read pods from URL.")
			s.failureLogs = 0
		}
		return
	}

	// Don't log this multiple times per minute. The first few entries should be
	// enough to get the point across.
	switch {
	case s.failureLogs < 3:
		klog.Warningf("Failed to read pods from URL: %v", err)
	case s.failureLogs == 3:
		klog.Warningf("Failed to read pods from URL. Dropping verbosity of this message to V(4): %v", err)
	default:
		klog.V(4).Infof("Failed to read pods from URL: %v", err)
	}
	s.failureLogs++
}
4.1 extractFromURL
访问url获取pod或podList配置,同样会对读取到pod进行修改,修改内容跟file源一样。然后发送Op为SET的PodUpdate消息到源接收通道。
pkg\kubelet\config\http.go
// extractFromURL fetches s.url once and, when the body holds a pod or a pod
// list, sends a SET PodUpdate for the HTTP source on s.updates.
//
// Behaviour by response:
//   - request failure or non-200 status: returns an error and sends nothing;
//   - empty body: sends an empty SET (so the HTTP source is marked as seen and
//     its pods are removed), forgets the cached body, and returns an error;
//   - body identical to the last body processed: returns nil, sends nothing;
//   - otherwise: decodes the body as a single pod, then as a pod list, running
//     s.applyDefaults on each pod, and sends the resulting pods.
func (s *sourceURL) extractFromURL() error {
	req, err := http.NewRequest("GET", s.url, nil)
	if err != nil {
		return err
	}
	req.Header = s.header
	resp, err := s.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	data, err := utilio.ReadAtMost(resp.Body, maxConfigLength)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("%v: %v", s.url, resp.Status)
	}
	if len(data) == 0 {
		// The empty SET below removes every pod of the HTTP source, so the cached
		// body no longer describes the current state. Drop it; otherwise, when the
		// URL serves its previous content again, the short-circuit below would
		// swallow it and the pods would never be re-added.
		s.data = nil
		// Emit an update with an empty PodList to allow HTTPSource to be marked as seen
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return fmt.Errorf("zero-length data received from %v", s.url)
	}
	// Short circuit if the data has not changed since the last time it was read.
	if bytes.Equal(data, s.data) {
		return nil
	}
	s.data = data

	// First try as it is a single pod.
	parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
	if parsed {
		if singlePodErr != nil {
			// It parsed but could not be used.
			return singlePodErr
		}
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return nil
	}

	// That didn't work, so try a list of pods.
	parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
	if parsed {
		if multiPodErr != nil {
			// It parsed but could not be used.
			return multiPodErr
		}
		pods := make([]*v1.Pod, 0, len(podList.Items))
		for i := range podList.Items {
			pods = append(pods, &podList.Items[i])
		}
		s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return nil
	}

	return fmt.Errorf("%v: received '%v', but couldn't parse as "+
		"single (%v) or multiple pods (%v)",
		s.url, string(data), singlePodErr, multiPodErr)
}
5 Restore from bootstrapCheckpointPath
从bootstrapCheckpointPath(命令行参数 --bootstrap-checkpoint-path)下恢复pod信息,恢复出的pod属于api源(ApiserverSource)。
// Excerpt from makePodSourceConfig: optionally restore pods from a bootstrap checkpoint.
// updatechannel is the ApiserverSource channel; it is reused by the apiserver source below.
var updatechannel chan<- interface{}
// bootstrapCheckpointPath is empty by default
if bootstrapCheckpointPath != "" {
klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
// Read every checkpoint in the directory and send all restored pods as one
// PodUpdate with Op RESTORE to updatechannel.
err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
if err != nil {
return nil, err
}
}
新建一个checkpointManager用来访问目录和读取文件,读取目录下所有pod,发送Op为RESTORE的PodUpdate消息到源接收通道。
pkg\kubelet\config\config.go
// Restore restores pods from the checkpoint path, *once*
//
// It creates a checkpoint manager for path, loads every pod checkpointed
// there, and sends them to updates as a single RESTORE PodUpdate for the
// apiserver source. Once c.checkpointManager is set, later calls return nil
// without doing anything, even if loading failed after the manager was created.
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
	if c.checkpointManager != nil {
		return nil
	}
	var err error
	if c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path); err != nil {
		return err
	}
	restored, err := checkpoint.LoadPods(c.checkpointManager)
	if err != nil {
		return err
	}
	updates <- kubetypes.PodUpdate{Pods: restored, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
	return nil
}
获得目录下的所有pod信息
pkg\kubelet\checkpoint\checkpoint.go
// Data is the on-disk form of a pod checkpoint.
type Data struct {
// Pod is the checkpointed pod.
Pod *v1.Pod
// Checksum must equal the checksum of *Pod for VerifyChecksum to succeed.
Checksum checksum.Checksum
}
// NewPodCheckpoint returns new pod checkpoint wrapping pod, with a zero
// Checksum. LoadPods calls it with a nil pod to get an empty target that
// GetCheckpoint can unmarshal into.
func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
	cp := new(Data)
	cp.Pod = pod
	return cp
}
// LoadPods Loads All Checkpoints from disk
//
// It is best-effort. A failure to list checkpoints, or to read or verify an
// individual checkpoint, is logged and skipped rather than returned, so the
// returned error is always nil. The returned slice is never nil.
func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
	keys, err := cpm.ListCheckpoints()
	if err != nil {
		klog.Errorf("Failed to list checkpoints: %v", err)
	}

	pods := make([]*v1.Pod, 0, len(keys))
	for _, key := range keys {
		cp := NewPodCheckpoint(nil)
		// GetCheckpoint decodes the stored blob into cp and verifies its checksum.
		if err := cpm.GetCheckpoint(key, cp); err != nil {
			klog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
			continue
		}
		pods = append(pods, cp.GetPod())
	}
	return pods, nil
}
读取文件,将文件内容转化成Data类型,验证hash值是否符合。
pkg\kubelet\checkpointmanager\checkpoint_manager.go
// GetCheckpoint retrieves checkpoint from CheckpointStore.
//
// While holding manager.mutex, it reads the blob stored under checkpointKey,
// decodes it into checkpoint, and verifies its checksum. A missing key is
// reported as errors.ErrCheckpointNotFound. Any other read, decode or checksum
// error is returned unchanged.
func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
	manager.mutex.Lock()
	defer manager.mutex.Unlock()

	blob, err := manager.store.Read(checkpointKey)
	switch {
	case err == utilstore.ErrKeyNotFound:
		return errors.ErrCheckpointNotFound
	case err != nil:
		return err
	}

	if err := checkpoint.UnmarshalCheckpoint(blob); err != nil {
		return err
	}
	return checkpoint.VerifyChecksum()
}
pkg\kubelet\checkpoint\checkpoint.go
// UnmarshalCheckpoint decodes the JSON-encoded blob into cp and returns any
// decoding error.
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
	err := json.Unmarshal(blob, cp)
	return err
}
// VerifyChecksum verifies that passed checksum is same as calculated checksum:
// it recomputes the checksum of the stored pod and returns the result of
// cp.Checksum.Verify.
//
// NOTE(review): cp.Pod is dereferenced unconditionally. A checkpoint whose JSON
// leaves Pod nil (for example "{}") therefore panics here instead of returning
// an error, and LoadPods starts from NewPodCheckpoint(nil). Consider returning
// a corrupt-checkpoint error when cp.Pod == nil.
func (cp *Data) VerifyChecksum() error {
	podCopy := *cp.Pod
	return cp.Checksum.Verify(podCopy)
}
pkg\kubelet\checkpointmanager\checksum\checksum.go
// Checksum is the checksum stored alongside checkpoint data. Its value is a
// 32-bit FNV-1a hash widened to uint64 (see getChecksum).
type Checksum uint64
// Verify verifies that passed checksum is same as calculated checksum: it
// recomputes the checksum of data and returns errors.ErrCorruptCheckpoint when
// it differs from cs, nil otherwise.
func (cs Checksum) Verify(data interface{}) error {
	if New(data) == cs {
		return nil
	}
	return errors.ErrCorruptCheckpoint
}
// New returns the Checksum of checkpoint data, as computed by getChecksum.
func New(data interface{}) Checksum {
	sum := getChecksum(data)
	return Checksum(sum)
}
// getChecksum returns the 32-bit FNV-1a hash of data (fed through
// hashutil.DeepHashObject), widened to uint64.
//
// Checksums are persisted inside checkpoints and compared by Verify, so
// switching to another hash (e.g. 64-bit FNV) would make existing checkpoints
// fail verification.
func getChecksum(data interface{}) uint64 {
	h32 := fnv.New32a()
	hashutil.DeepHashObject(h32, data)
	sum := h32.Sum32()
	return uint64(sum)
}
6 api source
从apiserver获取该node上的pod,使用Reflector和UndeltaStore机制,watch并发送PodUpdate消息到源接收通道。
Reflector会先list pod(spec.nodeName为该node name,即调度到该节点的pod),然后执行UndeltaStore.Replace替换存储中的pod列表,再执行PushFunc(即newSourceApiserverFromLW里的send),发送Op为SET的PodUpdate消息到源接收通道。
Reflector list完之后会执行watch pod(spec.nodeName为该node name,即调度到该节点的pod)。收到Added类型的事件,则执行UndeltaStore.Add增加pod到存储中的pod列表,然后执行PushFunc;收到Modified类型的事件,则执行UndeltaStore.Update更新在存储中的pod列表pod,然后执行PushFunc;收到Deleted类型的事件,则执行UndeltaStore.Delete删除在存储中的pod列表pod,然后执行PushFunc。
// Excerpt from makePodSourceConfig: register the apiserver source when a client is available.
if kubeDeps.KubeClient != nil {
klog.Infof("Watching apiserver")
// Reuse the ApiserverSource channel if the checkpoint restore already obtained it.
if updatechannel == nil {
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
}
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
}
pkg\kubelet\config\apiserver.go
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
// It lists and watches pods in all namespaces whose api.PodHostField
// (spec.nodeName) equals nodeName, and forwards them on updates.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	onThisNode := fields.OneTermEqualSelector(api.PodHostField, string(nodeName))
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, onThisNode)
	newSourceApiserverFromLW(lw, updates)
}
// newSourceApiserverFromLW creates a config source that watches and pulls from
// the apiserver through lw and forwards the pods on updates.
//
// A Reflector keeps an UndeltaStore in sync with lw. After every change, the
// store calls send with its complete contents, which are emitted as a single
// SET PodUpdate for the apiserver source. Pods is always a non-nil slice
// (empty when no pods are listed), as the kubetypes.PodUpdate documentation
// requires.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		// make rather than `var pods []*v1.Pod`: an empty store must still
		// produce an empty, non-nil Pods slice. Pre-sized since len(objs) is known.
		pods := make([]*v1.Pod, 0, len(objs))
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	// The reflector runs until wait.NeverStop, i.e. for the life of the process.
	go r.Run(wait.NeverStop)
}