kubelet启动--命令执行

前面已经分析了kubelet启动命令行初始化,接下来介绍kubelet的cobra Command执行。cobra的Excute()–简单理解就是先进行命令行解析(如果设置了DisableFlagParsing: true则不解析),再将命令解析完剩下参数传给Command.Run进行执行。

本文基于kubernetes 1.18.6版本,请访问源代码阅读仓库

由于设置DisableFlagParsing: true所以Run的参数args是用户输入的命令行(os.Args[1:]),即Run(Command, os.Args[1:])。

cmd\kubelet\app\server.go#L148-L152

go

		// The Kubelet has special flag parsing requirements to enforce flag precedence rules,
		// so we do all our parsing manually in Run, below.
		// DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
		// `args` arg to Run, without Cobra's interference.
		DisableFlagParsing: true,

命令行定义在cleanFlagSet,进行命令解析,设置kubeletFlags和kubeletConfig相应字段的值。

命令行解析完之后,判断是命令行否有非flag参数,有的话就退出–kubelet是没有子命令的。

go

	Run: func(cmd *cobra.Command, args []string) {
			// initial flag parse, since we disable cobra's flag parsing
			if err := cleanFlagSet.Parse(args); err != nil {
				cmd.Usage()
				klog.Fatal(err)
			}

			// check if there are non-flag arguments in the command line
			cmds := cleanFlagSet.Args()
			if len(cmds) > 0 {
				cmd.Usage()
				klog.Fatalf("unknown command: %s", cmds[0])
			}

如果是命令行帮助的化,直接执行cmd.Help()打印帮助信息。

go

			// short-circuit on help
			help, err := cleanFlagSet.GetBool("help")
			if err != nil {
				klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
			}
			if help {
				cmd.Help()
				return
			}

打印当前版本,具体版本相关处理过程请看

go

			// short-circuit on verflag
			// 访问 --version
			verflag.PrintAndExitIfRequested()

go

utilflag.PrintFlags(cleanFlagSet)

pkg\util\flag\flags.go

go

// PrintFlags logs the flags in the flagset
func PrintFlags(flags *pflag.FlagSet) {
	flags.VisitAll(func(flag *pflag.Flag) {
		klog.V(1).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
	})
}

在kubelet.INFO里看到这样的日志。

text

I1127 22:24:29.104931  788647 flags.go:59] FLAG: --add-dir-header="false"
I1127 22:24:29.114617  788647 flags.go:59] FLAG: --address="10.19.0.11"
I1127 22:24:29.114630  788647 flags.go:59] FLAG: --allowed-unsafe-sysctls="[]"
I1127 22:24:29.114636  788647 flags.go:59] FLAG: --alsologtostderr="false"
I1127 22:24:29.114640  788647 flags.go:59] FLAG: --anonymous-auth="false"

设置kubeletConfig中的featureGates加入到DefaultMutableFeatureGate

具体初始化过程,请查看这篇文章

go

