Kubelet Startup - Command Execution

In the previous analysis, we explored the initialization of kubelet’s command-line. Now, we will delve into the execution of kubelet’s Cobra Command. The Execute() function in Cobra essentially involves command-line parsing (unless DisableFlagParsing: true is set), followed by passing the remaining arguments to the Command.Run function for execution.

This analysis is based on Kubernetes version 1.18.6. For the source code and related readings, please visit the source code analysis repository.

Due to the setting of DisableFlagParsing: true, the args parameter in the Run function receives the user-input command (i.e., os.Args[1:]).

In cmd/kubelet/app/server.go#L148-L152:

go

		// The Kubelet has special flag parsing requirements to enforce flag precedence rules,
		// so we do all our parsing manually in Run, below.
		// DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
		// `args` arg to Run, without Cobra's interference.
		DisableFlagParsing: true,

The command-line parsing is performed on the cleanFlagSet, which sets the values of kubeletFlags and kubeletConfig based on command-line arguments. After parsing, it checks if there are any non-flag arguments in the command line, and if so, it exits since kubelet doesn’t have subcommands.

go

	Run: func(cmd *cobra.Command, args []string) {
			// initial flag parse, since we disable cobra's flag parsing
			if err := cleanFlagSet.Parse(args); err != nil {
				cmd.Usage()
				klog.Fatal(err)
			}

			// check if there are non-flag arguments in the command line
			cmds := cleanFlagSet.Args()
			if len(cmds) > 0 {
				cmd.Usage()
				klog.Fatalf("unknown command: %s", cmds[0])
			}

If the --help flag is detected, it simply executes cmd.Help() to print the help information.

go

			// short-circuit on help
			help, err := cleanFlagSet.GetBool("help")
			if err != nil {
				klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
			}
			if help {
				cmd.Help()
				return
			}

To print the current version, it calls verflag.PrintAndExitIfRequested().

go

			// short-circuit on verflag
			// 访问 --version
			verflag.PrintAndExitIfRequested()

It uses utilflag.PrintFlags(cleanFlagSet) to log all the flags in the flagset.

go

utilflag.PrintFlags(cleanFlagSet)

pkg\util\flag\flags.go

go

// PrintFlags logs the flags in the flagset
func PrintFlags(flags *pflag.FlagSet) {
	flags.VisitAll(func(flag *pflag.Flag) {
		klog.V(1).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
	})
}

The flags are logged in the kubelet.INFO logs.

text

I1127 22:24:29.104931  788647 flags.go:59] FLAG: --add-dir-header="false"
I1127 22:24:29.114617  788647 flags.go:59] FLAG: --address="10.19.0.11"
I1127 22:24:29.114630  788647 flags.go:59] FLAG: --allowed-unsafe-sysctls="[]"
I1127 22:24:29.114636  788647 flags.go:59] FLAG: --alsologtostderr="false"
I1127 22:24:29.114640  788647 flags.go:59] FLAG: --anonymous-auth="false"

It sets the feature gates from the initial flags-based configuration into DefaultMutableFeatureGate.

The specific initialization process is detailed in this article.

go

// set feature gates from initial flags-based config
			if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
				klog.Fatal(err)
			}
  • If --dynamic-config-dir is configured, DynamicKubeletConfig featuregate must be enabled.

  • --node-status-max-images must be greater than or equal to -1.

  • --node-labels should consist of regular labels (those without the kubernetes.io prefix) or labels that meet specific criteria as outlined in the code.

    text

    #label in list:
    kubernetes.io/hostname 
    topology.kubernetes.io/zone
    topology.kubernetes.io/region
    failure-domain.beta.kubernetes.io/zone
    failure-domain.beta.kubernetes.io/region
    beta.kubernetes.io/instance-type
    node.kubernetes.io/instance-type
    kubernetes.io/os
    kubernetes.io/arch
    beta.kubernetes.io/os
    beta.kubernetes.io/arch
    
    #or label in domain or subdomain:
    kubelet.kubernetes.io
    node.kubernetes.io

go

			// validate the initial KubeletFlags
			if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
				klog.Fatal(err)
			}

			if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
				klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
			}

