在上文中,已经通过例子和大量图解解释k8s编排系统中存储资源的调度,数据流等细节。 但是上次的文章主要都是从原理上进行解释,包括in tree volume plugin,external provisioner以及最新的CSI。 他们原理上是相通的,这也是本次要深入挖掘的代码环节。

问题分析

在之前的版本,kubelet负责处理什么时候该对卷附着在node上,以及什么时候把它移除。 想想如果仅仅是这样的话,那是有问题的,比如说,当kubelet某种原因崩溃了,或者node挂了,那么这个时候在该节点🌝 上的volume就没有安全的移除,一直卡着,尽管已经发生了故障,但是卷的状态却仍然为attached,这就不科学了🤣

访问模式

  • ReadWriteOnce(RWO) 表示卷可以被一个Node节点以读写模式挂载,比如iSCSI,RBD,
  • ReadOnlyMany(ROX) 表示卷可以被多个Node节点以只读模式挂载;比如NFS,
  • ReadWriteMany(RWX) 表示卷可以被多个Node节点以读写模式挂载;比如NFS 总所周知,在kubernetes中pv的使用有几种访问模式,如上所示。

很多时候,node不可用或者说偶然间不可用🚫,那也是正常的,诸如断电,网络时延,重启等各自奇奇怪怪的原因也好,都有可能触发Pod的重新调度或者漂移,比如使用replica controller或者deployment部署的服务(直接使用Pod启动的容器不会漂移,因此Pod并不保证高可用,需要更加高级别的资源对象对其管理,比如ReplicaSet,DaemonsSet等等)。那如果这些Pod需要使用到一些PV卷,而这些PV卷又只能支持单个节点挂载,即访问模式为RWO(Read Write Once),说人话🌚就是这些存储卷只支持在单个Node节点上以读写模式挂载,比如iSCSI存储就是这样,访问模式相关知识点请翻阅 官方文章。那么此时由于Kubelet不可用,它就没办法把这块卷在原来的节点卸载,受访问模式所限制,就会出现即使Pod被重新调度到新的节点上了,但是volume没办法重新挂载过来的情况。这种设计是把Volume的管理完全独立,交由kubelt上的某个线程来处理。如果一个Pod不断的创建和删除,就会出现其他奇奇怪怪的现象。对volume的操作应该是构成竞争关系,毕竟不能同时挂载和卸载,同个时间内只能有一个状态。

此外还有其他很多的原因,比如权限问题,和云供应商☁️结合等等,这些可翻下github上的issue,标记了/sig storage标签的老问题了。可以找到挺多吐槽的点子。

走进代码之前

出于以上考虑,现在k8s版本已经将volume 的attach和detach的决定权从kubelet中移除了。 因为对于volume的操作不应该绑定在kubelet上,而是放在一个单独的 Attachment/Detachment 控制器上,而这个controller,和其他controller一样,隶属controller manager这个控制大脑🧠。 这样的话,即使kubelet挂逼了,volume也还能有抢救的余地😂,解决了kubelet故障pod和volume没办法重新调度的情况

Attachment/Detachment 控制器是怎么被开启的

这个控制器简称为AD controller,代码路径:kubernetes/pkg/controller/volume/attachdetach code

可以看到其中还有populator和reconciler等关键的包,后续会讲到。

点进去可以看到有个构造方法:

//kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go

// NewAttachDetachController returns a new instance of AttachDetachController.
func NewAttachDetachController(
   kubeClient clientset.Interface,
   podInformer coreinformers.PodInformer,
   nodeInformer coreinformers.NodeInformer,
   pvcInformer coreinformers.PersistentVolumeClaimInformer,
   pvInformer coreinformers.PersistentVolumeInformer,
   csiNodeInformer storageinformers.CSINodeInformer,
   csiDriverInformer storageinformers.CSIDriverInformer,
   cloud cloudprovider.Interface,
   plugins []volume.VolumePlugin,
   prober volume.DynamicPluginProber,
   disableReconciliationSync bool,
   reconcilerSyncDuration time.Duration,
...省略...
}

顺藤摸瓜找到源头 根据引用关系可以确定,这是隶属在kube-controller-manager中央大脑的一个控制器单元。 code-ref

稍微插个嘴,在kubernetes项目中,很多组件的main入口就在kubernetes/cmd/xxx下,由此可知,这里就是controller的入口。 为什么说controller是控制大脑呢?我们继续寻找到源头,可以看到kube-controller是这么初始化的:

//kubernetes/cmd/kube-controller-manager/app/core.go

// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc.  This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
   controllers := map[string]InitFunc{}
   controllers["endpoint"] = startEndpointController
   controllers["replicationcontroller"] = startReplicationController
   controllers["podgc"] = startPodGCController
   controllers["resourcequota"] = startResourceQuotaController
   controllers["namespace"] = startNamespaceController
   controllers["serviceaccount"] = startServiceAccountController
   controllers["garbagecollector"] = startGarbageCollectorController
   controllers["daemonset"] = startDaemonSetController
   controllers["job"] = startJobController
   controllers["deployment"] = startDeploymentController
   controllers["replicaset"] = startReplicaSetController
   controllers["horizontalpodautoscaling"] = startHPAController
   controllers["disruption"] = startDisruptionController
   controllers["statefulset"] = startStatefulSetController
   controllers["cronjob"] = startCronJobController
   controllers["csrsigning"] = startCSRSigningController
   controllers["csrapproving"] = startCSRApprovingController
   controllers["csrcleaner"] = startCSRCleanerController
   controllers["ttl"] = startTTLController
   controllers["bootstrapsigner"] = startBootstrapSignerController
   controllers["tokencleaner"] = startTokenCleanerController
   controllers["nodeipam"] = startNodeIpamController
   controllers["nodelifecycle"] = startNodeLifecycleController
   if loopMode == IncludeCloudLoops {
      controllers["service"] = startServiceController
      controllers["route"] = startRouteController
      controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
      // TODO: volume controller into the IncludeCloudLoops only set.
   }
   controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
   controllers["attachdetach"] = startAttachDetachController
   controllers["persistentvolume-expander"] = startVolumeExpandController
   controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
   controllers["pvc-protection"] = startPVCProtectionController
   controllers["pv-protection"] = startPVProtectionController
   controllers["ttl-after-finished"] = startTTLAfterFinishedController
   controllers["root-ca-cert-publisher"] = startRootCACertPublisher

   return controllers
}

好家伙,现在能确定的是kube-controller-manager会在组件在启动后就启动一系列的控制单元 包括我们这里关注的attachdetach controller

AD controller能干什么

开始这个问题之前,先观察下AD controller的结构:

//kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go
type attachDetachController struct {
   // kubeClient is the kube API client used by volumehost to communicate with
   // the API server.
   kubeClient clientset.Interface //用于和API server通信

   // pvcLister is the shared PVC lister used to fetch and store PVC
   // objects from the API server. It is shared with other controllers and
   // therefore the PVC objects in its store should be treated as immutable.
   pvcLister  corelisters.PersistentVolumeClaimLister  //监听PVC事件
   pvcsSynced kcache.InformerSynced

   // pvLister is the shared PV lister used to fetch and store PV objects
   // from the API server. It is shared with other controllers and therefore
   // the PV objects in its store should be treated as immutable.
   pvLister  corelisters.PersistentVolumeLister //监听PV事件并提供查询和缓存
   pvsSynced kcache.InformerSynced

   podLister  corelisters.PodLister //监听Pod事件并提供查询和缓存
   podsSynced kcache.InformerSynced
   podIndexer kcache.Indexer

   nodeLister  corelisters.NodeLister
   nodesSynced kcache.InformerSynced

   csiNodeLister storagelisters.CSINodeLister
   csiNodeSynced kcache.InformerSynced

   // csiDriverLister is the shared CSIDriver lister used to fetch and store
   // CSIDriver objects from the API server. It is shared with other controllers
   // and therefore the CSIDriver objects in its store should be treated as immutable.
   csiDriverLister  storagelisters.CSIDriverLister
   csiDriversSynced kcache.InformerSynced

   // cloud provider used by volume host
   cloud cloudprovider.Interface

   // volumePluginMgr used to initialize and fetch volume plugins
   volumePluginMgr volume.VolumePluginMgr

   // desiredStateOfWorld is a data structure containing the desired state of
   // the world according to this controller: i.e. what nodes the controller
   // is managing, what volumes it wants be attached to these nodes, and which
   // pods are scheduled to those nodes referencing the volumes.
   // The data structure is populated by the controller using a stream of node
   // and pod API server objects fetched by the informers.
   desiredStateOfWorld cache.DesiredStateOfWorld //期望的状态

   // actualStateOfWorld is a data structure containing the actual state of
   // the world according to this controller: i.e. which volumes are attached
   // to which nodes.
   // The data structure is populated upon successful completion of attach and
   // detach actions triggered by the controller and a periodic sync with
   // storage providers for the "true" state of the world.
   actualStateOfWorld cache.ActualStateOfWorld  //实际状态

   // attacherDetacher is used to start asynchronous attach and operations
   attacherDetacher operationexecutor.OperationExecutor  //执行attach和detach操作

   // reconciler is used to run an asynchronous periodic loop to reconcile the
   // desiredStateOfWorld with the actualStateOfWorld by triggering attach
   // detach operations using the attacherDetacher.
   reconciler reconciler.Reconciler  //状态调整器,不断循环检查状态♻️,然后协调

   // nodeStatusUpdater is used to update node status with the list of attached
   // volumes
   nodeStatusUpdater statusupdater.NodeStatusUpdater

   // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
   // populate the current pods using podInformer.
   desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator

   // recorder is used to record events in the API server
   recorder record.EventRecorder

   // pvcQueue is used to queue pvc objects
   pvcQueue workqueue.RateLimitingInterface
}

以上,有的注释已经非常清晰,这里我还特意加入了一些中文说明

找到重点

//kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
   defer runtime.HandleCrash()
   defer adc.pvcQueue.ShutDown()

   klog.Infof("Starting attach detach controller")
   defer klog.Infof("Shutting down attach detach controller")

   synced := []kcache.InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced}
   if adc.csiNodeSynced != nil {
      synced = append(synced, adc.csiNodeSynced)
   }
   if adc.csiDriversSynced != nil {
      synced = append(synced, adc.csiDriversSynced)
   }

   if !controller.WaitForCacheSync("attach detach", stopCh, synced...) {
      return
   }

   err := adc.populateActualStateOfWorld()
   if err != nil {
      klog.Errorf("Error populating the actual state of world: %v", err)
   }
   err = adc.populateDesiredStateOfWorld()
   if err != nil {
      klog.Errorf("Error populating the desired state of world: %v", err)
   }
   go adc.reconciler.Run(stopCh)
   go adc.desiredStateOfWorldPopulator.Run(stopCh)
   go wait.Until(adc.pvcWorker, time.Second, stopCh)
   metrics.Register(adc.pvcLister,
      adc.pvLister,
      adc.podLister,
      adc.actualStateOfWorld,
      adc.desiredStateOfWorld,
      &adc.volumePluginMgr)

   <-stopCh
}

