前面已经分析了kubelet启动命令行初始化,接下来介绍kubelet的cobra Command执行。cobra的Excute()–简单理解就是先进行命令行解析(如果设置了DisableFlagParsing: true
则不解析),再将命令解析完剩下参数传给Command.Run进行执行。
本文基于kubernetes 1.18.6版本,请访问源代码阅读仓库。
由于设置DisableFlagParsing: true
所以Run的参数args是用户输入的命令行(os.Args[1:]),即Run(Command, os.Args[1:])。
cmd\kubelet\app\server.go#L148-L152
1
2
3
4
5
|
// The Kubelet has special flag parsing requirements to enforce flag precedence rules,
// so we do all our parsing manually in Run, below.
// DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
// `args` arg to Run, without Cobra's interference.
DisableFlagParsing: true,
|
命令行定义在cleanFlagSet,进行命令解析,设置kubeletFlags和kubeletConfig相应字段的值。
命令行解析完之后,判断是命令行否有非flag参数,有的话就退出–kubelet是没有子命令的。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
Run: func(cmd *cobra.Command, args []string) {
// initial flag parse, since we disable cobra's flag parsing
if err := cleanFlagSet.Parse(args); err != nil {
cmd.Usage()
klog.Fatal(err)
}
// check if there are non-flag arguments in the command line
cmds := cleanFlagSet.Args()
if len(cmds) > 0 {
cmd.Usage()
klog.Fatalf("unknown command: %s", cmds[0])
}
|
如果是命令行帮助的化,直接执行cmd.Help()打印帮助信息。
1
2
3
4
5
6
7
8
9
|
// short-circuit on help
help, err := cleanFlagSet.GetBool("help")
if err != nil {
klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
}
if help {
cmd.Help()
return
}
|
打印当前版本,具体版本相关处理过程请看
1
2
3
|
// short-circuit on verflag
// 访问 --version
verflag.PrintAndExitIfRequested()
|
1
|
utilflag.PrintFlags(cleanFlagSet)
|
pkg\util\flag\flags.go
1
2
3
4
5
6
|
// PrintFlags logs the flags in the flagset
func PrintFlags(flags *pflag.FlagSet) {
flags.VisitAll(func(flag *pflag.Flag) {
klog.V(1).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
})
}
|
在kubelet.INFO里看到这样的日志。
1
2
3
4
5
|
I1127 22:24:29.104931 788647 flags.go:59] FLAG: --add-dir-header="false"
I1127 22:24:29.114617 788647 flags.go:59] FLAG: --address="10.19.0.11"
I1127 22:24:29.114630 788647 flags.go:59] FLAG: --allowed-unsafe-sysctls="[]"
I1127 22:24:29.114636 788647 flags.go:59] FLAG: --alsologtostderr="false"
I1127 22:24:29.114640 788647 flags.go:59] FLAG: --anonymous-auth="false"
|
设置kubeletConfig中的featureGates加入到DefaultMutableFeatureGate
具体初始化过程,请查看这篇文章
1
2
3
4
|
// set feature gates from initial flags-based config
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
|
-
配置了-- dynamic-config-dir
则必须启用了DynamicKubeletConfig featuregate
-
--node-status-max-images
必须大于等于-1
-
--node-labels
应该是普通label–非``kubernetes.io`前缀的label,或是满足下面条件label
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
label在下面列表中:
kubernetes.io/hostname
topology.kubernetes.io/zone
topology.kubernetes.io/region
failure-domain.beta.kubernetes.io/zone
failure-domain.beta.kubernetes.io/region
beta.kubernetes.io/instance-type
node.kubernetes.io/instance-type
kubernetes.io/os
kubernetes.io/arch
beta.kubernetes.io/os
beta.kubernetes.io/arch
或label域或子域在列表中:
kubelet.kubernetes.io
node.kubernetes.io
|
1
2
3
4
5
6
7
8
|
// validate the initial KubeletFlags
if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
klog.Fatal(err)
}
if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
}
|
cmd\kubelet\app\options\options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// ValidateKubeletFlags validates Kubelet's configuration flags and returns an error if they are invalid.
func ValidateKubeletFlags(f *KubeletFlags) error {
// ensure that nobody sets DynamicConfigDir if the dynamic config feature gate is turned off
if f.DynamicConfigDir.Provided() && !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
return fmt.Errorf("the DynamicKubeletConfig feature gate must be enabled in order to use the --dynamic-config-dir flag")
}
if f.NodeStatusMaxImages < -1 {
return fmt.Errorf("invalid configuration: NodeStatusMaxImages (--node-status-max-images) must be -1 or greater")
}
unknownLabels := sets.NewString()
for k := range f.NodeLabels {
if isKubernetesLabel(k) && !kubeletapis.IsKubeletLabel(k) {
unknownLabels.Insert(k)
}
}
if len(unknownLabels) > 0 {
return fmt.Errorf("unknown 'kubernetes.io' or 'k8s.io' labels specified with --node-labels: %v\n--node-labels in the 'kubernetes.io' namespace must begin with an allowed prefix (%s) or be in the specifically allowed set (%s)", unknownLabels.List(), strings.Join(kubeletapis.KubeletLabelNamespaces(), ", "), strings.Join(kubeletapis.KubeletLabels(), ", "))
}
return nil
}
|
这个后面一篇文章讲解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// use dynamic kubelet config, if enabled
var kubeletConfigController *dynamickubeletconfig.Controller
if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
func(kc *kubeletconfiginternal.KubeletConfiguration) error {
// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
// so that we get a complete validation at the same point where we can decide to reject dynamic config.
// This fixes the flag-precedence component of issue #63305.
// See issue #56171 for general details on flag precedence.
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
klog.Fatal(err)
}
// If we should just use our existing, local config, the controller will return a nil config
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
// Note: flag precedence was already enforced in the controller, prior to validation,
// by our above transform function. Now we simply update feature gates from the new config.
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
}
}
|
kubeletSever可以认为是kubelet的配置合集
1
2
3
4
5
|
// construct a KubeletServer from kubeletFlags and kubeletConfig
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
|
Dependencies是kubelet运行需要的一些其他依赖组件的合集
1
2
3
4
5
6
7
|
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
if err != nil {
klog.Fatal(err)
}
// add the kubelet config controller to kubeletDeps
kubeletDeps.KubeletConfigController = kubeletConfigController
|
unsecuredDependencies主要初始化这些组件:
- 初始化kubelet 监听端口的tls证书配置
- 挂载器mounter
- 主机操作器HostUtil–用来查找挂载、设置selinux等操作
- 文件相关操作器Subpather
- 动态插件探测器DynamicPluginProber–加载Flexvolume插件
- 操作系统相关操作OSInterface
- docker客户端配置–这里指的是dockershim连接docker
- oom调整器OOMAdjuster
- volume插件列表
volume插件列表:
- 集成在kubelet代码中老的volume,正在被迁移到外部独立csi provider
- aws EBS
- gce PD
- Cinder
- azure disk
- azure file
- vsphere volume
- emptydir
- git repo
- host path
- nfs
- secret
- iscsi
- glusterfs
- rbd
- quobyte
- cephfs
- downwardapi
- fc
- flocker
- configmap
- projected
- portworx
- scaleio
- local
- storageos
- csi
在没有启用servertlsbootstrap,且没有配置tlsCerFile和TLSPrivateKeyFile,且CertDirectory目录下没有kubelet.crt和kubelet.key文件。自动在CertDirectory(默认在/va/run/kubernetes)下生成自签名的证书kubelet.crt和kubelet.key。
检验kubelet配置的TLSCipherSuites是否支持
检验kubelet配置的TLSMinVersion是否支持
如果配置了客户端认证ca证书,则设置ca相关参数
cmd\kubelet\app\server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
// InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
// certificate and key file are generated. Returns a configured server.TLSOptions object.
func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")
canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
if err != nil {
return nil, err
}
if !canReadCertAndKey {
hostName, err := nodeutil.GetHostname(kf.HostnameOverride)
if err != nil {
return nil, err
}
cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil)
if err != nil {
return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
}
if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
return nil, err
}
if err := keyutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
return nil, err
}
klog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
}
}
tlsCipherSuites, err := cliflag.TLSCipherSuites(kc.TLSCipherSuites)
if err != nil {
return nil, err
}
minTLSVersion, err := cliflag.TLSVersion(kc.TLSMinVersion)
if err != nil {
return nil, err
}
tlsOptions := &server.TLSOptions{
Config: &tls.Config{
MinVersion: minTLSVersion,
CipherSuites: tlsCipherSuites,
},
CertFile: kc.TLSCertFile,
KeyFile: kc.TLSPrivateKeyFile,
}
if len(kc.Authentication.X509.ClientCAFile) > 0 {
clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
if err != nil {
return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err)
}
// Specify allowed CAs for client certificates
tlsOptions.Config.ClientCAs = clientCAs
// Populate PeerCertificates in requests, but don't reject connections without verified certificates
tlsOptions.Config.ClientAuth = tls.RequestClientCert
}
return tlsOptions, nil
}
|
这里会处理两种信号SIGTERM and SIGINT,当收到这两种信号时关闭stopCh
1
2
|
// set up stopCh here in order to be reused by kubelet and docker shim
stopCh := genericapiserver.SetupSignalHandler()
|
staging\src\k8s.io\apiserver\pkg\server\signal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() <-chan struct{} {
close(onlyOneSignalHandler) // panics when called twice
shutdownHandler = make(chan os.Signal, 2)
stop := make(chan struct{})
signal.Notify(shutdownHandler, shutdownSignals...)
go func() {
<-shutdownHandler
close(stop)
<-shutdownHandler
os.Exit(1) // second signal. Exit directly.
}()
return stop
}
|
当命令行参数设置了–experimental-dockershim,则只运行dockershim,这个模式只用来测试。
1
2
3
4
5
6
7
|
// start the experimental docker shim, if enabled
if kubeletServer.KubeletFlags.ExperimentalDockershim {
if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
klog.Fatal(err)
}
return
}
|
上面初始化之后开始真正运行kubelet,后面会专门分析。
1
2
3
4
5
|
// run the kubelet
klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
klog.Fatal(err)
}
|