kubelet启动--日志初始化

kubernetes 默认使用的日志库是klog,kubernetes的k8s.io/component-base/logs库,专门做日志初始化相关操作。

klog是glog的fork版本,由于glog不在开发、在容器中运行有一系列问题、不容易测试等问题,所以kubenetes自己维护了一个klog。

在1.18版本中使用v1.0.0版本的klog,这个版本的--add_dir_header不生效,在https://github.com/kubernetes/klog/pull/101修复了,但是在1.18中一直存在这个问题,小版本更新一直没有更新klog版本。

本文基于kubernetes 1.18.6版本,请访问源代码阅读仓库

kubelet在main函数中直接调用k8s.io/component-base/logs的InitLogs()初始化日志,并在程序退出时候执行flushlog。

cmd\kubelet\kubelet.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import (
	"math/rand"
	"os"
	"time"

	"k8s.io/component-base/logs"
	_ "k8s.io/component-base/metrics/prometheus/restclient"
	_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
	"k8s.io/kubernetes/cmd/kubelet/app"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewKubeletCommand()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

这个模块还会加载klog模块,而klog模块也有init()。

全局变量logFlushFreq–注册命令行选项--log-flush-frequency,init()–初始化klog。

1
2
3
4
5
6
7
const logFlushFreqFlagName = "log-flush-frequency"

var logFlushFreq = pflag.Duration(logFlushFreqFlagName, 5*time.Second, "Maximum number of seconds between log flushes")

func init() {
	klog.InitFlags(flag.CommandLine)
}

设置命令行选项的默认值和goroutine来周期性刷写日志。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// init sets up the defaults and runs flushDaemon.
func init() {
	logging.stderrThreshold = errorLog // Default stderrThreshold is ERROR.
	logging.setVState(0, nil, false)
	logging.logDir = ""
	logging.logFile = ""
	logging.logFileMaxSizeMB = 1800
	logging.toStderr = true
	logging.alsoToStderr = false
	logging.skipHeaders = false
	logging.addDirHeader = false
	logging.skipLogHeaders = false
	go logging.flushDaemon()
}

flushDaemon()–每隔5秒执行lockAndFlushAll()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const flushInterval = 5 * time.Second

// flushDaemon periodically flushes the log file buffers.
func (l *loggingT) flushDaemon() {
	for range time.NewTicker(flushInterval).C {
		l.lockAndFlushAll()
	}
}

// lockAndFlushAll is like flushAll but locks l.mu first.
func (l *loggingT) lockAndFlushAll() {
	l.mu.Lock()
	l.flushAll()
	l.mu.Unlock()
}

// flushAll flushes all the logs and attempts to "sync" their data to disk.
// l.mu is held.
func (l *loggingT) flushAll() {
	// Flush from fatal down, in case there's trouble flushing.
	for s := fatalLog; s >= infoLog; s-- {
		file := l.file[s]
		if file != nil {
			file.Flush() // ignore error
			file.Sync()  // ignore error
		}
	}
}

注册klog命令行选项

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// InitFlags is for explicitly initializing the flags.
func InitFlags(flagset *flag.FlagSet) {
	if flagset == nil {
		flagset = flag.CommandLine
	}

	flagset.StringVar(&logging.logDir, "log_dir", logging.logDir, "If non-empty, write log files in this directory")
	flagset.StringVar(&logging.logFile, "log_file", logging.logFile, "If non-empty, use this log file")
	flagset.Uint64Var(&logging.logFileMaxSizeMB, "log_file_max_size", logging.logFileMaxSizeMB,
		"Defines the maximum size a log file can grow to. Unit is megabytes. "+
			"If the value is 0, the maximum file size is unlimited.")
	flagset.BoolVar(&logging.toStderr, "logtostderr", logging.toStderr, "log to standard error instead of files")
	flagset.BoolVar(&logging.alsoToStderr, "alsologtostderr", logging.alsoToStderr, "log to standard error as well as files")
	flagset.Var(&logging.verbosity, "v", "number for the log level verbosity")
	flagset.BoolVar(&logging.skipHeaders, "add_dir_header", logging.addDirHeader, "If true, adds the file directory to the header")
	flagset.BoolVar(&logging.skipHeaders, "skip_headers", logging.skipHeaders, "If true, avoid header prefixes in the log messages")
	flagset.BoolVar(&logging.skipLogHeaders, "skip_log_headers", logging.skipLogHeaders, "If true, avoid headers when opening log files")
	flagset.Var(&logging.stderrThreshold, "stderrthreshold", "logs at or above this threshold go to stderr")
	flagset.Var(&logging.vmodule, "vmodule", "comma-separated list of pattern=N settings for file-filtered logging")
	flagset.Var(&logging.traceLocation, "log_backtrace_at", "when logging hits line file:N, emit a stack trace")
}

兼容系统log库–如果使用系统的log库来写日志,最终还是使用klog来记录。然后设置log库将原始的日志传给klog,不做任何处理。

比如使用log.Print来记录日志,它最终会调用KlogWriter的Write方法来记录。

为什么要兼容系统log库记录?

可能是历史原因一些使用外部库的日志使用的是系统的log库。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Write implements the io.Writer interface.
func (writer KlogWriter) Write(data []byte) (n int, err error) {
	klog.InfoDepth(1, string(data))
	return len(data), nil
}

// InitLogs initializes logs the way we want for kubernetes.
func InitLogs() {
	// 兼容使用log来记录,实际还是使用klog
	log.SetOutput(KlogWriter{})
	// 设置log输出格式为空,日志格式由klog决定
	log.SetFlags(0)
	// The default glog flush interval is 5 seconds.
	go wait.Forever(klog.Flush, *logFlushFreq)
}

klog.Flush()–执行日志的刷写,这里与上面的klog init起的goroutine执行一样的方法lockAndFlushAll(),kubelet起了两个goroutine来刷写日志。疑惑?

1
2
3
4
// Flush flushes all pending log I/O.
func Flush() {
	logging.lockAndFlushAll()
}

执行到addKlogFlags(fs)时候,再次执行klog.InitFlag。

为什么再次执行klog.InitFlag?

仔细会发现再次执行时候的参数是一个新建的flagset–flag.NewFlagSet(os.Args[0], flag.ExitOnError)(不是flag.CommandLine),也就是说在这个flagset中重复注册命令行选项,然后保存到kubelet的pflagset中,而flag.CommandLine的放者不用,这个极大浪费。

从commit提交信息发现,addKlogFlags是klog升级到0.3.0版本添加的–相关pull request,我猜测是klog升级到1.0.0版本,但是kubelet代码没有做相应变化。

cmd\kubelet\app\options\globalflags.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// AddGlobalFlags explicitly registers flags that libraries (glog, verflag, etc.) register
// against the global flagsets from "flag" and "github.com/spf13/pflag".
// We do this in order to prevent unwanted flags from leaking into the Kubelet's flagset.
func AddGlobalFlags(fs *pflag.FlagSet) {
	addKlogFlags(fs)
	addCadvisorFlags(fs)
	addCredentialProviderFlags(fs)
	verflag.AddFlags(fs)
	logs.AddFlags(fs)
}

// addKlogFlags adds flags from k8s.io/klog
func addKlogFlags(fs *pflag.FlagSet) {
	local := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
	klog.InitFlags(local)
	fs.AddGoFlagSet(local)
}

程序退出flush日志,保证日志不会丢失。

1
2
3
4
// FlushLogs flushes logs immediately.
func FlushLogs() {
	klog.Flush()
}

感觉日志这块一直在演进,难免会存在历史账,但是最终是work的😂。在kubernetes 1.20版本中, klog升级到2.4.0版本,更好的支持结构化的日志。

这里没有分析klog源码,后面有时间再分析klog。kubernetes真的太大了,能不能在它衰退前看完,时间真不够。

相关内容