从AD controller的启动入口可以看到几个重要的信息:

  1. 设置了在程序退出时候的函数调用链,优雅退出

    defer runtime.HandleCrash()
    defer adc.pvcQueue.ShutDown()
    
  2. controller.WaitForCacheSync,通过Informer开始一轮的同步,包括Pod,Node,PVC

  3. err := adc.populateActualStateOfWorld() 遍历Node,生成此时controller检测到的实际状态,后面会细解

  4. err = adc.populateDesiredStateOfWorld() 遍历Pod,根据Pod以及spec得到目标状态,比如是否要attach volume,后面会细讲

  5. 启动adc.reconciler.Run(stopCh)一个协程,后台周期性协调实际检测的状态和期望的状态(通过发起attach和detach),后面再讲

  6. adc.desiredStateOfWorldPopulator.Run(stopCh) 类似4,但是这里是后台循环检测♻️,而且要复杂点

  7. go wait.Until(adc.pvcWorker, time.Second, stopCh) 同步PVC,根据PVC和Pod的关系决定是否需要处理

可以看到,启动函数就启动了几个后台进程,分别处理和监听特定的事件:

  • populateActualStateOfWorld
  • populateDesiredStateOfWorld
  • adc.reconciler.Run(stopCh)
  • 一系列的informer InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced}

AD controller究竟干了什么

可能你已经从上面看到了一些既熟悉又陌生的词,诸如desiredStateOfWorld,actualStateOfWorld,Reconciler,xxLister等等。

不要慌!下面来解释下

这里要解释清楚:

  • desiredStateOfWorld: 期望的世界状态,简称DSW,假设集群内新调度了一个Pod,此时要用到volume,Pod被分配到某节点NodeA上。 此时,对于AD controller来说,他期望的世界应该是节点NodeA应该有被分配的volume在准备被这个Pod挂载。

它的结构大概是这样的(部分):

//DSW(desiredStateOfWorld) ---期望世界状态
//Nodes--Volumes--Pod
{
    "nodesManaged🏀":[
        //some Node
        {
            "nodeName":"aNode",
            "volumesToAttach🛢":[
                //some volume should attach this node
                {
                    "volName":"someVol",
                    "scheduledPods":[], //somepods
                    "spec":{}
                },
                {
                    "volName":"someVol2",
                    "scheduledPods":[], //somepods
                    "spec":{}
                },
            ]
        },
        {
            "nodeName":"bNode",
            "volumesToAttach":[
            ]
        }
    ],
    "others":{},
    "volumePluginMgr":{}
}
  • actualStateOfWorld: 实际世界状态,简称ASW,即此时volume实际管理状态。不管什么时候,调度完成也好正在调度也好,AD controller 进行同步得到了集群真实的状态。实际状态未必是和期望状态一样,比如实际状态Node上有刚调度过来的Pod,但是还没有相应已经attached状态的volume

它的结构大概是这样的(部分):

//ASW(actualStateOfWorld) --实际世界状态
{
    "attachVolumes🛢":[
        //volumeObj1
        {
            "volumeName":"volName",
            "nodeAttachedTo🏀":[
                {
                    "nodeName":"aNode",
                    "MountByNode":true,
                },
                {
                    "nodeName":"bNode",
                    "MountByNode":true,
                }
            ]
        },
        //volumeObj2
        { ...},
    ],
    "others":{}
}

AD controller会维护一份内存缓存,里面包含了一些它掌管的volume,比如需要attach和detach的卷,除此之外, 这些volume的缓存信息中还应该卷名以及他们所被调度的node信息,同样的,node信息又包含了调度在它上面的pod信息,并引用其中的volume。

在初始化阶段,AD controller 会查询API server获取到已经在运行状态的Volume和他们所附着的Node,这个为初始化cache。