// set feature gates from initial flags-based config
			if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
				klog.Fatal(err)
			}
  • 配置了-- dynamic-config-dir则必须启用了DynamicKubeletConfig featuregate

  • --node-status-max-images必须大于等于-1

  • --node-labels 应该是普通label–非``kubernetes.io`前缀的label,或是满足下面条件label

    text

    label在下面列表中:
    kubernetes.io/hostname 
    topology.kubernetes.io/zone
    topology.kubernetes.io/region
    failure-domain.beta.kubernetes.io/zone
    failure-domain.beta.kubernetes.io/region
    beta.kubernetes.io/instance-type
    node.kubernetes.io/instance-type
    kubernetes.io/os
    kubernetes.io/arch
    beta.kubernetes.io/os
    beta.kubernetes.io/arch
    
    或label域或子域在列表中:
    kubelet.kubernetes.io
    node.kubernetes.io

go

			// validate the initial KubeletFlags
			if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
				klog.Fatal(err)
			}

			if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
				klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
			}

cmd\kubelet\app\options\options.go

go

// ValidateKubeletFlags validates Kubelet's configuration flags and returns an error if they are invalid.
func ValidateKubeletFlags(f *KubeletFlags) error {
	// ensure that nobody sets DynamicConfigDir if the dynamic config feature gate is turned off
	if f.DynamicConfigDir.Provided() && !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
		return fmt.Errorf("the DynamicKubeletConfig feature gate must be enabled in order to use the --dynamic-config-dir flag")
	}
	if f.NodeStatusMaxImages < -1 {
		return fmt.Errorf("invalid configuration: NodeStatusMaxImages (--node-status-max-images) must be -1 or greater")
	}

	unknownLabels := sets.NewString()
	for k := range f.NodeLabels {
		if isKubernetesLabel(k) && !kubeletapis.IsKubeletLabel(k) {
			unknownLabels.Insert(k)
		}
	}
	if len(unknownLabels) > 0 {
		return fmt.Errorf("unknown 'kubernetes.io' or 'k8s.io' labels specified with --node-labels: %v\n--node-labels in the 'kubernetes.io' namespace must begin with an allowed prefix (%s) or be in the specifically allowed set (%s)", unknownLabels.List(), strings.Join(kubeletapis.KubeletLabelNamespaces(), ", "), strings.Join(kubeletapis.KubeletLabels(), ", "))
	}

	return nil
}

这个后面一篇文章讲解

go

// use dynamic kubelet config, if enabled
			var kubeletConfigController *dynamickubeletconfig.Controller
			if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
				var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
				dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
					func(kc *kubeletconfiginternal.KubeletConfiguration) error {
						// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
						// so that we get a complete validation at the same point where we can decide to reject dynamic config.
						// This fixes the flag-precedence component of issue #63305.
						// See issue #56171 for general details on flag precedence.
						return kubeletConfigFlagPrecedence(kc, args)
					})
				if err != nil {
					klog.Fatal(err)
				}
				// If we should just use our existing, local config, the controller will return a nil config
				if dynamicKubeletConfig != nil {
					kubeletConfig = dynamicKubeletConfig
					// Note: flag precedence was already enforced in the controller, prior to validation,
					// by our above transform function. Now we simply update feature gates from the new config.
					if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
						klog.Fatal(err)
					}
				}
			}

kubeletSever可以认为是kubelet的配置合集

go

			// construct a KubeletServer from kubeletFlags and kubeletConfig
			kubeletServer := &options.KubeletServer{
				KubeletFlags:         *kubeletFlags,
				KubeletConfiguration: *kubeletConfig,
			}

Dependencies是kubelet运行需要的一些其他依赖组件的合集

go

			// use kubeletServer to construct the default KubeletDeps
			kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
			if err != nil {
				klog.Fatal(err)
			}
    		// add the kubelet config controller to kubeletDeps
			kubeletDeps.KubeletConfigController = kubeletConfigController

unsecuredDependencies主要初始化这些组件:

  • 初始化kubelet 监听端口的tls证书配置
  • 挂载器mounter
  • 主机操作器HostUtil–用来查找挂载、设置selinux等操作
  • 文件相关操作器Subpather
  • 动态插件探测器DynamicPluginProber–加载Flexvolume插件
  • 操作系统相关操作OSInterface
  • docker客户端配置–这里指的是dockershim连接docker
  • oom调整器OOMAdjuster
  • volume插件列表

volume插件列表:

  • 集成在kubelet代码中老的volume,正在被迁移到外部独立csi provider
    • aws EBS
    • gce PD
    • Cinder
    • azure disk
    • azure file
    • vsphere volume
  • emptydir
  • git repo
  • host path
  • nfs
  • secret
  • iscsi
  • glusterfs
  • rbd
  • quobyte
  • cephfs
  • downwardapi
  • fc
  • flocker
  • configmap
  • projected
  • portworx
  • scaleio
  • local
  • storageos
  • csi

在没有启用servertlsbootstrap,且没有配置tlsCerFile和TLSPrivateKeyFile,且CertDirectory目录下没有kubelet.crt和kubelet.key文件。自动在CertDirectory(默认在/va/run/kubernetes)下生成自签名的证书kubelet.crt和kubelet.key。

检验kubelet配置的TLSCipherSuites是否支持

检验kubelet配置的TLSMinVersion是否支持

如果配置了客户端认证ca证书,则设置ca相关参数

cmd\kubelet\app\server.go

go

// InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
// certificate and key file are generated. Returns a configured server.TLSOptions object.
func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
	if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
		kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
		kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")

		canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
		if err != nil {
			return nil, err
		}
		if !canReadCertAndKey {
			hostName, err := nodeutil.GetHostname(kf.HostnameOverride)
			if err != nil {
				return nil, err
			}
			cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil)
			if err != nil {
				return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
			}

			if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
				return nil, err
			}

			if err := keyutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
				return nil, err
			}

			klog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
		}
	}

	tlsCipherSuites, err := cliflag.TLSCipherSuites(kc.TLSCipherSuites)
	if err != nil {
		return nil, err
	}

	minTLSVersion, err := cliflag.TLSVersion(kc.TLSMinVersion)
	if err != nil {
		return nil, err
	}

	tlsOptions := &server.TLSOptions{
		Config: &tls.Config{
			MinVersion:   minTLSVersion,
			CipherSuites: tlsCipherSuites,
		},
		CertFile: kc.TLSCertFile,
		KeyFile:  kc.TLSPrivateKeyFile,
	}

	if len(kc.Authentication.X509.ClientCAFile) > 0 {
		clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
		if err != nil {
			return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err)
		}
		// Specify allowed CAs for client certificates
		tlsOptions.Config.ClientCAs = clientCAs
		// Populate PeerCertificates in requests, but don't reject connections without verified certificates
		tlsOptions.Config.ClientAuth = tls.RequestClientCert
	}

	return tlsOptions, nil
}

这里会处理两种信号SIGTERM and SIGINT,当收到这两种信号时关闭stopCh

go

			// set up stopCh here in order to be reused by kubelet and docker shim
			stopCh := genericapiserver.SetupSignalHandler()

staging\src\k8s.io\apiserver\pkg\server\signal.go

go

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() <-chan struct{} {
	close(onlyOneSignalHandler) // panics when called twice

	shutdownHandler = make(chan os.Signal, 2)

	stop := make(chan struct{})
	signal.Notify(shutdownHandler, shutdownSignals...)
	go func() {
		<-shutdownHandler
		close(stop)
		<-shutdownHandler
		os.Exit(1) // second signal. Exit directly.
	}()

	return stop
}

当命令行参数设置了–experimental-dockershim,则只运行dockershim,这个模式只用来测试。

go

// start the experimental docker shim, if enabled
			if kubeletServer.KubeletFlags.ExperimentalDockershim {
				if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
					klog.Fatal(err)
				}
				return
			}

上面初始化之后开始真正运行kubelet,后面会专门分析。

go

			// run the kubelet
			klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
			if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
				klog.Fatal(err)
			}

相关内容