cmd\kubelet\app\options\options.go

go

// ValidateKubeletFlags validates Kubelet's configuration flags and returns an error if they are invalid.
func ValidateKubeletFlags(f *KubeletFlags) error {
	// ensure that nobody sets DynamicConfigDir if the dynamic config feature gate is turned off
	if f.DynamicConfigDir.Provided() && !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
		return fmt.Errorf("the DynamicKubeletConfig feature gate must be enabled in order to use the --dynamic-config-dir flag")
	}
	if f.NodeStatusMaxImages < -1 {
		return fmt.Errorf("invalid configuration: NodeStatusMaxImages (--node-status-max-images) must be -1 or greater")
	}

	unknownLabels := sets.NewString()
	for k := range f.NodeLabels {
		if isKubernetesLabel(k) && !kubeletapis.IsKubeletLabel(k) {
			unknownLabels.Insert(k)
		}
	}
	if len(unknownLabels) > 0 {
		return fmt.Errorf("unknown 'kubernetes.io' or 'k8s.io' labels specified with --node-labels: %v\n--node-labels in the 'kubernetes.io' namespace must begin with an allowed prefix (%s) or be in the specifically allowed set (%s)", unknownLabels.List(), strings.Join(kubeletapis.KubeletLabelNamespaces(), ", "), strings.Join(kubeletapis.KubeletLabels(), ", "))
	}

	return nil
}

这个后面一篇文章讲解

go

// use dynamic kubelet config, if enabled
			var kubeletConfigController *dynamickubeletconfig.Controller
			if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
				var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
				dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
					func(kc *kubeletconfiginternal.KubeletConfiguration) error {
						// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
						// so that we get a complete validation at the same point where we can decide to reject dynamic config.
						// This fixes the flag-precedence component of issue #63305.
						// See issue #56171 for general details on flag precedence.
						return kubeletConfigFlagPrecedence(kc, args)
					})
				if err != nil {
					klog.Fatal(err)
				}
				// If we should just use our existing, local config, the controller will return a nil config
				if dynamicKubeletConfig != nil {
					kubeletConfig = dynamicKubeletConfig
					// Note: flag precedence was already enforced in the controller, prior to validation,
					// by our above transform function. Now we simply update feature gates from the new config.
					if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
						klog.Fatal(err)
					}
				}
			}

The KubeletServer can be thought of as a collection of configurations for kubelet.

go

			// construct a KubeletServer from kubeletFlags and kubeletConfig
			kubeletServer := &options.KubeletServer{
				KubeletFlags:         *kubeletFlags,
				KubeletConfiguration: *kubeletConfig,
			}

Dependencies is a collection of various dependencies that kubelet needs to run.

go

			// use kubeletServer to construct the default KubeletDeps
			kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
			if err != nil {
				klog.Fatal(err)
			}
    		// add the kubelet config controller to kubeletDeps
			kubeletDeps.KubeletConfigController = kubeletConfigController

UnsecuredDependencies is mainly responsible for initializing the following components:

  • Initialization of TLS certificate configuration for kubelet’s listening port.
  • Mounter.
  • HostUtil - Used for tasks like finding mounts and setting SELinux configurations.
  • Subpather - File-related operations.
  • DynamicPluginProber - Loads Flexvolume plugins.
  • OSInterface - Operating system-related operations.
  • Docker client configuration - This refers to the dockershim for connecting to Docker.
  • OOMAdjuster.
  • List of volume plugins.

volume plugins:

  • The list of volume plugins includes legacy volume plugins integrated into the kubelet codebase. However, they are currently being migrated to external standalone CSI providers. These legacy volume plugins include:
    • aws EBS
    • gce PD
    • Cinder
    • azure disk
    • azure file
    • vsphere volume
  • emptydir
  • git repo
  • host path
  • nfs
  • secret
  • iscsi
  • glusterfs
  • rbd
  • quobyte
  • cephfs
  • downwardapi
  • fc
  • flocker
  • configmap
  • projected
  • portworx
  • scaleio
  • local
  • storageos
  • csi