AD controller会分别开启多个协程,定期循环做下面的事

  • 获取状态描述

    • 通过API Server 获取所有Pod的状态
  • 加锁🔒

    • 对于一些互操作的资源,类似上面所提到的in-memory cache 期望世界状态DSW和实际世界状态ASW
  • 调解状态,不难理解,实际状态并不总和期望世界状一致,因此通过不停调解,尽量地切换到期望世界的状态,步骤如下

    • 🔍搜索最近终止状态或者删除的pod
      • 循环遍历缓存的pod对象,(如 volume->node->pods,配合前面所说的ASW对象来看) ,然后选择性将这些终止的pod从内存缓存中移除
        • 情况A:缓存的Pod已经不在搜索到pod列表,或者这个Pod所调度的Node信息和缓存内的不一致(说明pod对象已经从API Server中删除或者重新调度到其他节点去了)
        • 情况B:缓存的Pod仍然在搜索到的Pod列表,并且他的PodPhase状态是 Succeeded 或者 Failed 并且VolumeStatus表明volume可以安全卸载的
    • 🔍搜索新附着的volume
      • 通过获取到的pod列表遍历其中的pod对象,对于每个状态为pending的pod,检查pod定义的或者引用的volume,(可以通过Pod-PVC-PV的关系找到这些volume对象)。如果volume已经在内存缓存中被追踪记录📝,并且这个pod被调往的node也已经内存缓存中并关联了目标volume(说明此时这个volume已经成功附着attach在这个Node上).
      • 那就添加这个pod到内存缓存中,这个缓存会记录pod所属的node,这样就表明了新创建出来的pod和它引用的volume已经附着到这个pod所调度的node上了。
  • 执行动作

    • 触发 detach
      • 循环检测缓存中的node信息(结合ASW比如通过volume->node), 针对已经附着attach到这个node上的volume(已经在内存缓存中),并且在这个node上没有相关pod,即表明了此时在这个节点上没有node在使用这些volume。如果volume实现Detacher接口,会触发detach逻辑。
        • Detach会在attach操作之前触发,这样的话,如果被引用的volume和pod被调度到其他的节点上也可以安全的再次进行attach。 (当然如果上一步的detach不成功导致后续调度的挂载hang住的情况那就另算,我们曾经遇到这个坑)
    • 触发attach
      • 通过获取当前pod列表进行遍历,如果pod的PodPhase状态为Pending, 那就检测pod定义或者引用的所有volume(可以通过Pod-PVC-PV的关系找到volume对象),如果volume实现Attacher接口:
        • 情况A:volume还不在内存缓存记录中📝,说明这个volume是发现的,还没进行监控范围内,那就可以触发 attach逻辑,讲它附着到pod调度的node节点上。
        • 情况B:volume已经在内存缓存有记录📝,但是pod被调往的node以及目标volume还不在内存缓存上。表明这volume还没附着到这node上
  • 持久化状态

    • 它将维护的内存态缓存信息(指PersistentVolume资源)持久化到API Server
      • in-memory cache的首级有一个布尔值开关用于标记是否状态持久化,如果这个标志位没有设置,那么这个持久化动作将被忽略,避免这次无意义的操作API server。
      • 对于内存缓存信息中的PersistentVolume对象,并且是已经附着在node上的PV对象(也可能附着在多个node上),则通过API server将它附着的nodes名单关联到这个PV对象
      • 完成持久化动作后重置首级的布尔值开关位,表示当前操作已经ok。
  • 释放锁🔓 前面也提到了in-memory cache 是互相贡献的资源,会涉及到同时操作的场景。因此操作的时候需要加锁等待,操作完成即可解锁

一般来说Attach和Detach操作需要比较长的时候才能完成,因此主控制循环不应该在这两个操作上阻塞其他的操作。换而言之,attach和detach操作应该分别用两个独立的线程去完成。为了防止多个线程同时操作(attach/detach)一个volume,主线程应当维护一张表,用来对应volume当前进行的哪些操作。操控的线程数就应该是有限的,比如通过线程池的管理,一旦有线程还在处理那volume,后续负责同样volume操作的线程应该排队等待⌛️

具体的Attach(附着)逻辑

  • 首先是先获取到volume操作锁🔒,这是个排他锁,一个volume同时只能进行一个操作,所以如果已经在操作中(attach或者dettach)则获取锁失败,这样避免多个线程同时操作同一资源。
    • 所以如果指定的volume上加锁失败,操作就终止,(主控制进程稍后会重试)。
  • 检查策略,决定是否能把volume attach到指定的节点上,如果不允许,那就什么也不操作。那么所谓的策略是什么呢?在CSI规范中,是有拓扑策略的,比如某类volume只能调度到指定的区域Nodes上(貌似AWS有这样的卷),或者某些node上有卷数量限制,这些就是所谓的策略。
    • 此外,卷的访问模式 ReadWriteOnce(即单个节点可读写),也是一种策略,它会阻止volume被调度到多个node上的操作。
  • 分配操作线程(Goroutine)
    • 主要操作是执行将指定的volume依附到目标node上
    • 如果遇到错误说volume已经attach到指定的node上,它会假设此时attac完成,忽略此错误,操作具有幂等性
    • 除此之外的其他错误❌,记录错误信息后终止相关操作,直到下次重试。
    • 当attach操作完成之时✅:
      • 对内存缓存对象也进行加锁操作(需要更新资源了)
      • 添加volume被attach到的node到内存缓存中(如通过volume->node),表明记录了volume已经attach到指定node上。
      • 设置内存缓存首级上的是否持久化flag,表明当前PV对象状态可以持久化了。
      • 完成后释放内存缓存对象上的锁
      • 调API server设置PodStatus下的VolumeStatus对象的现在状态为safeToMount,就是告诉其他组件,现在可以安全mount这个PV了。
  • 释放volume锁。

具体的Detach逻辑

非常类似attach的流程,只是反向操作

  • 首先是先获取到volume操作锁🔒,这是个排他锁,一个volume同时只能进行一个操作,所以如果已经在操作中(attach或者dettach)则获取锁失败,这样避免多个线程同时操作同一资源。
    • 所以如果指定的volume上加锁失败,操作就终止,(主控制进程稍后会重试)。
  • 分配操作线程(Goroutine)
    • 主要操作是执行将指定volume拆离指定node的操作
    • 同样地,如果遇到错误说volume已经从指定的node上detach,那么它就可以假设此时detach完成,保持幂等性。
    • 除此之外,其他的错误一律记录下来,并退出当前操作,主控循环会稍后重试的,所以不用慌。
    • 一旦detach操作完成:
      • 对内存缓存对象也进行加锁操作(又需要更新资源了)
      • 将内存缓存对象中,把node对象从已经attached volume的node列表里移除,以表明volume从指定node移除成功。
      • 设置内存缓存首级上的是否持久化flag,表明当前对象状态需要持久化
      • 完成后释放内存缓存对象上的锁
  • 释放volume操作锁

