- kubernetes版本:v1.18.3
kubernetes存储相关源码
在kubernetes源码中与存储相关的代码有:
- PersistentVolumeBinderController
- AttachDetachController
- VolumeExpandController
- PVCProtectionController
- PVProtectionController
- CSI
- kubelet的VolumeManager
这几部分代码之间到底是什么关系,它们又是如何协同工作的呢?
1. kube-controller-manager中的controller
kube-controller-manager负责启动一系列的controller,基本上每一种k8s内置资源对象都有一个对应的controller实现,因此这些内置资源对应的controller的启动入口函数都位于kube-controller-manager的代码中。
1.1 PersistentVolumeBinderController
启动入口函数
// startPersistentVolumeBinderController is the kube-controller-manager entry
// point for the PersistentVolume binder controller. It probes the volume
// plugins, assembles the controller parameters from ctx, constructs the
// controller and runs it in a background goroutine until ctx.Stop is closed.
//
// The returned http.Handler is always nil and the bool is always true; a
// non-nil error is returned when plugin probing or controller construction
// fails.
func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
	binderConfig := ctx.ComponentConfig.PersistentVolumeBinderController

	// Gather the volume plugins handed to the controller; the probe is
	// driven by the binder's VolumeConfiguration.
	volumePlugins, probeErr := ProbeControllerVolumePlugins(ctx.Cloud, binderConfig.VolumeConfiguration)
	if probeErr != nil {
		return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", probeErr)
	}

	// Assemble the parameters for the persistentvolume controller.
	coreInformers := ctx.InformerFactory.Core().V1()
	controllerParams := persistentvolumecontroller.ControllerParameters{
		KubeClient:                ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
		SyncPeriod:                binderConfig.PVClaimBinderSyncPeriod.Duration,
		VolumePlugins:             volumePlugins,
		Cloud:                     ctx.Cloud,
		ClusterName:               ctx.ComponentConfig.KubeCloudShared.ClusterName,
		VolumeInformer:            coreInformers.PersistentVolumes(),
		ClaimInformer:             coreInformers.PersistentVolumeClaims(),
		ClassInformer:             ctx.InformerFactory.Storage().V1().StorageClasses(),
		PodInformer:               coreInformers.Pods(),
		NodeInformer:              coreInformers.Nodes(),
		EnableDynamicProvisioning: binderConfig.VolumeConfiguration.EnableDynamicProvisioning,
	}

	// Build the PersistentVolumeController itself.
	pvController, buildErr := persistentvolumecontroller.NewController(controllerParams)
	if buildErr != nil {
		return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", buildErr)
	}

	// Run the controller in the background.
	go pvController.Run(ctx.Stop)
	return nil, true, nil
}
初始化PersistentVolumeController对象
// NewController creates a new PersistentVolume controller.
//
// When p.EventRecorder is nil a broadcaster is created that logs events via
// klog and records them through p.KubeClient. The function initializes the
// volume plugins, registers add/update/delete handlers on the volume and claim
// informers that enqueue the affected object, and stores the listers and
// "has synced" functions of all informers on the controller. An error is
// returned only when plugin initialization fails.
func NewController(p ControllerParameters) (*PersistentVolumeController, error) {
	recorder := p.EventRecorder
	if recorder == nil {
		eventBroadcaster := record.NewBroadcaster()
		eventBroadcaster.StartLogging(klog.Infof)
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")})
		recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
	}

	pvCtrl := &PersistentVolumeController{
		// Locally maintained volume and claim caches. Per the article's
		// author they hold the last known version of each object, so that
		// the controller does not act on stale volume.Status / claim data
		// coming from the informers.
		volumes: newPersistentVolumeOrderedIndex(),
		claims:  cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),

		kubeClient:                p.KubeClient,
		cloud:                     p.Cloud,
		clusterName:               p.ClusterName,
		eventRecorder:             recorder,
		enableDynamicProvisioning: p.EnableDynamicProvisioning,
		resyncPeriod:              p.SyncPeriod,

		runningOperations:             goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
		createProvisionedPVRetryCount: createProvisionedPVRetryCount,
		createProvisionedPVInterval:   createProvisionedPVInterval,

		claimQueue:          workqueue.NewNamed("claims"),
		volumeQueue:         workqueue.NewNamed("volumes"),
		operationTimestamps: metrics.NewOperationStartTimeCache(),
	}

	// Initialize all volume plugins.
	// Prober is nil because PV is not aware of Flexvolume.
	if initErr := pvCtrl.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, pvCtrl); initErr != nil {
		return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", initErr)
	}

	// Every PersistentVolume event (add, update, delete) puts the object on
	// the volume queue.
	volumeHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(added interface{}) { pvCtrl.enqueueWork(pvCtrl.volumeQueue, added) },
		UpdateFunc: func(_, updated interface{}) { pvCtrl.enqueueWork(pvCtrl.volumeQueue, updated) },
		DeleteFunc: func(deleted interface{}) { pvCtrl.enqueueWork(pvCtrl.volumeQueue, deleted) },
	}
	p.VolumeInformer.Informer().AddEventHandler(volumeHandlers)
	pvCtrl.volumeLister = p.VolumeInformer.Lister()
	pvCtrl.volumeListerSynced = p.VolumeInformer.Informer().HasSynced

	// Every PersistentVolumeClaim event puts the object on the claim queue.
	claimHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(added interface{}) { pvCtrl.enqueueWork(pvCtrl.claimQueue, added) },
		UpdateFunc: func(_, updated interface{}) { pvCtrl.enqueueWork(pvCtrl.claimQueue, updated) },
		DeleteFunc: func(deleted interface{}) { pvCtrl.enqueueWork(pvCtrl.claimQueue, deleted) },
	}
	p.ClaimInformer.Informer().AddEventHandler(claimHandlers)
	pvCtrl.claimLister = p.ClaimInformer.Lister()
	pvCtrl.claimListerSynced = p.ClaimInformer.Informer().HasSynced

	// The remaining informers are only read through their listers; no event
	// handlers are registered for them here.
	pvCtrl.classLister = p.ClassInformer.Lister()
	pvCtrl.classListerSynced = p.ClassInformer.Informer().HasSynced
	pvCtrl.podLister = p.PodInformer.Lister()
	pvCtrl.podListerSynced = p.PodInformer.Informer().HasSynced
	pvCtrl.NodeLister = p.NodeInformer.Lister()
	pvCtrl.NodeListerSynced = p.NodeInformer.Informer().HasSynced

	// The same CSI translator instance backs both the controller's translator
	// field and the CSI migration plugin manager.
	translator := csitrans.New()
	pvCtrl.translator = translator
	pvCtrl.csiMigratedPluginManager = csimigration.NewPluginManager(translator)

	return pvCtrl, nil
}
启动volumeController
// Run starts all of this controller's control loops and blocks until stopCh
// is closed. Both work queues are shut down when Run returns.
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
	// Deferred calls run in reverse order on return: the shutdown log line
	// first, then the queue shutdowns, and finally crash handling.
	defer utilruntime.HandleCrash()
	defer ctrl.claimQueue.ShutDown()
	defer ctrl.volumeQueue.ShutDown()

	klog.Infof("Starting persistent volume controller")
	defer klog.Infof("Shutting down persistent volume controller")

	// Do nothing until every informer cache this controller reads from has
	// completed its initial sync; give up if stopCh closes first.
	informersSynced := []cache.InformerSynced{
		ctrl.volumeListerSynced,
		ctrl.claimListerSynced,
		ctrl.classListerSynced,
		ctrl.podListerSynced,
		ctrl.NodeListerSynced,
	}
	if synced := cache.WaitForNamedCacheSync("persistent volume", stopCh, informersSynced...); !synced {
		return
	}

	// NOTE(review): initializeCaches is defined elsewhere; presumably it seeds
	// the controller's local volume/claim caches from the listers — confirm.
	ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

	// Periodic resync, run every ctrl.resyncPeriod.
	go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)

	// One worker per queue; wait.Until re-invokes a worker one second after
	// it returns, until stopCh is closed.
	for _, worker := range []func(){ctrl.volumeWorker, ctrl.claimWorker} {
		go wait.Until(worker, time.Second, stopCh)
	}

	// Register metrics over the local volume and claim caches.
	metrics.Register(ctrl.volumes.store, ctrl.claims)

	<-stopCh
}
volumeWorker逻辑
// volumeWorker processes items from volumeQueue. It must run only once,
// syncVolume is not assured to be reentrant.
func (ctrl *PersistentVolumeController) volumeWorker() {
	// processNext handles a single queue item and reports whether the queue
	// has been shut down.
	processNext := func() bool {
		item, shuttingDown := ctrl.volumeQueue.Get()
		if shuttingDown {
			return true
		}
		// Tell the queue we are finished with this item whatever happens below.
		defer ctrl.volumeQueue.Done(item)

		key := item.(string)
		klog.V(5).Infof("volumeWorker[%s]", key)

		// Split the key into namespace and name; only the name is used.
		_, pvName, splitErr := cache.SplitMetaNamespaceKey(key)
		if splitErr != nil {
			klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, splitErr)
			return false
		}

		// Look the volume up in the informer cache first.
		current, getErr := ctrl.volumeLister.Get(pvName)
		switch {
		case getErr == nil:
			// The volume still exists in informer cache, the event must have
			// been add/update/sync
			ctrl.updateVolume(current)
			return false
		case !errors.IsNotFound(getErr):
			klog.V(2).Infof("error getting volume %q from informer: %v", key, getErr)
			return false
		}

		// The volume is not in informer cache, the event must have been
		// "delete"
		stored, exists, storeErr := ctrl.volumes.store.GetByKey(key)
		switch {
		case storeErr != nil:
			klog.V(2).Infof("error getting volume %q from cache: %v", key, storeErr)
			return false
		case !exists:
			// The controller has already processed the delete event and
			// deleted the volume from its cache
			klog.V(2).Infof("deletion of volume %q was already processed", key)
			return false
		}

		removed, isVolume := stored.(*v1.PersistentVolume)
		if !isVolume {
			klog.Errorf("expected volume, got %+v", stored)
			return false
		}
		// Gone from the informer cache but still in the local cache: handle
		// the deletion.
		ctrl.deleteVolume(removed)
		return false
	}

	for !processNext() {
		// Keep draining the queue until it reports shutdown.
	}
	klog.Infof("volume worker queue shutting down")
}
主处理逻辑
volumeWorker -> updateVolume -> syncVolume
// syncVolume is the main controller method to decide what to do with a volume.
// It's invoked by appropriate cache.Controller callbacks when a volume is
// created, updated or periodically synced. We do not differentiate between
// these events.
//
// The cases handled, in order:
//   - no ClaimRef: the phase is set to Available;
//   - ClaimRef without UID (pre-bound): the phase is set to Available;
//   - claim missing or recreated with another UID: the phase is set to
//     Released (unless already Released/Failed) and reclaimVolume is called;
//   - claim has no VolumeName yet: the claim is enqueued for syncClaim;
//   - claim points back at this volume: the phase is set to Bound;
//   - claim points at another volume: the volume is reclaimed (dynamically
//     provisioned with the Delete policy) or unbound.
//
// A non-nil error is returned when any lookup, update, reclaim or unbind call
// fails.
func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) error {
	klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))

	// Set correct "migrated-to" annotations on PV and update in API server if
	// necessary (in-tree to CSI migration).
	newVolume, err := ctrl.updateVolumeMigrationAnnotations(volume)
	if err != nil {
		// Nothing was saved; we will fall back into the same
		// condition in the next call to this method
		return err
	}
	volume = newVolume

	// [Unit test set 4]
	if volume.Spec.ClaimRef == nil {
		// Volume is unused
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
		// Set the PV phase to Available.
		if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
			// Nothing was saved; we will fall back into the same
			// condition in the next call to this method
			return err
		}
		return nil
	}

	// From here on volume.Spec.ClaimRef != nil: the volume refers to a claim.
	if volume.Spec.ClaimRef.UID == "" {
		// The PV is reserved for a PVC; that PVC has not yet been
		// bound to this PV; the PVC sync will handle it.
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
		// Set the PV phase to Available.
		if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
			// Nothing was saved; we will fall back into the same
			// condition in the next call to this method
			return err
		}
		return nil
	}

	klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
	// Get the PVC by _name_
	var claim *v1.PersistentVolumeClaim
	claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
	// Look the referenced PVC up in the controller's local claim cache.
	obj, found, err := ctrl.claims.GetByKey(claimName)
	if err != nil {
		return err
	}
	// Not in the local cache, and the volume carries the AnnBoundByController
	// annotation.
	if !found && metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
		// If PV is bound by external PV binder (e.g. kube-scheduler), it's
		// possible on heavy load that corresponding PVC is not synced to
		// controller local cache yet. So we need to double-check PVC in
		//   1) informer cache
		//   2) apiserver if not found in informer cache
		// to make sure we will not reclaim a PV wrongly.
		// Note that only non-released and non-failed volumes will be
		// updated to Released state when PVC does not exist.
		if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
			obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
			if err != nil && !apierrors.IsNotFound(err) {
				return err
			}
			found = !apierrors.IsNotFound(err)
			if !found {
				obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
				if err != nil && !apierrors.IsNotFound(err) {
					return err
				}
				found = !apierrors.IsNotFound(err)
			}
		}
	}
	if !found {
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
		// Fall through with claim = nil
	} else {
		var ok bool
		claim, ok = obj.(*v1.PersistentVolumeClaim)
		if !ok {
			// A failed type assertion leaves claim nil, so the error must be
			// built from claimName rather than by dereferencing claim.
			return fmt.Errorf("Cannot convert object from claim cache to claim %q!?: %#v", claimName, obj)
		}
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
	}
	if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
		// The claim that the PV was pointing to was deleted, and another
		// with the same name created.
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has different UID, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
		// Treat the volume as bound to a missing claim.
		claim = nil
	}

	if claim == nil {
		// If we get into this block, the claim must have been deleted;
		// NOTE: reclaimVolume may either release the PV back into the pool or
		// recycle it or do nothing (retain)

		// Do not overwrite previous Failed state - let the user see that
		// something went wrong, while we still re-try to reclaim the
		// volume.
		if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
			// Also, log this only once:
			klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
			// Set the PV phase to Released.
			if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
				// Nothing was saved; we will fall back into the same condition
				// in the next call to this method
				return err
			}
		}
		// Apply the volume's reclaim policy.
		if err = ctrl.reclaimVolume(volume); err != nil {
			// Release failed, we will fall back into the same condition
			// in the next call to this method
			return err
		}
		return nil
	}

	if claim.Spec.VolumeName == "" {
		// Check that the PVC and the PV agree on volumeMode.
		if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
			// Binding for the volume won't be called in syncUnboundClaim,
			// because findBestMatchForClaim won't return the volume due to volumeMode mismatch.
			volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
			ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
			claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
			// Skipping syncClaim
			return nil
		}

		if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
			// The binding is not completed; let PVC sync handle it
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
		} else {
			// Dangling PV; try to re-establish the link in the PVC sync
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
		}
		// In both cases, the volume is Bound and the claim is Pending.
		// Next syncClaim will fix it. To speed it up, we enqueue the claim
		// into the controller, which results in syncClaim to be called
		// shortly (and in the right worker goroutine).
		// This speeds up binding of provisioned volumes - provisioner saves
		// only the new PV and it expects that next syncClaim will bind the
		// claim to it.
		ctrl.claimQueue.Add(claimToClaimKey(claim))
		return nil
	}

	if claim.Spec.VolumeName == volume.Name {
		// Volume is bound to a claim properly, update status if necessary
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
		// Set the PV phase to Bound.
		if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
			// Nothing was saved; we will fall back into the same
			// condition in the next call to this method
			return err
		}
		return nil
	}

	// Remaining case: volume is bound to a claim, but the claim is bound
	// elsewhere.
	if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
		// This volume was dynamically provisioned for this claim. The
		// claim got bound elsewhere, and thus this volume is not
		// needed. Delete it.
		// Mark the volume as Released for external deleters and to let
		// the user know. Don't overwrite existing Failed status!
		if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
			// Also, log this only once:
			klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
			// Set the PV phase to Released.
			if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
				// Nothing was saved; we will fall back into the same condition
				// in the next call to this method
				return err
			}
		}
		// Apply the volume's reclaim policy.
		if err = ctrl.reclaimVolume(volume); err != nil {
			// Deletion failed, we will fall back into the same condition
			// in the next call to this method
			return err
		}
		return nil
	}

	// Volume is bound to a claim, but the claim is bound elsewhere
	// and it's not dynamically provisioned.
	if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
		// This is part of the normal operation of the controller; the
		// controller tried to use this volume for a claim but the claim
		// was fulfilled by another volume. We did this; fix it.
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
		if err = ctrl.unbindVolume(volume); err != nil {
			return err
		}
		return nil
	}

	// The PV must have been created with this ptr; leave it alone.
	klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
	// This just updates the volume phase and clears
	// volume.Spec.ClaimRef.UID. It leaves the volume pre-bound
	// to the claim.
	if err = ctrl.unbindVolume(volume); err != nil {
		return err
	}
	return nil
}
claimWorker逻辑
// claimWorker processes items from claimQueue. It must run only once,
// syncClaim is not reentrant.
func (ctrl *PersistentVolumeController) claimWorker() {
	// processNext handles a single queue item and reports whether the queue
	// has been shut down.
	processNext := func() bool {
		item, shuttingDown := ctrl.claimQueue.Get()
		if shuttingDown {
			return true
		}
		// Tell the queue we are finished with this item whatever happens below.
		defer ctrl.claimQueue.Done(item)

		key := item.(string)
		klog.V(5).Infof("claimWorker[%s]", key)

		claimNamespace, claimName, splitErr := cache.SplitMetaNamespaceKey(key)
		if splitErr != nil {
			klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, splitErr)
			return false
		}

		// Look the claim up in the informer cache first.
		current, getErr := ctrl.claimLister.PersistentVolumeClaims(claimNamespace).Get(claimName)
		switch {
		case getErr == nil:
			// The claim still exists in informer cache, the event must have
			// been add/update/sync
			ctrl.updateClaim(current)
			return false
		case !errors.IsNotFound(getErr):
			klog.V(2).Infof("error getting claim %q from informer: %v", key, getErr)
			return false
		}

		// The claim is not in informer cache, the event must have been "delete"
		stored, exists, storeErr := ctrl.claims.GetByKey(key)
		switch {
		case storeErr != nil:
			klog.V(2).Infof("error getting claim %q from cache: %v", key, storeErr)
			return false
		case !exists:
			// The controller has already processed the delete event and
			// deleted the claim from its cache
			klog.V(2).Infof("deletion of claim %q was already processed", key)
			return false
		}

		removed, isClaim := stored.(*v1.PersistentVolumeClaim)
		if !isClaim {
			klog.Errorf("expected claim, got %+v", stored)
			return false
		}
		ctrl.deleteClaim(removed)
		return false
	}

	for !processNext() {
		// Keep draining the queue until it reports shutdown.
	}
	klog.Infof("claim worker queue shutting down")
}
主处理逻辑
claimWorker -> updateClaim -> syncClaim
// syncClaim is the main controller method to decide what to do with a claim.
// It's invoked by appropriate cache.Controller callbacks when a claim is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it was split into syncUnboundClaim and syncBoundClaim
// methods.
func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
	klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))

	// Set correct "migrated-to" annotations on PVC and update in API server if
	// necessary (in-tree to CSI migration).
	migrated, err := ctrl.updateClaimMigrationAnnotations(claim)
	if err != nil {
		// Nothing was saved; we will fall back into the same
		// condition in the next call to this method
		return err
	}
	claim = migrated

	// The AnnBindCompleted annotation decides which of the two paths applies.
	if metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
		// Binding has completed before: sync as a bound claim.
		return ctrl.syncBoundClaim(claim)
	}
	// Binding has not completed yet: sync as an unbound claim.
	return ctrl.syncUnboundClaim(claim)
}
syncUnboundClaim函数
// syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim.
//
// Two situations are distinguished: the claim does not name a volume (the
// controller looks for the best matching PV, provisions one, or leaves the
// claim Pending), or the claim names a specific volume (the controller binds
// to it when possible, otherwise leaves the claim Pending or reports an
// invalid binding).
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
	// This is a new PVC that has not completed binding
	// OBSERVATION: pvc is "Pending"
	if claim.Spec.VolumeName == "" {
		// User did not care which PV they get.
		// Determine whether the claim uses delayed binding.
		delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
		if err != nil {
			return err
		}

		// [Unit test set 1]
		// Ask the local volume index for the best matching PV.
		bestMatch, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
		if err != nil {
			klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
			return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)
		}

		if bestMatch != nil {
			// Found a PV for this claim
			// OBSERVATION: pvc is "Pending", pv is "Available"
			claimKey := claimToClaimKey(claim)
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, bestMatch.Name, getVolumeStatusForLogging(bestMatch))

			// Bind the PV and the PVC to each other.
			// On any error saving the volume or the claim, subsequent
			// syncClaim will finish the binding.
			bindErr := ctrl.bind(bestMatch, claim)
			// On failure this records a count error for provision if one
			// exists; the timestamp entry stays in the cache until a
			// successful binding has happened. On success (nil error) it
			// records the end to end provision latency and cleans the cache.
			// OBSERVATION (success): claim is "Bound", pv is "Bound"
			// [Unit test 12-1, 12-2, 12-4]
			metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, bindErr)
			return bindErr
		}

		// No PV could be found
		// OBSERVATION: pvc is "Pending", will retry
		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
		if delayBinding && !pvutil.IsDelayBindingProvisioning(claim) {
			// Delayed binding and provisioning has not been triggered yet.
			ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding")
		} else if v1helper.GetPersistentVolumeClaimClass(claim) != "" {
			// The claim has a storage class: go on to provision a volume.
			return ctrl.provisionClaim(claim)
		} else {
			ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
		}

		// Mark the claim as Pending and try to find a match in the next
		// periodic syncClaim
		_, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil)
		return err
	}

	// [Unit test set 2]
	// User asked for a specific PV.
	klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
	obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
	if err != nil {
		return err
	}
	if !found {
		// User asked for a PV that does not exist.
		// OBSERVATION: pvc is "Pending"
		// Retry later.
		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
		// Not in the local volume cache: set the claim phase to Pending.
		_, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil)
		return err
	}

	// The requested volume exists in the local volume cache.
	requested, isVolume := obj.(*v1.PersistentVolume)
	if !isVolume {
		return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
	}
	klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(requested))

	if requested.Spec.ClaimRef == nil {
		// User asked for a PV that is not claimed
		// OBSERVATION: pvc is "Pending", pv is "Available"
		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
		// Verify that the PV satisfies the claim's requirements.
		if mismatch := checkVolumeSatisfyClaim(requested, claim); mismatch != nil {
			klog.V(4).Infof("Can't bind the claim to volume %q: %v", requested.Name, mismatch)
			// send an event
			msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", requested.Name, mismatch)
			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
			// volume does not satisfy the requirements of the claim;
			// set the claim phase to Pending.
			_, updateErr := ctrl.updateClaimStatus(claim, v1.ClaimPending, nil)
			return updateErr
		}
		// Bind the PV and the PVC to each other.
		// On any error saving the volume or the claim, subsequent
		// syncClaim will finish the binding.
		// OBSERVATION (success): pvc is "Bound", pv is "Bound"
		return ctrl.bind(requested, claim)
	}

	if pvutil.IsVolumeBoundToClaim(requested, claim) {
		// User asked for a PV that is claimed by this PVC
		// OBSERVATION: pvc is "Pending", pv is "Bound"
		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
		// Finish the volume binding by adding claim UID.
		// OBSERVATION (success): pvc is "Bound", pv is "Bound"
		return ctrl.bind(requested, claim)
	}

	// User asked for a PV that is claimed by someone else
	// OBSERVATION: pvc is "Pending", pv is "Bound"
	if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
		// User asked for a specific PV, retry later
		_, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil)
		return err
	}

	// This should never happen because someone had to remove
	// AnnBindCompleted annotation on the claim.
	klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(requested.Spec.ClaimRef))
	return fmt.Errorf("Invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(requested.Spec.ClaimRef))
}
syncBoundClaim函数
// syncBoundClaim is the main controller method to decide what to do with a
// bound claim.
//
// The claim is marked Lost (with a warning event) when it no longer names a
// volume, when the named volume is missing from the local volume cache, or
// when the volume is claimed by a different PVC; otherwise bind is called so
// both objects end up referring to each other.
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
	// HasAnnotation(pvc, pvutil.AnnBindCompleted)
	// This PVC has previously been bound
	// OBSERVATION: pvc is not "Pending"
	// [Unit test set 3]
	if claim.Spec.VolumeName == "" {
		// Claim was bound before but not any more.
		_, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!")
		return err
	}

	// Fetch the volume named by the claim from the local volume cache.
	obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
	if err != nil {
		return err
	}
	if !found {
		// Claim is bound to a non-existing volume.
		_, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!")
		return err
	}

	boundVolume, isVolume := obj.(*v1.PersistentVolume)
	if !isVolume {
		return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
	}
	klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(boundVolume))

	claimRef := boundVolume.Spec.ClaimRef
	switch {
	case claimRef == nil:
		// Claim is bound but volume has come unbound.
		// Or, a claim was bound and the controller has not received updated
		// volume yet. We can't distinguish these cases.
		// Bind the volume again and set all states to Bound.
		klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
		// If the objects are not saved, next syncPV or syncClaim will try again
		return ctrl.bind(boundVolume, claim)

	case claimRef.UID == claim.UID:
		// All is well
		// NOTE: syncPV can handle this so it can be left out.
		// NOTE: bind() call here will do nothing in most cases as
		// everything should be already set.
		klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
		// If the objects are not saved, next syncPV or syncClaim will try again
		return ctrl.bind(boundVolume, claim)

	default:
		// Claim is bound but volume has a different claimant.
		// Set the claim phase to 'Lost', which is a terminal
		// phase.
		_, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly")
		return err
	}
}
1.2 AttachDetachController
1.3 VolumeExpandController
1.4 PVCProtectionController
1.5 PVProtectionController
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付