在上文中,已经通过例子和大量图解解释k8s编排系统中存储资源的调度,数据流等细节。 但是上次的文章主要都是从原理上进行解释,包括in tree volume plugin,external provisioner以及最新的CSI。 他们原理上是相通的,这也是本次要深入挖掘的代码环节。
问题分析
在之前的版本,kubelet负责处理什么时候该对卷附着在node上,以及什么时候把它移除。
想想如果仅仅是这样的话,那是有问题的,比如说,当kubelet某种原因崩溃了,或者node挂了,那么这个时候在该节点🌝
上的volume就没有安全的移除,一直卡着,尽管已经发生了故障,但是卷的状态却仍然为attached
,这就不科学了🤣
访问模式
ReadWriteOnce(RWO)
表示卷可以被一个Node节点以读写模式挂载,比如iSCSI,RBD,ReadOnlyMany(ROX)
表示卷可以被多个Node节点以只读模式挂载;比如NFS,ReadWriteMany(RWX)
表示卷可以被多个Node节点以读写模式挂载;比如NFS 总所周知,在kubernetes中pv的使用有几种访问模式,如上所示。
很多时候,node不可用或者说偶然间不可用🚫,那也是正常的,诸如断电,网络时延,重启等各自奇奇怪怪的原因也好,都有可能触发Pod的重新调度或者漂移,比如使用replica controller或者deployment部署的服务(直接使用Pod启动的容器不会漂移,因此Pod并不保证高可用,需要更加高级别的资源对象对其管理,比如ReplicaSet,DaemonsSet等等)。那如果这些Pod需要使用到一些PV卷,而这些PV卷又只能支持单个节点挂载,即访问模式为RWO(Read Write Once),说人话🌚就是这些存储卷只支持在单个Node节点上以读写模式挂载,比如iSCSI存储就是这样,访问模式相关知识点请翻阅 官方文章。那么此时由于Kubelet不可用,它就没办法把这块卷在原来的节点卸载,受访问模式所限制,就会出现即使Pod被重新调度到新的节点上了,但是volume没办法重新挂载过来的情况。这种设计是把Volume的管理完全独立,交由kubelt上的某个线程来处理。如果一个Pod不断的创建和删除,就会出现其他奇奇怪怪的现象。对volume的操作应该是构成竞争关系,毕竟不能同时挂载和卸载,同个时间内只能有一个状态。
此外还有其他很多的原因,比如权限问题,和云供应商☁️结合等等,这些可翻下github上的issue,标记了/sig storage标签的老问题了。可以找到挺多吐槽的点子。
走进代码之前
出于以上考虑,现在k8s版本已经将volume 的attach和detach的决定权从kubelet中移除了。 因为对于volume的操作不应该绑定在kubelet上,而是放在一个单独的 Attachment/Detachment 控制器上,而这个controller,和其他controller一样,隶属controller manager这个控制大脑🧠。 这样的话,即使kubelet挂逼了,volume也还能有抢救的余地😂,解决了kubelet故障pod和volume没办法重新调度的情况
Attachment/Detachment 控制器是怎么被开启的
这个控制器简称为AD controller,代码路径:kubernetes/pkg/controller/volume/attachdetach
可以看到其中还有populator和reconciler等关键的包,后续会讲到。
点进去可以看到有个构造方法:
//kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go
// NewAttachDetachController returns a new instance of AttachDetachController.
func NewAttachDetachController(
kubeClient clientset.Interface,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
csiNodeInformer storageinformers.CSINodeInformer,
csiDriverInformer storageinformers.CSIDriverInformer,
cloud cloudprovider.Interface,
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber,
disableReconciliationSync bool,
reconcilerSyncDuration time.Duration,
...省略...
}
顺藤摸瓜找到源头 根据引用关系可以确定,这是隶属在kube-controller-manager中央大脑的一个控制器单元。
稍微插个嘴,在kubernetes项目中,很多组件的main入口就在kubernetes/cmd/xxx下,由此可知,这里就是controller的入口。 为什么说controller是控制大脑呢?我们继续寻找到源头,可以看到kube-controller是这么初始化的:
//kubernetes/cmd/kube-controller-manager/app/core.go
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["csrsigning"] = startCSRSigningController
controllers["csrapproving"] = startCSRApprovingController
controllers["csrcleaner"] = startCSRCleanerController
controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
controllers["nodeipam"] = startNodeIpamController
controllers["nodelifecycle"] = startNodeLifecycleController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["route"] = startRouteController
controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
// TODO: volume controller into the IncludeCloudLoops only set.
}
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
controllers["pvc-protection"] = startPVCProtectionController
controllers["pv-protection"] = startPVProtectionController
controllers["ttl-after-finished"] = startTTLAfterFinishedController
controllers["root-ca-cert-publisher"] = startRootCACertPublisher
return controllers
}
好家伙,现在能确定的是kube-controller-manager会在组件在启动后就启动一系列的控制单元
包括我们这里关注的attachdetach controller
。
AD controller能干什么
开始这个问题之前,先观察下AD controller的结构:
//kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go
type attachDetachController struct {
// kubeClient is the kube API client used by volumehost to communicate with
// the API server.
kubeClient clientset.Interface //用于和API server通信
// pvcLister is the shared PVC lister used to fetch and store PVC
// objects from the API server. It is shared with other controllers and
// therefore the PVC objects in its store should be treated as immutable.
pvcLister corelisters.PersistentVolumeClaimLister //监听PVC事件
pvcsSynced kcache.InformerSynced
// pvLister is the shared PV lister used to fetch and store PV objects
// from the API server. It is shared with other controllers and therefore
// the PV objects in its store should be treated as immutable.
pvLister corelisters.PersistentVolumeLister //监听PV事件并提供查询和缓存
pvsSynced kcache.InformerSynced
podLister corelisters.PodLister //监听Pod事件并提供查询和缓存
podsSynced kcache.InformerSynced
podIndexer kcache.Indexer
nodeLister corelisters.NodeLister
nodesSynced kcache.InformerSynced
csiNodeLister storagelisters.CSINodeLister
csiNodeSynced kcache.InformerSynced
// csiDriverLister is the shared CSIDriver lister used to fetch and store
// CSIDriver objects from the API server. It is shared with other controllers
// and therefore the CSIDriver objects in its store should be treated as immutable.
csiDriverLister storagelisters.CSIDriverLister
csiDriversSynced kcache.InformerSynced
// cloud provider used by volume host
cloud cloudprovider.Interface
// volumePluginMgr used to initialize and fetch volume plugins
volumePluginMgr volume.VolumePluginMgr
// desiredStateOfWorld is a data structure containing the desired state of
// the world according to this controller: i.e. what nodes the controller
// is managing, what volumes it wants be attached to these nodes, and which
// pods are scheduled to those nodes referencing the volumes.
// The data structure is populated by the controller using a stream of node
// and pod API server objects fetched by the informers.
desiredStateOfWorld cache.DesiredStateOfWorld //期望的状态
// actualStateOfWorld is a data structure containing the actual state of
// the world according to this controller: i.e. which volumes are attached
// to which nodes.
// The data structure is populated upon successful completion of attach and
// detach actions triggered by the controller and a periodic sync with
// storage providers for the "true" state of the world.
actualStateOfWorld cache.ActualStateOfWorld //实际状态
// attacherDetacher is used to start asynchronous attach and operations
attacherDetacher operationexecutor.OperationExecutor //执行attach和detach操作
// reconciler is used to run an asynchronous periodic loop to reconcile the
// desiredStateOfWorld with the actualStateOfWorld by triggering attach
// detach operations using the attacherDetacher.
reconciler reconciler.Reconciler //状态调整器,不断循环检查状态♻️,然后协调
// nodeStatusUpdater is used to update node status with the list of attached
// volumes
nodeStatusUpdater statusupdater.NodeStatusUpdater
// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
// populate the current pods using podInformer.
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
// recorder is used to record events in the API server
recorder record.EventRecorder
// pvcQueue is used to queue pvc objects
pvcQueue workqueue.RateLimitingInterface
}
以上,有的注释已经非常清晰,这里我还特意加入了一些中文说明
找到重点
//kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer adc.pvcQueue.ShutDown()
klog.Infof("Starting attach detach controller")
defer klog.Infof("Shutting down attach detach controller")
synced := []kcache.InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced}
if adc.csiNodeSynced != nil {
synced = append(synced, adc.csiNodeSynced)
}
if adc.csiDriversSynced != nil {
synced = append(synced, adc.csiDriversSynced)
}
if !controller.WaitForCacheSync("attach detach", stopCh, synced...) {
return
}
err := adc.populateActualStateOfWorld()
if err != nil {
klog.Errorf("Error populating the actual state of world: %v", err)
}
err = adc.populateDesiredStateOfWorld()
if err != nil {
klog.Errorf("Error populating the desired state of world: %v", err)
}
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)
go wait.Until(adc.pvcWorker, time.Second, stopCh)
metrics.Register(adc.pvcLister,
adc.pvLister,
adc.podLister,
adc.actualStateOfWorld,
adc.desiredStateOfWorld,
&adc.volumePluginMgr)
<-stopCh
}
从AD controller的启动入口可以看到几个重要的信息:
设置了在程序退出时候的函数调用链,优雅退出
defer runtime.HandleCrash() defer adc.pvcQueue.ShutDown()
controller.WaitForCacheSync
,通过Informer开始一轮的同步,包括Pod,Node,PVCerr := adc.populateActualStateOfWorld()
遍历Node,生成此时controller检测到的实际状态,后面会细解err = adc.populateDesiredStateOfWorld()
遍历Pod,根据Pod以及spec得到目标状态,比如是否要attach volume,后面会细讲启动
adc.reconciler.Run(stopCh)
一个协程,后台周期性协调实际检测的状态和期望的状态(通过发起attach和detach),后面再讲adc.desiredStateOfWorldPopulator.Run(stopCh)
类似4,但是这里是后台循环检测♻️,而且要复杂点go wait.Until(adc.pvcWorker, time.Second, stopCh)
同步PVC,根据PVC和Pod的关系决定是否需要处理
可以看到,启动函数就启动了几个后台进程,分别处理和监听特定的事件:
- populateActualStateOfWorld
- populateDesiredStateOfWorld
- adc.reconciler.Run(stopCh)
- 一系列的informer InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced}
AD controller究竟干了什么
可能你已经从上面看到了一些既熟悉又陌生的词,诸如desiredStateOfWorld,actualStateOfWorld,Reconciler,xxLister等等。
不要慌!下面来解释下
这里要解释清楚:
- desiredStateOfWorld: 期望的世界状态,简称DSW,假设集群内新调度了一个Pod,此时要用到volume,Pod被分配到某节点NodeA上。 此时,对于AD controller来说,他期望的世界应该是节点NodeA应该有被分配的volume在准备被这个Pod挂载。
它的结构大概是这样的(部分):
//DSW(desiredStateOfWorld) ---期望世界状态
//Nodes--Volumes--Pod
{
"nodesManaged🏀":[
//some Node
{
"nodeName":"aNode",
"volumesToAttach🛢":[
//some volume should attach this node
{
"volName":"someVol",
"scheduledPods":[], //somepods
"spec":{}
},
{
"volName":"someVol2",
"scheduledPods":[], //somepods
"spec":{}
},
]
},
{
"nodeName":"bNode",
"volumesToAttach":[
]
}
],
"others":{},
"volumePluginMgr":{}
}
- actualStateOfWorld: 实际世界状态,简称ASW,即此时volume实际管理状态。不管什么时候,调度完成也好正在调度也好,AD controller 进行同步得到了集群真实的状态。实际状态未必是和期望状态一样,比如实际状态Node上有刚调度过来的Pod,但是还没有相应已经attached状态的volume
它的结构大概是这样的(部分):
//ASW(actualStateOfWorld) --实际世界状态
{
"attachVolumes🛢":[
//volumeObj1
{
"volumeName":"volName",
"nodeAttachedTo🏀":[
{
"nodeName":"aNode",
"MountByNode":true,
},
{
"nodeName":"bNode",
"MountByNode":true,
}
]
},
//volumeObj2
{ ...},
],
"others":{}
}
AD controller会维护一份内存缓存,里面包含了一些它掌管的volume,比如需要attach和detach的卷,除此之外, 这些volume的缓存信息中还应该卷名以及他们所被调度的node信息,同样的,node信息又包含了调度在它上面的pod信息,并引用其中的volume。
在初始化阶段,AD controller 会查询API server获取到已经在运行状态的Volume和他们所附着的Node,这个为初始化cache。
AD controller会分别开启多个协程,定期循环做下面的事
获取状态描述
- 通过API Server 获取所有Pod的状态
加锁🔒
- 对于一些互操作的资源,类似上面所提到的in-memory cache 期望世界状态DSW和实际世界状态ASW
调解状态,不难理解,实际状态并不总和期望世界状一致,因此通过不停调解,尽量地切换到期望世界的状态,步骤如下
- 🔍搜索最近终止状态或者删除的pod
- 循环遍历缓存的pod对象,(如 volume->node->pods,配合前面所说的ASW对象来看) ,然后选择性将这些终止的pod从内存缓存中移除
- 情况A:缓存的Pod已经不在搜索到pod列表,或者这个Pod所调度的Node信息和缓存内的不一致(说明pod对象已经从API Server中删除或者重新调度到其他节点去了)
- 情况B:缓存的Pod仍然在搜索到的Pod列表,并且他的
PodPhase
状态是Succeeded
或者Failed
并且VolumeStatus表明volume可以安全卸载的
- 循环遍历缓存的pod对象,(如 volume->node->pods,配合前面所说的ASW对象来看) ,然后选择性将这些终止的pod从内存缓存中移除
- 🔍搜索新附着的volume
- 通过获取到的pod列表遍历其中的pod对象,对于每个状态为pending的pod,检查pod定义的或者引用的volume,(可以通过Pod-PVC-PV的关系找到这些volume对象)。如果volume已经在内存缓存中被追踪记录📝,并且这个pod被调往的node也已经内存缓存中并关联了目标volume(说明此时这个volume已经成功附着attach在这个Node上).
- 那就添加这个pod到内存缓存中,这个缓存会记录pod所属的node,这样就表明了新创建出来的pod和它引用的volume已经附着到这个pod所调度的node上了。
- 🔍搜索最近终止状态或者删除的pod
执行动作
- 触发 detach
- 循环检测缓存中的node信息(结合ASW比如通过volume->node), 针对已经附着attach到这个node上的volume(已经在内存缓存中),并且在这个node上没有相关pod,即表明了此时在这个节点上没有node在使用这些volume。如果volume实现
Detacher
接口,会触发detach逻辑。- Detach会在attach操作之前触发,这样的话,如果被引用的volume和pod被调度到其他的节点上也可以安全的再次进行attach。 (当然如果上一步的detach不成功导致后续调度的挂载hang住的情况那就另算,我们曾经遇到这个坑)
- 循环检测缓存中的node信息(结合ASW比如通过volume->node), 针对已经附着attach到这个node上的volume(已经在内存缓存中),并且在这个node上没有相关pod,即表明了此时在这个节点上没有node在使用这些volume。如果volume实现
- 触发attach
- 通过获取当前pod列表进行遍历,如果pod的
PodPhase
状态为Pending
, 那就检测pod定义或者引用的所有volume(可以通过Pod-PVC-PV的关系找到volume对象),如果volume实现Attacher
接口:- 情况A:volume还不在内存缓存记录中📝,说明这个volume是发现的,还没进行监控范围内,那就可以触发 attach逻辑,讲它附着到pod调度的node节点上。
- 情况B:volume已经在内存缓存有记录📝,但是pod被调往的node以及目标volume还不在内存缓存上。表明这volume还没附着到这node上
- 通过获取当前pod列表进行遍历,如果pod的
- 触发 detach
持久化状态
- 它将维护的内存态缓存信息(指PersistentVolume资源)持久化到API Server
- in-memory cache的首级有一个布尔值开关用于标记是否状态持久化,如果这个标志位没有设置,那么这个持久化动作将被忽略,避免这次无意义的操作API server。
- 对于内存缓存信息中的PersistentVolume对象,并且是已经附着在node上的PV对象(也可能附着在多个node上),则通过API server将它附着的nodes名单关联到这个PV对象
- 完成持久化动作后重置首级的布尔值开关位,表示当前操作已经ok。
- 它将维护的内存态缓存信息(指PersistentVolume资源)持久化到API Server
释放锁🔓 前面也提到了in-memory cache 是互相贡献的资源,会涉及到同时操作的场景。因此操作的时候需要加锁等待,操作完成即可解锁
一般来说Attach和Detach操作需要比较长的时候才能完成,因此主控制循环不应该在这两个操作上阻塞其他的操作。换而言之,attach和detach操作应该分别用两个独立的线程去完成。为了防止多个线程同时操作(attach/detach)一个volume,主线程应当维护一张表,用来对应volume当前进行的哪些操作。操控的线程数就应该是有限的,比如通过线程池的管理,一旦有线程还在处理那volume,后续负责同样volume操作的线程应该排队等待⌛️
具体的Attach(附着)逻辑
- 首先是先获取到volume操作锁🔒,这是个排他锁,一个volume同时只能进行一个操作,所以如果已经在操作中(attach或者dettach)则获取锁失败,这样避免多个线程同时操作同一资源。
- 所以如果指定的volume上加锁失败,操作就终止,(主控制进程稍后会重试)。
- 检查策略,决定是否能把volume attach到指定的节点上,如果不允许,那就什么也不操作。那么所谓的策略是什么呢?在CSI规范中,是有拓扑策略的,比如某类volume只能调度到指定的区域Nodes上(貌似AWS有这样的卷),或者某些node上有卷数量限制,这些就是所谓的策略。
- 此外,卷的访问模式
ReadWriteOnce
(即单个节点可读写),也是一种策略,它会阻止volume被调度到多个node上的操作。
- 此外,卷的访问模式
- 分配操作线程(Goroutine)
- 主要操作是执行将指定的volume依附到目标node上
- 如果遇到错误说volume已经attach到指定的node上,它会假设此时attac完成,忽略此错误,操作具有幂等性
- 除此之外的其他错误❌,记录错误信息后终止相关操作,直到下次重试。
- 当attach操作完成之时✅:
- 对内存缓存对象也进行加锁操作(需要更新资源了)
- 添加volume被attach到的node到内存缓存中(如通过volume->node),表明记录了volume已经attach到指定node上。
- 设置内存缓存首级上的是否持久化flag,表明当前PV对象状态可以持久化了。
- 完成后释放内存缓存对象上的锁
- 调API server设置PodStatus下的VolumeStatus对象的现在状态为
safeToMount
,就是告诉其他组件,现在可以安全mount这个PV了。
- 释放volume锁。
具体的Detach逻辑
非常类似attach的流程,只是反向操作
- 首先是先获取到volume操作锁🔒,这是个排他锁,一个volume同时只能进行一个操作,所以如果已经在操作中(attach或者dettach)则获取锁失败,这样避免多个线程同时操作同一资源。
- 所以如果指定的volume上加锁失败,操作就终止,(主控制进程稍后会重试)。
- 分配操作线程(Goroutine)
- 主要操作是执行将指定volume拆离指定node的操作
- 同样地,如果遇到错误说volume已经从指定的node上detach,那么它就可以假设此时detach完成,保持幂等性。
- 除此之外,其他的错误一律记录下来,并退出当前操作,主控循环会稍后重试的,所以不用慌。
- 一旦detach操作完成:
- 对内存缓存对象也进行加锁操作(又需要更新资源了)
- 将内存缓存对象中,把node对象从已经attached volume的node列表里移除,以表明volume从指定node移除成功。
- 设置内存缓存首级上的是否持久化flag,表明当前对象状态需要持久化
- 完成后释放内存缓存对象上的锁
- 释放volume操作锁
那么kubelet是不是就废掉attach和detach操作了呢🌝
啊不,其实目前来说为了兼容旧的模式,它还是保留这些功能, 只是在kubelet启动的时候添加flag来决定要不要在kubelt端开启这功能 有以下两种情况: A:Kubelet 开启volume attach/detach开关 ,传统的模式 B: Kubelet 关闭volume attach/detach开关,kubelet只负责挂载mount操作,
结合理论看代码
上面的已经整理好的AD controller 设计细节,带着这些理论基础,继续开启代码之旅。 还是回到Run函数,
初始化实际世界状态populateActualStateOfWorld()
func (adc *attachDetachController) populateActualStateOfWorld() error {
klog.V(5).Infof("Populating ActualStateOfworld")
nodes, err := adc.nodeLister.List(labels.Everything())
if err != nil {
return err
}
for _, node := range nodes {
nodeName := types.NodeName(node.Name)
for _, attachedVolume := range node.Status.VolumesAttached {
uniqueName := attachedVolume.Name
// The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
// In such a case it should be detached in the first reconciliation cycle and the
// volume spec is not needed to detach a volume. If the volume is used by a pod, it
// its spec can be: this would happen during in the populateDesiredStateOfWorld which
// scans the pods and updates their volumes in the ActualStateOfWorld too.
err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
if err != nil {
klog.Errorf("Failed to mark the volume as attached: %v", err)
continue
}
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.addNodeToDswp(node, types.NodeName(node.Name))
}
}
return nil
}
看到这里用了一个nodelister,nodelister是从infomerFactory工厂出来的一个缓存对象,infomer是Kubernetes源码中十分常见的对象,它主要是通过缓存了APIServer的对象结果,并维护了一套本地的缓存存储,通过监听API Server的事件以及定时重新同步的机制来更新, 客户端由此查询kubernetes对象信息的同时还能大大减少了直接操作API server的请求,是提高系统稳定性和性能的一大法宝。
这里通过nodeLister查询到当前系统的所有Node节点信息(根据Node信息确定当前实际世界正在怎么运行的,都说到这份上了,所以请回顾ASW对象是怎样的再继续)
接下来他会根据遍历编排系统上的每一个Node,查询它的node.Status.VolumesAttached
这个值,如果检查到有多个值,表明当前状态为有多个附着的volume,需要记录此信息到ASW缓存对象中,操作方式是调用actualStateOfWorld.MarkVolumeAsAttached()
。试着按直接查看定义会发现这个是接口,所以这里要查看他的具体实现代码。
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error {
_, err := asw.AddVolumeNode(uniqueName, volumeSpec, nodeName, devicePath, true)
return err
}
关键的还是asw的添加volume操作
func (asw *actualStateOfWorld) AddVolumeNode(
uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string, isAttached bool) (v1.UniqueVolumeName, error) {
asw.Lock()
defer asw.Unlock()
volumeName := uniqueName
if volumeName == "" {
if volumeSpec == nil {
return volumeName, fmt.Errorf("volumeSpec cannot be nil if volumeName is empty")
}
attachableVolumePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || attachableVolumePlugin == nil {
return "", fmt.Errorf(
"failed to get AttachablePlugin from volumeSpec for volume %q err=%v",
volumeSpec.Name(),
err)
}
volumeName, err = util.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q err=%v",
volumeSpec.Name(),
err)
}
}
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
volumeObj = attachedVolume{
volumeName: volumeName,
spec: volumeSpec,
nodesAttachedTo: make(map[types.NodeName]nodeAttachedTo),
devicePath: devicePath,
}
} else {
// If volume object already exists, it indicates that the information would be out of date.
// Update the fields for volume object except the nodes attached to the volumes.
volumeObj.devicePath = devicePath
volumeObj.spec = volumeSpec
klog.V(2).Infof("Volume %q is already added to attachedVolume list to node %q, update device path %q",
volumeName,
nodeName,
devicePath)
}
node, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
// Create object if it doesn't exist.
node = nodeAttachedTo{
nodeName: nodeName,
mountedByNode: true, // Assume mounted, until proven otherwise
attachedConfirmed: isAttached,
detachRequestedTime: time.Time{},
}
} else {
node.attachedConfirmed = isAttached
klog.V(5).Infof("Volume %q is already added to attachedVolume list to the node %q, the current attach state is %t",
volumeName,
nodeName,
isAttached)
}
volumeObj.nodesAttachedTo[nodeName] = node
asw.attachedVolumes[volumeName] = volumeObj
if isAttached {
asw.addVolumeToReportAsAttached(volumeName, nodeName)
}
return volumeName, nil
}
结合前面的asw缓存对象的定义来看,这里的操作就十分清晰, 如果asw缓存对象以及存在这个volume则更新,否则则添加volume到asw缓存。
进入下一步的处理环节adc.processVolumesInUse()
// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
// mounted.
func (adc *attachDetachController) processVolumesInUse(
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
klog.V(4).Infof("processVolumesInUse for node %q", nodeName)
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
mounted := false
for _, volumeInUse := range volumesInUse {
if attachedVolume.VolumeName == volumeInUse {
mounted = true
break
}
}
err := adc.actualStateOfWorld.SetVolumeMountedByNode(attachedVolume.VolumeName, nodeName, mounted)
if err != nil {
klog.Warningf(
"SetVolumeMountedByNode(%q, %q, %v) returned an error: %v",
attachedVolume.VolumeName, nodeName, mounted, err)
}
}
}
主要还是从刚才的asw缓存对象里查询目标volume,
但这里是要设置mounted = true
,以此表明volume正在被使用中。
最后是adc.addNodeToDswp() 细心的你可能发现了,说好生成ActualStateOfWorld,怎么这有变成了DSW(desireStateVorld)了呢。 答案从代码可以找到
func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) {
if _, exists := node.Annotations[volumeutil.ControllerManagedAttachAnnotation]; exists {
keepTerminatedPodVolumes := false
if t, ok := node.Annotations[volumeutil.KeepTerminatedPodVolumesAnnotation]; ok {
keepTerminatedPodVolumes = (t == "true")
}
// Node specifies annotation indicating it should be managed by attach
// detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes)
}
}
因为这里会涉及一个是否需要处理keepTerminatedPodVolumes
,如果从annotation指示了要AD controller来处理,
这是一个指示位,所以把设置信息同步到dsw,这样负责dsw的GoRoutine就知道怎么维护这一块的状态了
关于populateActualStateOfWorld,这里做了一个十分详细的流程图,结合这个看到代码想想就简单清晰很多
初始化期望世界状态populateDesiredStateOfWorld
类似地,另一个缓存对象DSW的设置的代码分析如下,adc.populateDesiredStateOfWorld():
func (adc *attachDetachController) populateDesiredStateOfWorld() error {
klog.V(5).Infof("Populating DesiredStateOfworld")
pods, err := adc.podLister.List(labels.Everything())
if err != nil {
return err
}
for _, pod := range pods {
podToAdd := pod
adc.podAdd(podToAdd)
for _, podVolume := range podToAdd.Spec.Volumes {
// The volume specs present in the ActualStateOfWorld are nil, let's replace those
// with the correct ones found on pods. The present in the ASW with no corresponding
// pod will be detached and the spec is irrelevant.
volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, adc.pvcLister, adc.pvLister)
if err != nil {
klog.Errorf(
"Error creating spec for volume %q, pod %q/%q: %v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
nodeName := types.NodeName(podToAdd.Spec.NodeName)
plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || plugin == nil {
klog.V(10).Infof(
"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
klog.Errorf(
"Failed to find unique name for volume %q, pod %q/%q: %v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
if adc.actualStateOfWorld.IsVolumeAttachedToNode(volumeName, nodeName) {
devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
if err != nil {
klog.Errorf("Failed to find device path: %v", err)
continue
}
err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
if err != nil {
klog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
}
}
}
}
return nil
}
非常类似,这里也用到了lister,但这次要列举的对象是Pod而不是Node,请思考为什么🤔 这里主要分成两个块处理逻辑, 一个是adc.podAdd(podToAdd) 另一个则是根据Pod.Spec.volumes的检测是否有效的volume,如果是则查询更多关于此volume的信息 并最终把状态回写到asw中,注意是asw缓存。 也就是说生成DesiredStateOfWorld缓存操作其实是放在了podAdd()上。 其实这里就是检测了volume是否有效,是否需要处理, 将决策结果用一个actionFlag表示true则表示添加到dsw,false则表示从dsw缓存删除, 决策部分如下:
// DetermineVolumeAction returns true if volume and pod needs to be added to dswp
// and it returns false if volume and pod needs to be removed from dswp
func DetermineVolumeAction(pod *v1.Pod, desiredStateOfWorld cache.DesiredStateOfWorld, defaultAction bool) bool {
if pod == nil || len(pod.Spec.Volumes) <= 0 {
return defaultAction
}
nodeName := types.NodeName(pod.Spec.NodeName)
keepTerminatedPodVolume := desiredStateOfWorld.GetKeepTerminatedPodVolumesForNode(nodeName)
if util.IsPodTerminated(pod, pod.Status) {
// if pod is terminate we let kubelet policy dictate if volume
// should be detached or not
return keepTerminatedPodVolume
}
return defaultAction
}
即判断pod和volume是否还处于有效状态,以及刚才说到的keepTerminatedPodVolume
flag,
接下来则根据此actionFlag表示true则表示添加到dsw,false则表示从dsw缓存删除,具体的代码有点啰嗦这里就不复制了。
简单总结populateDesiredStateOfWorld()的行为如下图: 结合下图看源码估计就有个比较清晰的概念了。
协调器reconciler
在Run函数中,前面的这些都还只是初始化状态,接下来,主程序会启动单独协程,
分别是adc.reconciler.Run(stopCh),
adc.desiredStateOfWorldPopulator.Run(stopCh)
以及定期运行的go wait.Until(adc.pvcWorker, time.Second, stopCh)
其中的reconciler
就是一个协调器,
直接从adc.reconciler.Run(stopCh),进来可以非常清晰看到如下所示。
func (rc *reconciler) Run(stopCh <-chan struct{}) {
wait.Until(rc.reconciliationLoopFunc(), rc.loopPeriod, stopCh)
}
// reconciliationLoopFunc this can be disabled via cli option disableReconciliation.
// It periodically checks whether the attached volumes from actual state
// are still attached to the node and update the status if they are not.
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
rc.reconcile()
if rc.disableReconciliationSync {
klog.V(5).Info("Skipping reconciling attached volumes still attached since it is disabled via the command line.")
} else if rc.syncDuration < time.Second {
klog.V(5).Info("Skipping reconciling attached volumes still attached since it is set to less than one second via the command line.")
} else if time.Since(rc.timeOfLastSync) > rc.syncDuration {
klog.V(5).Info("Starting reconciling attached volumes still attached")
rc.sync()
}
}
}
reconcile会在loopPeriod周而复始运行。 核心操作是rc.reconcile() 和定期的rc.sync()
先来看下rc.reconcile()
是个什么操作,
我在写这篇文章的时候,这里的逻辑有点乱,但可以概括为两部分:
1)检测ASW,检查哪些volume是可以安全detach的,//🤮这里的代码需要包装下,不然太乱了,等我写完这篇就提个pr,届时可能这里就有点变动
2)检测DSW,检查哪些volume是还没在ASW中的,如果不在那就触发attach
rc.reconcile()
时序图如下(部分有精简)
定期的rc.sync()逻辑相对比较简单:
func (rc *reconciler) sync() {
defer rc.updateSyncTime()
rc.syncStates()
}
func (rc *reconciler) syncStates() {
volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode()
rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
}
也就是从ASW获取已经attach的volume, volumesPerNode是一个map结构,key为nodename,value为 []operationexecutor.AttachedVolume ,然后attacherDetacher验证该volume是否仍是attach状态,因篇所限制,暂时不展开,留着后续分析。
desiredStateOfWorldPopulator.Run()
DSW是个关键的结构,前面已经用伪流程图的形式解释过desiredStateOfWorldPopulator的初始化,那么
在完成初始化desiredStateOfWorld
结构后它的核心工作还有什么呢?
本节将关注在go adc.desiredStateOfWorldPopulator.Run(stopCh)
按照惯例,我整理了下图: dswp.populatorLoopFunc()
desiredStateOfWorldPopulator是一个go协程,会周期性地对检查volume 整理起来会分两大情况:
findAndRemoveDeletedPods
desiredStateOfWorldPopulator分别查询DSW缓存对象中地pod并对比podLister中地pod对象,并进行对比, 如果dsw中地pod已经不在podLister中,说明dsw缓存地pod信息已经落后,需根据策略将pod信息从当前dsw缓存中移除。 podLister是一个缓存对象,它缓存地数据是APIServer中地结果。因此可以看作是查询了APIServer的结果。findAndAddActivePods
有了1)的概念,这个就好理解了,同样,desiredStateOfWorldPopulator会 直接查询podLister(同上,这里可以理解为查询了APIServer)获取的最新pod信息。 然后查询pv,最后更新缓存数据到ASW(actualStateOfWorld)
至此,本文已经分析了ADController多个模块的工作原理⚙️。 但是看起来,前面似乎都会从volume和node的层面来工作的,当然还结合了ASW和DSW两个关键角色🎭
pvc监听处理pvcWoker
最后,其实还有另外一个角色,pvcWoker👷。 pvcWoker 顾名思义就是负责pvc的工人。 首先直接拨开顶层代码
// pvcWorker processes items from pvcQueue
func (adc *attachDetachController) pvcWorker() {
for adc.processNextItem() {
}
}
func (adc *attachDetachController) processNextItem() bool {
keyObj, shutdown := adc.pvcQueue.Get()
if shutdown {
return false
}
defer adc.pvcQueue.Done(keyObj)
if err := adc.syncPVCByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
adc.pvcQueue.AddRateLimited(keyObj)
runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err))
return true
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
adc.pvcQueue.Forget(keyObj)
return true
}
暂且不深入点开细节,可以看到这里涉及的操作大多从pvcQueue层面处理,那么问题来了,pvcQueue是个啥?
从定义来看,它是个专注处理pvc的队列操作对象,它拥有RateLimitingInterface
接口定义的能力,
// pvcQueue is used to queue pvc objects
pvcQueue workqueue.RateLimitingInterface
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
其中DelayingInterface
又是另一个接口,
继续揪出DelayingInterface
和最原始的接口定义Interface
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
这就是pvcQueue
常常用到的操作,Add,Len,Get这些操作都是针对接口的某个成员来操作。
这个操作对象是个切片,(t是个interface),当然这里pvcQueue作为实现这个接口的结构体,自然就代表了是一个关于pvc的queue了。
这个队列可以有add,get,设置shutDown的操作。
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t
// dirty defines all of the items that need to be processed.
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
以上是代码定义上理解的pvcQueue,可以知道这个queue结构的基本功能。 它是个简单的队列结构:
呃,那能不能再直白点呢? queue 是个切片,每次get一个对象其实就是queue[0] processing 是一个set结构,实际上这里对应的是一个map结构,通常凑够queue结构取下来的数据会放在processing中 dirty 类似processing,需要继续处理的对象 failures,是暂时处理不了,遇到错误情况是寄存的map结构,定义在RateLimiter接口。之所以放把它从queue中拎出来是做错误重试机制。 比如如果queue的数据处理出错了,k8s可以实现指数退避算法隔一段实际后再从failures对象插入到queue等待⌛️下一次处理。
由于这个Queue结构在其他地方也会用到,pvcQueue,pvQueue,vaQueue.. 那就直接上图:
在NewAttachDetachController操作的时候,其实已经把pvc的add和update事件注册,有pvc变动的时候就是触发pvc Queue.Add()操作往queue里添加内容
pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { adc.enqueuePVC(obj) }, UpdateFunc: func(old, new interface{}) { adc.enqueuePVC(new) }, })
通过pvcQueue.get()将队列内容弹出处理,并标记在processing这个map中
当调用pvcQueue.done()的时候,processing中的这个记录就移除,此时整个queue结构不再包含这记录
如果处理出错的pvc项会被放在fail这个集合。此时这个pvc对象暂时就不在queue的处理目标中
对应这些fail的项目,会在适当时候(通常为指数退避)重新通过pvcQueue.AddRateLimit()形式在隔离一段时候后重新天添加到queuezhong 开始下一轮的循环♻️
对已经fail的项目,还可以通过pvcQueue.forget()来将它移除。
之所以花了一些功夫说这里的设计是因为这个比较通用,在其他地方也能看到。
下面就是一个adcontroller中看到的一个例子 pvcWorker的工作流程processNextItem如图所示:
结合前面说的pvcQueue的基础再看这个流程就很清晰了。
a. 首先第一步,adc向pvcQueue发起get请求。 b. 除非这个queue是关闭状态,关闭状态会终止操作。否则就就弹出一个pvc对象进行处理 c. 对这个pvc进行同步 d. 如果同步失败,则安排重试 e. 优雅地结束。
FAQ: 为什么DSW是遍历pod? 因为Pod是被先调度的(顺序是先调度,再等待资源准备,完成初始化启动pod),根据被调度情况来判断volume该怎么处理,所以是从调度出去的pod期望需要的volume资源
ASW遍历Node: 要求实际情况,所以要拿Node上的运行状态。
本文总结
这篇文章其实写了好一段时候,断断续续,特别是流程的整理其实非常耗时间,因此在衔接上有点不是很自然。 但是,这可能是目前最详细的Controller分析。其实在kubernetes中,有很多东西都是一通百通。 因此这里通过ADcontroller的源码分析来一步一步走进更多坑中。 如果非要对adcontroller总结的,大概也能说几句,就是通过全文的分析可知道,
- 在这里大量使用的goroutine来完成很多复杂的后台工作,这对应一个编排系统而言是最基本的工作了。
- 非直接的交互避免强耦合,比如这里其实有跟APIServer的也不会直接操作apisever,而是通过lister这种方式来实现, 既能大大降低系统资源消耗还降低耦合依赖。
- 通过缓存中间态的形式降低组件的依赖,模块化整个k8s系统,避免某个模块影响引发的故障(比如这里一直在说的attach问题), 从而确保系统的可用性更高
- 欢迎补充👏
后面有时间会继续分享其他方面的内容