那么kubelet是不是就废掉attach和detach操作了呢🌝

啊不,其实目前来说为了兼容旧的模式,它还是保留这些功能, 只是在kubelet启动的时候添加flag来决定要不要在kubelt端开启这功能 有以下两种情况: A:Kubelet 开启volume attach/detach开关 ,传统的模式 B: Kubelet 关闭volume attach/detach开关,kubelet只负责挂载mount操作,

结合理论看代码

上面的已经整理好的AD controller 设计细节,带着这些理论基础,继续开启代码之旅。 还是回到Run函数,

初始化实际世界状态populateActualStateOfWorld()

func (adc *attachDetachController) populateActualStateOfWorld() error {
   klog.V(5).Infof("Populating ActualStateOfworld")
   nodes, err := adc.nodeLister.List(labels.Everything())
   if err != nil {
      return err
   }

   for _, node := range nodes {
      nodeName := types.NodeName(node.Name)
      for _, attachedVolume := range node.Status.VolumesAttached {
         uniqueName := attachedVolume.Name
         // The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
         // In such a case it should be detached in the first reconciliation cycle and the
         // volume spec is not needed to detach a volume. If the volume is used by a pod, it
         // its spec can be: this would happen during in the populateDesiredStateOfWorld which
         // scans the pods and updates their volumes in the ActualStateOfWorld too.
         err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
         if err != nil {
            klog.Errorf("Failed to mark the volume as attached: %v", err)
            continue
         }
         adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
         adc.addNodeToDswp(node, types.NodeName(node.Name))
      }
   }
   return nil
}

看到这里用了一个nodelister,nodelister是从infomerFactory工厂出来的一个缓存对象,infomer是Kubernetes源码中十分常见的对象,它主要是通过缓存了APIServer的对象结果,并维护了一套本地的缓存存储,通过监听API Server的事件以及定时重新同步的机制来更新, 客户端由此查询kubernetes对象信息的同时还能大大减少了直接操作API server的请求,是提高系统稳定性和性能的一大法宝。

这里通过nodeLister查询到当前系统的所有Node节点信息(根据Node信息确定当前实际世界正在怎么运行的,都说到这份上了,所以请回顾ASW对象是怎样的再继续) 接下来他会根据遍历编排系统上的每一个Node,查询它的node.Status.VolumesAttached这个值,如果检查到有多个值,表明当前状态为有多个附着的volume,需要记录此信息到ASW缓存对象中,操作方式是调用actualStateOfWorld.MarkVolumeAsAttached()。试着按直接查看定义会发现这个是接口,所以这里要查看他的具体实现代码。

func (asw *actualStateOfWorld) MarkVolumeAsAttached(
   uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error {
   _, err := asw.AddVolumeNode(uniqueName, volumeSpec, nodeName, devicePath, true)
   return err
}

关键的还是asw的添加volume操作

func (asw *actualStateOfWorld) AddVolumeNode(
 uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string, isAttached bool) (v1.UniqueVolumeName, error) {
 asw.Lock()
 defer asw.Unlock()

 volumeName := uniqueName
 if volumeName == "" {
  if volumeSpec == nil {
   return volumeName, fmt.Errorf("volumeSpec cannot be nil if volumeName is empty")
  }
  attachableVolumePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
  if err != nil || attachableVolumePlugin == nil {
   return "", fmt.Errorf(
    "failed to get AttachablePlugin from volumeSpec for volume %q err=%v",
    volumeSpec.Name(),
    err)
  }

  volumeName, err = util.GetUniqueVolumeNameFromSpec(
   attachableVolumePlugin, volumeSpec)
  if err != nil {
   return "", fmt.Errorf(
    "failed to GetUniqueVolumeNameFromSpec for volumeSpec %q err=%v",
    volumeSpec.Name(),
    err)
  }
 }

 volumeObj, volumeExists := asw.attachedVolumes[volumeName]
 if !volumeExists {
  volumeObj = attachedVolume{
   volumeName: volumeName,
   spec: volumeSpec,
   nodesAttachedTo: make(map[types.NodeName]nodeAttachedTo),
   devicePath: devicePath,
  }
 } else {
  // If volume object already exists, it indicates that the information would be out of date.
  // Update the fields for volume object except the nodes attached to the volumes.
  volumeObj.devicePath = devicePath
  volumeObj.spec = volumeSpec
  klog.V(2).Infof("Volume %q is already added to attachedVolume list to node %q, update device path %q",
   volumeName,
   nodeName,
   devicePath)
 }
 node, nodeExists := volumeObj.nodesAttachedTo[nodeName]
 if !nodeExists {
  // Create object if it doesn't exist.
  node = nodeAttachedTo{
   nodeName: nodeName,
   mountedByNode: true, // Assume mounted, until proven otherwise
   attachedConfirmed: isAttached,
   detachRequestedTime: time.Time{},
  }
 } else {
  node.attachedConfirmed = isAttached
  klog.V(5).Infof("Volume %q is already added to attachedVolume list to the node %q, the current attach state is %t",
   volumeName,
   nodeName,
   isAttached)
 }

 volumeObj.nodesAttachedTo[nodeName] = node
 asw.attachedVolumes[volumeName] = volumeObj

 if isAttached {
  asw.addVolumeToReportAsAttached(volumeName, nodeName)
 }
 return volumeName, nil
}

结合前面的asw缓存对象的定义来看,这里的操作就十分清晰, 如果asw缓存对象以及存在这个volume则更新,否则则添加volume到asw缓存。

进入下一步的处理环节adc.processVolumesInUse()

// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
// mounted.
func (adc *attachDetachController) processVolumesInUse(
   nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
   klog.V(4).Infof("processVolumesInUse for node %q", nodeName)
   for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
      mounted := false
      for _, volumeInUse := range volumesInUse {
         if attachedVolume.VolumeName == volumeInUse {
            mounted = true
            break
         }
      }
      err := adc.actualStateOfWorld.SetVolumeMountedByNode(attachedVolume.VolumeName, nodeName, mounted)
      if err != nil {
         klog.Warningf(
            "SetVolumeMountedByNode(%q, %q, %v) returned an error: %v",
            attachedVolume.VolumeName, nodeName, mounted, err)
      }
   }
}