The InitializeTLS function is responsible for initializing the TLS certificate configuration. Here are the key steps involved:

  1. Checking Server TLS Bootstrap: It checks if server TLS bootstrap (--server-tls-bootstrap) is enabled. If enabled, the server TLS certificate is automatically bootstrapped, and no further actions are taken.
  2. Checking Certificate and Key Files: It checks if the tlsCertFile and tlsPrivateKeyFile options are configured in the kubelet’s configuration. If these options are not set, it proceeds to generate a self-signed certificate if the certDirectory (default: /var/run/kubernetes) does not already contain kubelet.crt and kubelet.key files.
  3. Validating TLS Cipher Suites: It validates whether the TLS cipher suites specified in the kubelet’s configuration (TLSCipherSuites) are supported.
  4. Validating TLS Minimum Version: It checks whether the TLS minimum version specified in the kubelet’s configuration (TLSMinVersion) is supported.
  5. Client Authentication CA: If the kubelet configuration includes client authentication using a CA certificate (client-ca-file), it sets up the necessary CA-related parameters for client authentication.

This process ensures that the kubelet has the necessary TLS configuration for secure communication, and it generates or validates the required TLS certificates and keys.

This functionality is defined in cmd\kubelet\app\server.go in the Kubernetes source code.

go

// InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
// certificate and key file are generated. Returns a configured server.TLSOptions object.
func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
	if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
		kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
		kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")

		canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
		if err != nil {
			return nil, err
		}
		if !canReadCertAndKey {
			hostName, err := nodeutil.GetHostname(kf.HostnameOverride)
			if err != nil {
				return nil, err
			}
			cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil)
			if err != nil {
				return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
			}

			if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
				return nil, err
			}

			if err := keyutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
				return nil, err
			}

			klog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
		}
	}

	tlsCipherSuites, err := cliflag.TLSCipherSuites(kc.TLSCipherSuites)
	if err != nil {
		return nil, err
	}

	minTLSVersion, err := cliflag.TLSVersion(kc.TLSMinVersion)
	if err != nil {
		return nil, err
	}

	tlsOptions := &server.TLSOptions{
		Config: &tls.Config{
			MinVersion:   minTLSVersion,
			CipherSuites: tlsCipherSuites,
		},
		CertFile: kc.TLSCertFile,
		KeyFile:  kc.TLSPrivateKeyFile,
	}

	if len(kc.Authentication.X509.ClientCAFile) > 0 {
		clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
		if err != nil {
			return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err)
		}
		// Specify allowed CAs for client certificates
		tlsOptions.Config.ClientCAs = clientCAs
		// Populate PeerCertificates in requests, but don't reject connections without verified certificates
		tlsOptions.Config.ClientAuth = tls.RequestClientCert
	}

	return tlsOptions, nil
}

Signal Handler Setup: The code sets up a signal handler for SIGTERM and SIGINT signals using the genericapiserver.SetupSignalHandler() function. It returns a channel (stopCh) that is used to handle the termination signals.

go

			// set up stopCh here in order to be reused by kubelet and docker shim
			stopCh := genericapiserver.SetupSignalHandler()

staging\src\k8s.io\apiserver\pkg\server\signal.go

go

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() <-chan struct{} {
	close(onlyOneSignalHandler) // panics when called twice

	shutdownHandler = make(chan os.Signal, 2)

	stop := make(chan struct{})
	signal.Notify(shutdownHandler, shutdownSignals...)
	go func() {
		<-shutdownHandler
		close(stop)
		<-shutdownHandler
		os.Exit(1) // second signal. Exit directly.
	}()

	return stop
}

If the --experimental-dockershim flag is set in the command line parameters, it indicates that only the DockerShim mode should be run. This mode is typically used for testing purposes.

go

// start the experimental docker shim, if enabled
			if kubeletServer.KubeletFlags.ExperimentalDockershim {
				if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
					klog.Fatal(err)
				}
				return
			}

The Run function is called to start the kubelet with the specified kubeletServer, kubeletDeps, and other configurations.

go

			// run the kubelet
			klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
			if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
				klog.Fatal(err)
			}

Related Content