Run
确立目标
理解 kube-controller-manager 的运行机制
从主函数找到run函数,代码较长,这里精简了一下
func Run(c *config.CompletedConfig, stopChStartControllersfunc StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error { // 关键性的循环,启动每个controllers,key为控制器名字,value为初始化函数 for controllerName, initFn := range controllers { // 是否允许启动 if !ctx.IsControllerEnabled(controllerName) { klog.Warningf("%q is disabled", controllerName) continue } time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) klog.V(1).Infof("Starting %q", controllerName) // 调用init函数进行启动 debugHandler, started, err := initFn(ctx) if err != nil { klog.Errorf("Error starting %q", controllerName) return err } if !started { klog.Warningf("Skipping %q", controllerName) continue } // 注册对应controller到debug的url中 if debugHandler != nil && unsecuredMux != nil { basePath := "/debug/controllers/" + controllerName unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler)) unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler)) } klog.Infof("Started %q", controllerName) } return nil } // 我们再去传入controller的函数去看看,对应的controller有哪些,这里有我们很多常见的概念,不一一细讲 func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc { controllers := map[string]InitFunc{} controllers["endpoint"] = startEndpointController controllers["endpointslice"] = startEndpointSliceController controllers["endpointslicemirroring"] = startEndpointSliceMirroringController controllers["replicationcontroller"] = startReplicationController controllers["podgc"] = startPodGCController controllers["resourcequota"] = startResourceQuotaController controllers["namespace"] = startNamespaceController controllers["serviceaccount"] = startServiceAccountController controllers["garbagecollector"] = startGarbageCollectorController controllers["daemonset"] = startDaemonSetController controllers["job"] = startJobController controllers["deployment"] = 
startDeploymentController controllers["replicaset"] = startReplicaSetController controllers["horizontalpodautoscaling"] = startHPAController controllers["disruption"] = startDisruptionController controllers["statefulset"] = startStatefulSetController controllers["cronjob"] = startCronJobController controllers["csrsigning"] = startCSRSigningController controllers["csrapproving"] = startCSRApprovingController controllers["csrcleaner"] = startCSRCleanerController controllers["ttl"] = startTTLController controllers["bootstrapsigner"] = startBootstrapSignerController controllers["tokencleaner"] = startTokenCleanerController controllers["nodeipam"] = startNodeIpamController controllers["nodelifecycle"] = startNodeLifecycleController if loopMode == IncludeCloudLoops { controllers["service"] = startServiceController controllers["route"] = startRouteController controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController } controllers["persistentvolume-binder"] = startPersistentVolumeBinderController controllers["attachdetach"] = startAttachDetachController controllers["persistentvolume-expander"] = startVolumeExpandController controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController controllers["pvc-protection"] = startPVCProtectionController controllers["pv-protection"] = startPVProtectionController controllers["ttl-after-finished"] = startTTLAfterFinishedController controllers["root-ca-cert-publisher"] = startRootCACertPublisher controllers["ephemeral-volume"] = startEphemeralVolumeController return controllers }ReplicaSet由于我们的示例是创建一个nginx的pod,涉及到kube-controller-manager的内容很少。
但是,为了加深大家对 kube-controller-manager 的认识,我们引入一个新的概念 - ReplicaSet,下面是官方说明:
A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.
ReplicaSet 的目的是维护一组在任何时候都处于运行状态的 Pod 副本的稳定集合。因此,它通常用来保证给定数量的、完全相同的 Pod 的可用性。
简单来说,ReplicaSet 就是用来生成指定个数的 Pod。
代码在 pkg/controller/replicaset/replica_set.go
ReplicaSetControllerfunc startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] { return nil, false, nil } // 用goroutine异步运行,包含了 ReplicaSet和Pod 的两个Informer // 这一点很好理解:我们是要控制ReplicaSet声明的数量和运行的Pod数量一致,需要同时观察者两种资源 go replicaset.NewReplicaSetController( ctx.InformerFactory.Apps().V1().ReplicaSets(), ctx.InformerFactory.Core().V1().Pods(), ctx.ClientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop) return nil, true, nil } // 运行函数 func (rsc *ReplicaSetController) Run(workers int, stopChsyncReplicaSetfunc (rsc *ReplicaSetController) syncReplicaSet(key string) error { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) }() // 从key中拆分出 namespace 和 name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } // 根据name,从 Lister 获取对应的 ReplicaSets 信息 rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name) if errors.IsNotFound(err) { klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key) rsc.expectations.DeleteExpectations(key) return nil } if err != nil { return err } rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) // 获取 selector (k8s 是根据selector中的label来匹配 ReplicaSets 和 Pod 的) selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) if err != nil { utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err)) return nil } // 根据namespace和labels获取所有的pod allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) if err != nil { return err } // 过滤无效的pod filteredPods := controller.FilterActivePods(allPods) // 根据selector再过滤pod filteredPods, err = rsc.claimPods(rs, selector, filteredPods) if err != nil { return err } var manageReplicasErr error if rsNeedsSync && rs.DeletionTimestamp 
== nil { // 管理 ReplicaSet,下面详细分析 manageReplicasErr = rsc.manageReplicas(filteredPods, rs) } rs = rs.DeepCopy() newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) // 更新状态 updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) if err != nil { return err } if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 && updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) && updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) { rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second) } return manageReplicasErr } // 我们再一起看看,当Pod数量和ReplicaSet中声明的不同时,是怎么工作的 func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { // diff = 当前pod数 - 期望pod数 diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) if err != nil { utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) return nil } // diff小于0,表示需要扩容,即新增Pod if diff 0 { } return nil } 站在前人的肩膀上,向前辈致敬,Respect!Summarykube-controller-manager 的核心思想是: 根据期望状态和当前状态,管理Kubernetes中的资源。 以ReplicaSet为例,它对比了定义声明的Pod数和当前集群中满足条件的Pod数,进行相对应的扩缩容。

我的微信
微信号已复制
我的微信
这是我的微信扫一扫