主要还是从刚才的asw缓存对象里查询目标volume, 但这里是要设置mounted = true,以此表明volume正在被使用中。

最后是adc.addNodeToDswp() 细心的你可能发现了,说好生成ActualStateOfWorld,怎么这有变成了DSW(desireStateVorld)了呢。 答案从代码可以找到

func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) {
   if _, exists := node.Annotations[volumeutil.ControllerManagedAttachAnnotation]; exists {
      keepTerminatedPodVolumes := false

      if t, ok := node.Annotations[volumeutil.KeepTerminatedPodVolumesAnnotation]; ok {
         keepTerminatedPodVolumes = (t == "true")
      }

      // Node specifies annotation indicating it should be managed by attach
      // detach controller. Add it to desired state of world.
      adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes)
   }
}

因为这里会涉及一个是否需要处理keepTerminatedPodVolumes,如果从annotation指示了要AD controller来处理, 这是一个指示位,所以把设置信息同步到dsw,这样负责dsw的GoRoutine就知道怎么维护这一块的状态了

关于populateActualStateOfWorld,这里做了一个十分详细的流程图,结合这个看到代码想想就简单清晰很多 populateactualstateofworld

初始化期望世界状态populateDesiredStateOfWorld

类似地,另一个缓存对象DSW的设置的代码分析如下,adc.populateDesiredStateOfWorld():

func (adc *attachDetachController) populateDesiredStateOfWorld() error {
 klog.V(5).Infof("Populating DesiredStateOfworld")

 pods, err := adc.podLister.List(labels.Everything())
 if err != nil {
  return err
 }
 for _, pod := range pods {
  podToAdd := pod
  adc.podAdd(podToAdd)
  for _, podVolume := range podToAdd.Spec.Volumes {
   // The volume specs present in the ActualStateOfWorld are nil, let's replace those
   // with the correct ones found on pods. The present in the ASW with no corresponding
   // pod will be detached and the spec is irrelevant.
   volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, adc.pvcLister, adc.pvLister)
   if err != nil {
    klog.Errorf(
     "Error creating spec for volume %q, pod %q/%q: %v",
     podVolume.Name,
     podToAdd.Namespace,
     podToAdd.Name,
     err)
    continue
   }
   nodeName := types.NodeName(podToAdd.Spec.NodeName)
   plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
   if err != nil || plugin == nil {
    klog.V(10).Infof(
     "Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
     podVolume.Name,
     podToAdd.Namespace,
     podToAdd.Name,
     err)
    continue
   }
   volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
   if err != nil {
    klog.Errorf(
     "Failed to find unique name for volume %q, pod %q/%q: %v",
     podVolume.Name,
     podToAdd.Namespace,
     podToAdd.Name,
     err)
    continue
   }
   if adc.actualStateOfWorld.IsVolumeAttachedToNode(volumeName, nodeName) {
    devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
    if err != nil {
     klog.Errorf("Failed to find device path: %v", err)
     continue
    }
    err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
    if err != nil {
     klog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
    }
   }
  }
 }

 return nil
}

非常类似,这里也用到了lister,但这次要列举的对象是Pod而不是Node,请思考为什么🤔 这里主要分成两个块处理逻辑, 一个是adc.podAdd(podToAdd) 另一个则是根据Pod.Spec.volumes的检测是否有效的volume,如果是则查询更多关于此volume的信息 并最终把状态回写到asw中,注意是asw缓存。 也就是说生成DesiredStateOfWorld缓存操作其实是放在了podAdd()上。 其实这里就是检测了volume是否有效,是否需要处理, 将决策结果用一个actionFlag表示true则表示添加到dsw,false则表示从dsw缓存删除, 决策部分如下:

// DetermineVolumeAction returns true if volume and pod needs to be added to dswp
// and it returns false if volume and pod needs to be removed from dswp
func DetermineVolumeAction(pod *v1.Pod, desiredStateOfWorld cache.DesiredStateOfWorld, defaultAction bool) bool {
 if pod == nil || len(pod.Spec.Volumes) <= 0 {
  return defaultAction
 }
 nodeName := types.NodeName(pod.Spec.NodeName)
 keepTerminatedPodVolume := desiredStateOfWorld.GetKeepTerminatedPodVolumesForNode(nodeName)

 if util.IsPodTerminated(pod, pod.Status) {
  // if pod is terminate we let kubelet policy dictate if volume
  // should be detached or not
  return keepTerminatedPodVolume
 }
 return defaultAction
}

