Cloud Computing K8s Component Series (4)


The kubelet handles the tasks that the master pushes down to its node, managing Pods and the containers inside them. Each kubelet process registers its node with the API Server, periodically reports the node's resource usage to the master, and monitors container and node resources through cAdvisor.

kubelet

The kubelet is the second component in Kubernetes that cannot be replaced (the first, of course, being kube-apiserver). In other words, I would strongly advise against making large changes to the kubelet's code: keeping the kubelet close to upstream matters for exactly the same reason that keeping kube-apiserver close to upstream does.

The kubelet itself also works in the "controller" pattern.

At the core of the kubelet's work is a single control loop, SyncLoop (the big circle in the original diagram). Four kinds of events drive this loop:

- Pod update events;
- Pod lifecycle changes;
- the execution interval set by the kubelet itself;
- scheduled cleanup events.
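A minimal sketch of this event-driven loop shape, in Go (the channel names, event payloads, and intervals here are invented for illustration, not kubelet's real ones):

```go
package main

import (
	"fmt"
	"time"
)

// syncLoop mimics the shape of kubelet's SyncLoop: block until one of
// the event sources fires, then dispatch to the matching handler.
func syncLoop(podUpdates <-chan string, lifecycle <-chan string, done <-chan struct{}) {
	syncTicker := time.NewTicker(1 * time.Second)          // periodic sync interval
	housekeepingTicker := time.NewTicker(2 * time.Second)  // scheduled cleanup
	defer syncTicker.Stop()
	defer housekeepingTicker.Stop()

	for {
		select {
		case u := <-podUpdates:
			fmt.Println("handle pod update:", u)
		case e := <-lifecycle:
			fmt.Println("handle lifecycle event:", e)
		case <-syncTicker.C:
			fmt.Println("periodic sync")
		case <-housekeepingTicker.C:
			fmt.Println("housekeeping")
		case <-done:
			return
		}
	}
}

func main() {
	podUpdates := make(chan string)
	lifecycle := make(chan string)
	done := make(chan struct{})

	go func() {
		podUpdates <- "ADD pod default/nginx"
		lifecycle <- "container started"
		time.Sleep(3 * time.Second)
		close(done)
	}()

	syncLoop(podUpdates, lifecycle, done)
}
```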

Like other controllers, the first thing the kubelet does at startup is set up Listers, that is, register Informers for each kind of event it cares about; these Informers are the data sources SyncLoop works on. Beyond that, the kubelet maintains a great many other sub control loops (the small circles in the diagram), generally named Something Manager: Volume Manager, Image Manager, Node Status Manager, and so on. Each of these loops, unsurprisingly, uses the controller pattern to carry out one specific kubelet duty. Node Status Manager, for example, responds to changes in the Node's state, collects the node's status, and reports it to the APIServer as a Heartbeat. CPU Manager, for another, maintains the node's CPU core information, so that when a Pod requests CPU cores via cpuset, the used and available cores are accounted for correctly.

How, then, does SyncLoop turn changes to Pod objects into container operations?

The kubelet likewise uses the Watch mechanism to observe changes to the Pod objects it is responsible for, with the filter condition that the Pod's nodeName field equals its own. The kubelet caches the information for these Pods in its own memory. When a Pod finishes scheduling and is bound to a Node, that Pod's change triggers the Handler the kubelet registered in its control loop, the HandlePods step in the diagram. By checking the Pod's state in the kubelet's memory, the kubelet can determine that this is a newly scheduled Pod and fire the handling logic for the ADD event in the Handler. For the concrete processing, the kubelet starts a dedicated goroutine, called a Pod Update Worker, to do the work on the Pod.
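With client-go, that nodeName filter can be expressed as a field selector on the informer's list/watch options; a hedged sketch (the kubeconfig path and node name are placeholders):

```go
package main

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // placeholder path
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Only list/watch Pods bound to this node, mirroring kubelet's filter.
	factory := informers.NewSharedInformerFactoryWithOptions(client, 30*time.Second,
		informers.WithTweakListOptions(func(o *metav1.ListOptions) {
			o.FieldSelector = "spec.nodeName=node-1" // placeholder node name
		}))

	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { /* ADD: create the pod's containers */ },
		UpdateFunc: func(oldObj, newObj interface{}) { /* UPDATE: reconcile/rebuild */ },
		DeleteFunc: func(obj interface{}) { /* DELETE: tear down */ },
	})

	stop := make(chan struct{})
	factory.Start(stop)
	cache.WaitForCacheSync(stop, podInformer.HasSynced)
	<-stop
}
```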

On an ADD event, for instance, the kubelet generates the Pod Status for the new Pod and checks whether the Volumes the Pod declares are ready; it then calls the underlying container runtime (Docker, for example) to create the containers the Pod defines. On an UPDATE event, the kubelet looks at what exactly changed in the Pod object and calls the underlying runtime to rebuild the containers accordingly.

When the kubelet calls down to the container runtime, it does not invoke the Docker API directly; it goes through a set of gRPC interfaces called CRI (Container Runtime Interface). Kubernetes introduced this separate layer of abstraction in the kubelet precisely to shield Kubernetes from the differences between container runtimes. Indeed, before Kubernetes 1.6, the kubelet called the Docker API directly to create and manage containers.

All of the kubelet's operations on containers are abstracted into this one interface, CRI, so the kubelet only has to deal with the interface. A concrete container project, whether Docker, rkt, or runV, only needs to supply its own implementation of the interface and expose it to the kubelet as a gRPC service.
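For a feel of the interface's shape, here is an abridged paraphrase of the generated RuntimeService server interface from the k8s.io/cri-api package (the method list is trimmed and the request/response types live in that package; this is an excerpt, not the full API):

```go
// Abridged paraphrase of RuntimeServiceServer from
// k8s.io/cri-api/pkg/apis/runtime/v1alpha2 (most methods omitted).
type RuntimeServiceServer interface {
	// Sandbox lifecycle: the pod-level environment (the "pause" container).
	RunPodSandbox(context.Context, *RunPodSandboxRequest) (*RunPodSandboxResponse, error)
	StopPodSandbox(context.Context, *StopPodSandboxRequest) (*StopPodSandboxResponse, error)

	// Container lifecycle within a sandbox.
	CreateContainer(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
	StartContainer(context.Context, *StartContainerRequest) (*StartContainerResponse, error)
	StopContainer(context.Context, *StopContainerRequest) (*StopContainerResponse, error)
	RemoveContainer(context.Context, *RemoveContainerRequest) (*RemoveContainerResponse, error)

	// ... plus status, exec/attach, port-forward, and stats methods.
}
```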

Today the kubelet still ships with dockershim, the built-in component that answers CRI calls on Docker's behalf, while every other runtime has to implement and deploy its own CRI shim. From this it is easy to see that the removal of dockershim is the inevitable trend: k8s will provide only the one unified interface, CRI.

Node Management

A node decides whether to register itself with the API Server through the kubelet startup parameter --register-node. If the parameter is true, the kubelet will try to register itself via the API Server. When self-registering, the kubelet also starts with the following parameters:

- --api-servers: the location of the API Server
- --kubeconfig: the kubeconfig file, i.e. the security configuration used to access the API Server
- --cloud-provider: the cloud provider address, used only in public cloud environments

If self-registration is not chosen, the user must configure the node's resource information by hand and tell the kubelet on the node where the API Server is. At startup the kubelet registers the node's information through the API Server and then periodically sends fresh node data; when the API Server receives it, it writes the information into etcd. The kubelet startup parameter --node-status-update-frequency sets how often the kubelet reports node status to the API Server; the default is 10s.
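That reporting loop reduces to a ticker wrapped around a status-sync call; a minimal sketch of the pattern using the wait helper from k8s.io/apimachinery (syncNodeStatus here is a stand-in, not the kubelet's real function):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// syncNodeStatus is a stand-in for "collect node status and send it to
// the API Server"; the real kubelet does far more here.
func syncNodeStatus() {
	fmt.Println("reporting node status")
}

func main() {
	stop := make(chan struct{})
	// Equivalent in spirit to --node-status-update-frequency=10s.
	go wait.Until(syncNodeStatus, 10*time.Second, stop)

	time.Sleep(35 * time.Second) // demo only: let a few reports fire
	close(stop)
}
```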

Pod Management

Obtaining Pod Manifests

The kubelet obtains the manifest of pods to run on its own node in the following ways:

- File: files in the configuration directory given by the kubelet startup parameter --config (default /etc/kubernetes/manifests). --file-check-frequency sets how often those files are rechecked; the default is 20s.
- HTTP endpoint: set through the --manifest-url parameter. --http-check-frequency sets how often the HTTP endpoint is polled; the default is 20s.
- API Server: the kubelet watches the etcd-backed state through the API server and syncs the pod list.

Note that static pods are not created by the API Server; they are created by the kubelet itself. As an earlier article in this series mentioned, a static pod is written in the kubelet's configuration and always runs on the node where that kubelet lives.
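All of these sources feed one merged update stream that the sync loop consumes; a toy sketch of that fan-in (the channel and update types are invented, loosely mirroring what kubelet's makePodSourceConfig wires together for file, HTTP, and apiserver sources):

```go
package main

import "fmt"

// mergePodSources fans several pod-manifest sources into the single
// update channel that the sync loop reads.
func mergePodSources(sources ...<-chan string) <-chan string {
	out := make(chan string)
	for _, src := range sources {
		go func(c <-chan string) {
			for update := range c {
				out <- update
			}
		}(src)
	}
	return out
}

func main() {
	fileSource := make(chan string)
	httpSource := make(chan string)
	apiserverSource := make(chan string)

	updates := mergePodSources(fileSource, httpSource, apiserverSource)

	go func() { fileSource <- "static-pod nginx" }()
	go func() { apiserverSource <- "scheduled-pod web-0" }()

	for i := 0; i < 2; i++ {
		fmt.Println("update:", <-updates)
	}
}
```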

Creation Flow

The kubelet watches (through the API Server, whose state is stored in etcd) every operation aimed at its pods. For a new pod bound to this node, it creates the pod as the pod manifest requires; for a deletion, the kubelet removes the pod's containers through the docker client and then deletes the pod.

For creating or modifying a pod, the concrete flow is:

1. Create a directory for the pod.
2. Read the pod's manifest from the API Server.
3. Mount the pod's external volumes.
4. Download the secrets the pod uses.
5. Check the pods already running on the node: if this pod has no containers, or its Pause container has not started, first stop the processes of all containers in the pod; if the pod contains containers that need deleting, delete them.
6. For each container in the pod:
   - Compute a hash for the container, then use the container's name to look up the hash of the corresponding docker container. If the container is found and the two hashes differ, stop the docker container's process along with the process of its associated pause container; if they are the same, do nothing.
   - If the container was stopped and the pod specifies no restartPolicy, do nothing.
   - Call the docker client to pull the container image, then call the docker client to run the container.
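The hash check in step 6 boils down to "hash the desired container spec and compare it to what is running"; a simplified sketch (the spec type is a toy stand-in, and the FNV hashing only loosely mirrors kubelet's HashContainer):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// containerSpec is a toy stand-in for the real container spec.
type containerSpec struct {
	Name  string
	Image string
	Cmd   string
}

// hashContainer deep-hashes the spec; kubelet similarly FNV-hashes the
// container struct to detect spec changes.
func hashContainer(c containerSpec) uint64 {
	h := fnv.New64a()
	fmt.Fprintf(h, "%#v", c)
	return h.Sum64()
}

func main() {
	desired := containerSpec{Name: "web", Image: "nginx:1.19", Cmd: "nginx"}
	running := containerSpec{Name: "web", Image: "nginx:1.18", Cmd: "nginx"}

	if hashContainer(desired) != hashContainer(running) {
		// Hashes differ: the spec changed, so the container (and its
		// pause container) would be stopped and recreated.
		fmt.Println("spec changed: restart container")
	} else {
		fmt.Println("unchanged: nothing to do")
	}
}
```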

Container Health Checks

A Pod checks container health with two kinds of probes. One is the livenessProbe, which judges whether the container is healthy and tells the kubelet when a container enters an unhealthy state. If the livenessProbe detects that the container is unhealthy, the kubelet deletes the container and reacts according to the container's restart policy; if a container carries no livenessProbe, the kubelet treats the probe's result as permanently "success".

The other probe is the readinessProbe, which judges whether the container has finished starting and is ready to accept requests. If the readinessProbe detects a failure, the pod's status is changed, and the endpoint controller removes from the service's endpoints the endpoint entry containing the IP address of the pod that hosts the container.
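Conceptually an httpGet probe is just a periodic GET with a pass/fail verdict; a minimal sketch in Go (the URL, period, and failure threshold are made-up values, not kubelet defaults):

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// probe performs one HTTP check the way an httpGet probe does:
// a 2xx/3xx response counts as success.
func probe(url string) bool {
	client := http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get(url)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode >= 200 && resp.StatusCode < 400
}

func main() {
	const url = "http://127.0.0.1:8080/healthz" // placeholder target
	failures := 0
	for range time.Tick(10 * time.Second) { // periodSeconds: 10 (runs forever)
		if probe(url) {
			failures = 0
			continue
		}
		failures++
		if failures >= 3 { // failureThreshold: 3
			fmt.Println("liveness failed: the container would be killed and restarted")
			failures = 0
		}
	}
}
```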

cAdvisor Resource Monitoring

cAdvisor is a monitoring tool born for containers; it is compiled into the kubelet and exposed on port 4194 (the historical --cadvisor-port default, a flag that newer releases removed in favor of the kubelet's own metrics endpoints).

Source Code Analysis

The walkthrough below follows the master branch; the current release at the time of writing is 1.20, with 1.18 the most widely used.

Startup Analysis

The kubelet uses the third-party cobra package as its launcher, which also handles command-line argument processing. The entry point is NewKubeletCommand:

```go
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	kubeletFlags := options.NewKubeletFlags()
	kubeletConfig, err := options.NewKubeletConfiguration()
	// programmer error
	if err != nil {
		klog.Fatal(err)
	}
	...
}
```

So the kubelet's startup parameters can come from command-line flags (kubeletFlags) as well as from a configuration file (kubeletConfig). Startup itself goes through cobra's Run function:

```go
Run: func(cmd *cobra.Command, args []string) {
	// initial flag parse, since we disable cobra's flag parsing
	if err := cleanFlagSet.Parse(args); err != nil {
		cmd.Usage()
		klog.Fatal(err)
	}

	// check if there are non-flag arguments in the command line
	cmds := cleanFlagSet.Args()
	if len(cmds) > 0 {
		cmd.Usage()
		klog.Fatalf("unknown command: %s", cmds[0])
	}

	// short-circuit on help
	help, err := cleanFlagSet.GetBool("help")
	if err != nil {
		klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
	}
	if help {
		cmd.Help()
		return
	}

	// short-circuit on verflag
	verflag.PrintAndExitIfRequested()
	cliflag.PrintFlags(cleanFlagSet)

	// set feature gates from initial flags-based config
	if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
		klog.Fatal(err)
	}

	// validate the initial KubeletFlags
	if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
		klog.Fatal(err)
	}

	if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
		klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
	}

	// load kubelet config file, if provided
	if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
		kubeletConfig, err = loadConfigFile(configFile)
		if err != nil {
			klog.Fatal(err)
		}
		// We must enforce flag precedence by re-parsing the command line into the new object.
		// This is necessary to preserve backwards-compatibility across binary upgrades.
		// See issue #56171 for more details.
		if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
			klog.Fatal(err)
		}
		// update feature gates based on new config
		if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
			klog.Fatal(err)
		}
	}

	// We always validate the local configuration (command line + config file).
	// This is the default "last-known-good" config for dynamic config, and must always remain valid.
	if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
		klog.Fatal(err)
	}

	if (kubeletConfig.KubeletCgroups != "" && kubeletConfig.KubeReservedCgroup != "") && (0 != strings.Index(kubeletConfig.KubeletCgroups, kubeletConfig.KubeReservedCgroup)) {
		klog.Warning("unsupported configuration:KubeletCgroups is not within KubeReservedCgroup")
	}

	// use dynamic kubelet config, if enabled
	var kubeletConfigController *dynamickubeletconfig.Controller
	if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
		var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
		dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
			func(kc *kubeletconfiginternal.KubeletConfiguration) error {
				// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
				// so that we get a complete validation at the same point where we can decide to reject dynamic config.
				// This fixes the flag-precedence component of issue #63305.
				// See issue #56171 for general details on flag precedence.
				return kubeletConfigFlagPrecedence(kc, args)
			})
		if err != nil {
			klog.Fatal(err)
		}
		// If we should just use our existing, local config, the controller will return a nil config
		if dynamicKubeletConfig != nil {
			kubeletConfig = dynamicKubeletConfig
			// Note: flag precedence was already enforced in the controller, prior to validation,
			// by our above transform function. Now we simply update feature gates from the new config.
			if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
				klog.Fatal(err)
			}
		}
	}

	// construct a KubeletServer from kubeletFlags and kubeletConfig
	kubeletServer := &options.KubeletServer{
		KubeletFlags:         *kubeletFlags,
		KubeletConfiguration: *kubeletConfig,
	}

	// use kubeletServer to construct the default KubeletDeps
	kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
	if err != nil {
		klog.Fatal(err)
	}

	// add the kubelet config controller to kubeletDeps
	kubeletDeps.KubeletConfigController = kubeletConfigController

	// set up signal context here in order to be reused by kubelet and docker shim
	ctx := genericapiserver.SetupSignalContext()

	// run the kubelet
	klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
	if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
		klog.Fatal(err)
	}
},
```

Long as this method is, everything apart from the final Run call just builds the kubelet's initial parameters: renaming things, repacking them into different structs, and wiring up the dependencies between them. The main steps are:

- parse the arguments and check their validity;
- derive from kubeletConfig the parameters that particular features require;
- assemble kubeletServer from the two parameter sets, KubeletFlags and KubeletConfiguration;
- construct the kubeletDeps struct;
- launch the final Run method.

Execution continues into the Run method:

```go
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
	logOption := logs.NewOptions()
	logOption.LogFormat = s.Logging.Format
	logOption.LogSanitization = s.Logging.Sanitization
	logOption.Apply()
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())
	if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
		return fmt.Errorf("failed OS init: %v", err)
	}
	if err := run(ctx, s, kubeDeps, featureGate); err != nil {
		return fmt.Errorf("failed to run Kubelet: %v", err)
	}
	return nil
}
```

initForOS checks the operating system and does some special up-front work if it is Windows; execution then continues into the lowercase run method:

```go
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
	// Set global feature gates based on the value on the initial KubeletServer
	err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
	if err != nil {
		return err
	}
	// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
	if err := options.ValidateKubeletServer(s); err != nil {
		return err
	}

	// Obtain Kubelet Lock File
	if s.ExitOnLockContention && s.LockFilePath == "" {
		return errors.New("cannot exit on lock file contention: no lock file specified")
	}
	done := make(chan struct{})
	if s.LockFilePath != "" {
		klog.Infof("acquiring file lock on %q", s.LockFilePath)
		if err := flock.Acquire(s.LockFilePath); err != nil {
			return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
		}
		if s.ExitOnLockContention {
			klog.Infof("watching for inotify events for: %v", s.LockFilePath)
			if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
				return err
			}
		}
	}

	// Register current configuration with /configz endpoint
	err = initConfigz(&s.KubeletConfiguration)
	if err != nil {
		klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
	}

	if len(s.ShowHiddenMetricsForVersion) > 0 {
		metrics.SetShowHidden()
	}

	// About to get clients and such, detect standaloneMode
	standaloneMode := true
	if len(s.KubeConfig) > 0 {
		standaloneMode = false
	}

	if kubeDeps == nil {
		kubeDeps, err = UnsecuredDependencies(s, featureGate)
		if err != nil {
			return err
		}
	}

	if kubeDeps.Cloud == nil {
		if !cloudprovider.IsExternal(s.CloudProvider) {
			cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
			cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
			if err != nil {
				return err
			}
			if cloud != nil {
				klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
			}
			kubeDeps.Cloud = cloud
		}
	}

	hostName, err := nodeutil.GetHostname(s.HostnameOverride)
	if err != nil {
		return err
	}
	nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
	if err != nil {
		return err
	}

	// if in standalone mode, indicate as much by setting all clients to nil
	switch {
	case standaloneMode:
		kubeDeps.KubeClient = nil
		kubeDeps.EventClient = nil
		kubeDeps.HeartbeatClient = nil
		klog.Warningf("standalone mode, no API client")

	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
		clientConfig, closeAllConns, err := buildKubeletClientConfig(ctx, s, nodeName)
		if err != nil {
			return err
		}
		if closeAllConns == nil {
			return errors.New("closeAllConns must be a valid function other than nil")
		}
		kubeDeps.OnHeartbeatFailure = closeAllConns

		kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet client: %v", err)
		}

		// make a separate client for events
		eventClientConfig := *clientConfig
		eventClientConfig.QPS = float32(s.EventRecordQPS)
		eventClientConfig.Burst = int(s.EventBurst)
		kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet event client: %v", err)
		}

		// make a separate client for heartbeat with throttling disabled and a timeout attached
		heartbeatClientConfig := *clientConfig
		heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
		// The timeout is the minimum of the lease duration and status update frequency
		leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
		if heartbeatClientConfig.Timeout > leaseTimeout {
			heartbeatClientConfig.Timeout = leaseTimeout
		}
		heartbeatClientConfig.QPS = float32(-1)
		kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
		}
	}

	if kubeDeps.Auth == nil {
		auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
		if err != nil {
			return err
		}
		kubeDeps.Auth = auth
		runAuthenticatorCAReload(ctx.Done())
	}

	var cgroupRoots []string
	nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
	cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
	kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
	if err != nil {
		klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
	} else if kubeletCgroup != "" {
		cgroupRoots = append(cgroupRoots, kubeletCgroup)
	}

	runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
	if err != nil {
		klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
	} else if runtimeCgroup != "" {
		// RuntimeCgroups is optional, so ignore if it isn't specified
		cgroupRoots = append(cgroupRoots, runtimeCgroup)
	}

	if s.SystemCgroups != "" {
		// SystemCgroups is optional, so ignore if it isn't specified
		cgroupRoots = append(cgroupRoots, s.SystemCgroups)
	}

	if kubeDeps.CAdvisorInterface == nil {
		imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
		kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
		if err != nil {
			return err
		}
	}

	// Setup event recorder if required.
	makeEventRecorder(kubeDeps, nodeName)

	if kubeDeps.ContainerManager == nil {
		if s.CgroupsPerQOS && s.CgroupRoot == "" {
			klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
			s.CgroupRoot = "/"
		}

		var reservedSystemCPUs cpuset.CPUSet
		if s.ReservedSystemCPUs != "" {
			// is it safe do use CAdvisor here ??
			machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
			if err != nil {
				// if can't use CAdvisor here, fall back to non-explicit cpu list behavor
				klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
				reservedSystemCPUs = cpuset.NewCPUSet()
			} else {
				var errParse error
				reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
				if errParse != nil {
					// invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
					klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
					return errParse
				}
				reservedList := reservedSystemCPUs.ToSlice()
				first := reservedList[0]
				last := reservedList[len(reservedList)-1]
				if first < 0 || last >= machineInfo.NumCores {
					// the specified cpuset is outside of the range of what the machine has
					klog.Infof("Invalid cpuset specified by --reserved-cpus")
					return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
				}
			}
		} else {
			reservedSystemCPUs = cpuset.NewCPUSet()
		}

		if reservedSystemCPUs.Size() > 0 {
			// at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
			klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
			if s.KubeReserved != nil {
				delete(s.KubeReserved, "cpu")
			}
			if s.SystemReserved == nil {
				s.SystemReserved = make(map[string]string)
			}
			s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
			klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
		}

		kubeReserved, err := parseResourceList(s.KubeReserved)
		if err != nil {
			return err
		}
		systemReserved, err := parseResourceList(s.SystemReserved)
		if err != nil {
			return err
		}
		var hardEvictionThresholds []evictionapi.Threshold
		// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
		if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
			hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
			if err != nil {
				return err
			}
		}
		experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
		if err != nil {
			return err
		}

		devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
			cm.NodeConfig{
				RuntimeCgroupsName:    s.RuntimeCgroups,
				SystemCgroupsName:     s.SystemCgroups,
				KubeletCgroupsName:    s.KubeletCgroups,
				ContainerRuntime:      s.ContainerRuntime,
				CgroupsPerQOS:         s.CgroupsPerQOS,
				CgroupRoot:            s.CgroupRoot,
				CgroupDriver:          s.CgroupDriver,
				KubeletRootDir:        s.RootDirectory,
				ProtectKernelDefaults: s.ProtectKernelDefaults,
				NodeAllocatableConfig: cm.NodeAllocatableConfig{
					KubeReservedCgroupName:   s.KubeReservedCgroup,
					SystemReservedCgroupName: s.SystemReservedCgroup,
					EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
					KubeReserved:             kubeReserved,
					SystemReserved:           systemReserved,
					ReservedSystemCPUs:       reservedSystemCPUs,
					HardEvictionThresholds:   hardEvictionThresholds,
				},
				QOSReserved:                           *experimentalQOSReserved,
				ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
				ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
				ExperimentalPodPidsLimit:              s.PodPidsLimit,
				EnforceCPULimits:                      s.CPUCFSQuota,
				CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
				ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,
				ExperimentalTopologyManagerScope:      s.TopologyManagerScope,
			},
			s.FailSwapOn,
			devicePluginEnabled,
			kubeDeps.Recorder)
		if err != nil {
			return err
		}
	}

	if err := checkPermissions(); err != nil {
		klog.Error(err)
	}

	utilruntime.ReallyCrash = s.ReallyCrashForTesting

	// TODO(vmarmol): Do this through container config.
	oomAdjuster := kubeDeps.OOMAdjuster
	if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
		klog.Warning(err)
	}

	err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
		kubeDeps, &s.ContainerRuntimeOptions,
		s.ContainerRuntime,
		s.RuntimeCgroups,
		s.RemoteRuntimeEndpoint,
		s.RemoteImageEndpoint,
		s.NonMasqueradeCIDR)
	if err != nil {
		return err
	}

	if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}

	// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
		kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
		if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
			return err
		}
	}

	if s.HealthzPort > 0 {
		mux := http.NewServeMux()
		healthz.InstallHandler(mux)
		go wait.Until(func() {
			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
			if err != nil {
				klog.Errorf("Starting healthz server failed: %v", err)
			}
		}, 5*time.Second, wait.NeverStop)
	}

	if s.RunOnce {
		return nil
	}

	// If systemd is used, notify it that we have started
	go daemon.SdNotify(false, "READY=1")

	select {
	case <-done:
		break
	case <-ctx.Done():
		break
	}

	return nil
}
```

run wires up all of these dependencies and then hands control to RunKubelet, shown here with its opening abridged:

```go
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	if err != nil {
		return err
	}
	// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	if err != nil {
		return err
	}
	hostnameOverridden := len(kubeServer.HostnameOverride) > 0

	// parse --node-ip into nodeIPs, then validate the result
	var nodeIPs []net.IP
	if kubeServer.NodeIP != "" {
		for _, ip := range strings.Split(kubeServer.NodeIP, ",") {
			parsedNodeIP := net.ParseIP(strings.TrimSpace(ip))
			if parsedNodeIP == nil {
				klog.Warningf("Could not parse --node-ip value %q; ignoring", ip)
			} else {
				nodeIPs = append(nodeIPs, parsedNodeIP)
			}
		}
	}
	if len(nodeIPs) > 1 && !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
		return fmt.Errorf("dual-stack --node-ip %q not supported in a single-stack cluster", kubeServer.NodeIP)
	} else if len(nodeIPs) > 2 || (len(nodeIPs) == 2 && utilnet.IsIPv6(nodeIPs[0]) == utilnet.IsIPv6(nodeIPs[1])) {
		return fmt.Errorf("bad --node-ip %q; must contain either a single IP or a dual-stack pair of IPs", kubeServer.NodeIP)
	} else if len(nodeIPs) == 2 && kubeServer.CloudProvider != "" {
		return fmt.Errorf("dual-stack --node-ip %q not supported when using a cloud provider", kubeServer.NodeIP)
	} else if len(nodeIPs) == 2 && (nodeIPs[0].IsUnspecified() || nodeIPs[1].IsUnspecified()) {
		return fmt.Errorf("dual-stack --node-ip %q cannot include '0.0.0.0' or '::'", kubeServer.NodeIP)
	}

	capabilities.Initialize(capabilities.Capabilities{
		AllowPrivileged: true,
	})

	credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
	klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)

	if kubeDeps.OSInterface == nil {
		kubeDeps.OSInterface = kubecontainer.RealOS{}
	}

	k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
		kubeDeps,
		&kubeServer.ContainerRuntimeOptions,
		kubeServer.ContainerRuntime,
		hostname,
		hostnameOverridden,
		nodeName,
		nodeIPs,
		kubeServer.ProviderID,
		kubeServer.CloudProvider,
		kubeServer.CertDirectory,
		kubeServer.RootDirectory,
		kubeServer.ImageCredentialProviderConfigFile,
		kubeServer.ImageCredentialProviderBinDir,
		kubeServer.RegisterNode,
		kubeServer.RegisterWithTaints,
		kubeServer.AllowedUnsafeSysctls,
		kubeServer.ExperimentalMounterPath,
		kubeServer.KernelMemcgNotification,
		kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
		kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
		kubeServer.MinimumGCAge,
		kubeServer.MaxPerPodContainerCount,
		kubeServer.MaxContainerCount,
		kubeServer.MasterServiceNamespace,
		kubeServer.RegisterSchedulable,
		kubeServer.KeepTerminatedPodVolumes,
		kubeServer.NodeLabels,
		kubeServer.SeccompProfileRoot,
		kubeServer.NodeStatusMaxImages)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}

	// NewMainKubelet should have set up a pod source config if one didn't exist
	// when the builder was run. This is just a precaution.
	if kubeDeps.PodConfig == nil {
		return fmt.Errorf("failed to create kubelet, pod source config was nil")
	}
	podCfg := kubeDeps.PodConfig

	if err := rlimit.SetNumFiles(uint64(kubeServer.MaxOpenFiles)); err != nil {
		klog.Errorf("Failed to set rlimit on max file handles: %v", err)
	}

	// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		klog.Info("Started kubelet as runonce")
	} else {
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
		klog.Info("Started kubelet")
	}
	return nil
}
```

The two methods that matter most in RunKubelet are createAndInitKubelet and startKubelet. Think of createAndInitKubelet as the parameter-configuration step and startKubelet as the actual launch; in the end it is, once again, a matter of wrapping the parameters one more time and building a new struct to run with.

createAndInitKubelet returns the Kubelet struct by calling NewMainKubelet. The main things NewMainKubelet configures are:

- PodConfig: makePodSourceConfig reveals the sources from which the kubelet obtains Pods: static Pods, a URL serving static Pods, and the kube-apiserver;
- garbage-collection parameters for containers and images;
- the Pod eviction policy.

Finally the parameters are used to fill in the Kubelet struct, completing its configuration.

Then comes the startKubelet launch. Before starting, the code decides whether to keep running as a background daemon or to run only once (runOnce). Almost all deployments start in daemon mode, so the call that usually runs is startKubelet:

```go
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth,
			enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling, kubeCfg.EnableSystemLogHandler)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}
```

As you can see, it spins off goroutines and calls the Run method; since what was built above is the Kubelet struct, it is the Kubelet's Run method that gets invoked:

```go
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	...
}
```
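The walkthrough ends at this signature. As a rough pointer to where Run goes from here, below is a heavily abridged paraphrase of its body from the same source tree (names taken from pkg/kubelet/kubelet.go; many managers and all error handling are omitted, so treat this as a sketch rather than the exact code): it starts the sub-managers described earlier and then blocks in the main control loop.

```go
// Abridged paraphrase of (*Kubelet).Run; not the verbatim source.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	// start the volume manager and keep it running
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// periodic node-status reporting to the API Server
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
	}

	// start component sync loops: pod status, lifecycle events, etc.
	kl.statusManager.Start()
	kl.pleg.Start()

	// finally, block in the SyncLoop described at the start of this article
	kl.syncLoop(updates, kl)
}
```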