即判断pod和volume是否还处于有效状态,以及刚才说到的keepTerminatedPodVolume flag,

接下来则根据此actionFlag表示true则表示添加到dsw,false则表示从dsw缓存删除,具体的代码有点啰嗦这里就不复制了。

简单总结populateDesiredStateOfWorld()的行为如下图: 结合下图看源码估计就有个比较清晰的概念了。 populatedesiredstateofworld

协调器reconciler

在Run函数中,前面的这些都还只是初始化状态,接下来,主程序会启动单独协程, 分别是adc.reconciler.Run(stopCh), adc.desiredStateOfWorldPopulator.Run(stopCh) 以及定期运行的go wait.Until(adc.pvcWorker, time.Second, stopCh)

其中的reconciler就是一个协调器, 直接从adc.reconciler.Run(stopCh),进来可以非常清晰看到如下所示。

func (rc *reconciler) Run(stopCh <-chan struct{}) {
   wait.Until(rc.reconciliationLoopFunc(), rc.loopPeriod, stopCh)
}

// reconciliationLoopFunc this can be disabled via cli option disableReconciliation.
// It periodically checks whether the attached volumes from actual state
// are still attached to the node and update the status if they are not.
func (rc *reconciler) reconciliationLoopFunc() func() {
   return func() {

      rc.reconcile()

      if rc.disableReconciliationSync {
         klog.V(5).Info("Skipping reconciling attached volumes still attached since it is disabled via the command line.")
      } else if rc.syncDuration < time.Second {
         klog.V(5).Info("Skipping reconciling attached volumes still attached since it is set to less than one second via the command line.")
      } else if time.Since(rc.timeOfLastSync) > rc.syncDuration {
         klog.V(5).Info("Starting reconciling attached volumes still attached")
         rc.sync()
      }
   }
}

reconcile会在loopPeriod周而复始运行。 核心操作是rc.reconcile() 和定期的rc.sync()

先来看下rc.reconcile()是个什么操作, 我在写这篇文章的时候,这里的逻辑有点乱,但可以概括为两部分: 1)检测ASW,检查哪些volume是可以安全detach的,//🤮这里的代码需要包装下,不然太乱了,等我写完这篇就提个pr,届时可能这里就有点变动 2)检测DSW,检查哪些volume是还没在ASW中的,如果不在那就触发attach

rc.reconcile()时序图如下(部分有精简) reconciler

定期的rc.sync()逻辑相对比较简单:

func (rc *reconciler) sync() {
    defer rc.updateSyncTime()
    rc.syncStates()
}

func (rc *reconciler) syncStates() {
    volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode()
    rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
}

也就是从ASW获取已经attach的volume, volumesPerNode是一个map结构,key为nodename,value为 []operationexecutor.AttachedVolume ,然后attacherDetacher验证该volume是否仍是attach状态,因篇所限制,暂时不展开,留着后续分析。

desiredStateOfWorldPopulator.Run()

DSW是个关键的结构,前面已经用伪流程图的形式解释过desiredStateOfWorldPopulator的初始化,那么 在完成初始化desiredStateOfWorld结构后它的核心工作还有什么呢? 本节将关注在go adc.desiredStateOfWorldPopulator.Run(stopCh)

按照惯例,我整理了下图: dswp.populatorLoopFunc() 6populatorloopfunc

desiredStateOfWorldPopulator是一个go协程,会周期性地对检查volume 整理起来会分两大情况:

  • findAndRemoveDeletedPods desiredStateOfWorldPopulator分别查询DSW缓存对象中地pod并对比podLister中地pod对象,并进行对比, 如果dsw中地pod已经不在podLister中,说明dsw缓存地pod信息已经落后,需根据策略将pod信息从当前dsw缓存中移除。 podLister是一个缓存对象,它缓存地数据是APIServer中地结果。因此可以看作是查询了APIServer的结果。

  • findAndAddActivePods 有了1)的概念,这个就好理解了,同样,desiredStateOfWorldPopulator会 直接查询podLister(同上,这里可以理解为查询了APIServer)获取的最新pod信息。 然后查询pv,最后更新缓存数据到ASW(actualStateOfWorld)

至此,本文已经分析了ADController多个模块的工作原理⚙️。 但是看起来,前面似乎都会从volume和node的层面来工作的,当然还结合了ASW和DSW两个关键角色🎭

pvc监听处理pvcWoker

最后,其实还有另外一个角色,pvcWoker👷。 pvcWoker 顾名思义就是负责pvc的工人。 首先直接拨开顶层代码

// pvcWorker processes items from pvcQueue
func (adc *attachDetachController) pvcWorker() {
   for adc.processNextItem() {
   }
}

func (adc *attachDetachController) processNextItem() bool {
   keyObj, shutdown := adc.pvcQueue.Get()
   if shutdown {
      return false
   }
   defer adc.pvcQueue.Done(keyObj)

   if err := adc.syncPVCByKey(keyObj.(string)); err != nil {
      // Rather than wait for a full resync, re-add the key to the
      // queue to be processed.
      adc.pvcQueue.AddRateLimited(keyObj)
      runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err))
      return true
   }

   // Finally, if no error occurs we Forget this item so it does not
   // get queued again until another change happens.
   adc.pvcQueue.Forget(keyObj)
   return true
}

暂且不深入点开细节,可以看到这里涉及的操作大多从pvcQueue层面处理,那么问题来了,pvcQueue是个啥? 从定义来看,它是个专注处理pvc的队列操作对象,它拥有RateLimitingInterface接口定义的能力,

// pvcQueue is used to queue pvc objects
pvcQueue workqueue.RateLimitingInterface

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
   DelayingInterface

   // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
   AddRateLimited(item interface{})

   // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
   // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
   // still have to call `Done` on the queue.
   Forget(item interface{})

   // NumRequeues returns back how many times the item was requeued
   NumRequeues(item interface{}) int
}

其中DelayingInterface又是另一个接口, 继续揪出DelayingInterface和最原始的接口定义Interface

// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
   Interface
   // AddAfter adds an item to the workqueue after the indicated duration has passed
   AddAfter(item interface{}, duration time.Duration)
}

type Interface interface {
   Add(item interface{})
   Len() int
   Get() (item interface{}, shutdown bool)
   Done(item interface{})
   ShutDown()
   ShuttingDown() bool
}

这就是pvcQueue常常用到的操作,Add,Len,Get这些操作都是针对接口的某个成员来操作。 这个操作对象是个切片,(t是个interface),当然这里pvcQueue作为实现这个接口的结构体,自然就代表了是一个关于pvc的queue了。 这个队列可以有add,get,设置shutDown的操作。

type Type struct {
   // queue defines the order in which we will work on items. Every
   // element of queue should be in the dirty set and not in the
   // processing set.
   queue []t

   // dirty defines all of the items that need to be processed.
   dirty set

   // Things that are currently being processed are in the processing set.
   // These things may be simultaneously in the dirty set. When we finish
   // processing something and remove it from this set, we'll check if
   // it's in the dirty set, and if so, add it to the queue.
   processing set

   cond *sync.Cond

   shuttingDown bool

   metrics queueMetrics

   unfinishedWorkUpdatePeriod time.Duration
   clock                      clock.Clock
}

以上是代码定义上理解的pvcQueue,可以知道这个queue结构的基本功能。 它是个简单的队列结构:

呃,那能不能再直白点呢? queue 是个切片,每次get一个对象其实就是queue[0] processing 是一个set结构,实际上这里对应的是一个map结构,通常凑够queue结构取下来的数据会放在processing中 dirty 类似processing,需要继续处理的对象 failures,是暂时处理不了,遇到错误情况是寄存的map结构,定义在RateLimiter接口。之所以放把它从queue中拎出来是做错误重试机制。 比如如果queue的数据处理出错了,k8s可以实现指数退避算法隔一段实际后再从failures对象插入到queue等待⌛️下一次处理。

由于这个Queue结构在其他地方也会用到,pvcQueue,pvQueue,vaQueue.. 那就直接上图: pvcQueue

  1. 在NewAttachDetachController操作的时候,其实已经把pvc的add和update事件注册,有pvc变动的时候就是触发pvc Queue.Add()操作往queue里添加内容

    pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        adc.enqueuePVC(obj)
    },
    UpdateFunc: func(old, new interface{}) {
        adc.enqueuePVC(new)
    },
    })
    
  2. 通过pvcQueue.get()将队列内容弹出处理,并标记在processing这个map中

  3. 当调用pvcQueue.done()的时候,processing中的这个记录就移除,此时整个queue结构不再包含这记录

  4. 如果处理出错的pvc项会被放在fail这个集合。此时这个pvc对象暂时就不在queue的处理目标中

  5. 对应这些fail的项目,会在适当时候(通常为指数退避)重新通过pvcQueue.AddRateLimit()形式在隔离一段时候后重新天添加到queuezhong 开始下一轮的循环♻️

  6. 对已经fail的项目,还可以通过pvcQueue.forget()来将它移除。

之所以花了一些功夫说这里的设计是因为这个比较通用,在其他地方也能看到。

下面就是一个adcontroller中看到的一个例子 pvcWorker的工作流程processNextItem如图所示: pvcWorker

结合前面说的pvcQueue的基础再看这个流程就很清晰了。

a. 首先第一步,adc向pvcQueue发起get请求。 b. 除非这个queue是关闭状态,关闭状态会终止操作。否则就就弹出一个pvc对象进行处理 c. 对这个pvc进行同步 d. 如果同步失败,则安排重试 e. 优雅地结束。

FAQ: 为什么DSW是遍历pod? 因为Pod是被先调度的(顺序是先调度,再等待资源准备,完成初始化启动pod),根据被调度情况来判断volume该怎么处理,所以是从调度出去的pod期望需要的volume资源

ASW遍历Node: 要求实际情况,所以要拿Node上的运行状态。

本文总结

这篇文章其实写了好一段时候,断断续续,特别是流程的整理其实非常耗时间,因此在衔接上有点不是很自然。 但是,这可能是目前最详细的Controller分析。其实在kubernetes中,有很多东西都是一通百通。 因此这里通过ADcontroller的源码分析来一步一步走进更多坑中。 如果非要对adcontroller总结的,大概也能说几句,就是通过全文的分析可知道,

  • 在这里大量使用的goroutine来完成很多复杂的后台工作,这对应一个编排系统而言是最基本的工作了。
  • 非直接的交互避免强耦合,比如这里其实有跟APIServer的也不会直接操作apisever,而是通过lister这种方式来实现, 既能大大降低系统资源消耗还降低耦合依赖。
  • 通过缓存中间态的形式降低组件的依赖,模块化整个k8s系统,避免某个模块影响引发的故障(比如这里一直在说的attach问题), 从而确保系统的可用性更高
  • 欢迎补充👏

后面有时间会继续分享其他方面的内容