0
点赞
收藏
分享

微信扫一扫

K8S kubelet之volume_host.go源码解读

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"fmt"
	"net"
	"runtime"

	utilexec "k8s.io/utils/exec"

	authenticationv1 "k8s.io/api/authentication/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	storagelisters "k8s.io/client-go/listers/storage/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	cloudprovider "k8s.io/cloud-provider"
	"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
	"k8s.io/kubernetes/pkg/kubelet/configmap"
	"k8s.io/kubernetes/pkg/kubelet/secret"
	"k8s.io/kubernetes/pkg/kubelet/token"
	"k8s.io/kubernetes/pkg/volume"
	"k8s.io/kubernetes/pkg/volume/util"
	"k8s.io/kubernetes/pkg/volume/util/hostutil"
	"k8s.io/kubernetes/pkg/volume/util/subpath"
)

// NewInitializedVolumePluginMgr returns a new instance of
// volume.VolumePluginMgr initialized with kubelets implementation of the
// volume.VolumeHost interface.
//
// kubelet - used by VolumeHost methods to expose kubelet specific parameters
// plugins - used to initialize volumePluginMgr
func NewInitializedVolumePluginMgr(
	kubelet *Kubelet,
	secretManager secret.Manager,
	configMapManager configmap.Manager,
	tokenManager *token.Manager,
	clusterTrustBundleManager clustertrustbundle.Manager,
	plugins []volume.VolumePlugin,
	prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {
	//该函数用于创建并初始化一个volume.VolumePluginMgr的实例,
	//它接受多个参数,包括kubelet实例、secretManager、configMapManager、tokenManager、clusterTrustBundleManager、plugins和prober。
	//该函数返回一个初始化后的volume.VolumePluginMgr实例和一个错误值。

	// Initialize csiDriverLister before calling InitPlugins
	var informerFactory informers.SharedInformerFactory
	var csiDriverLister storagelisters.CSIDriverLister
	var csiDriversSynced cache.InformerSynced
	const resyncPeriod = 0
	// Don't initialize if kubeClient is nil
	if kubelet.kubeClient != nil {
		informerFactory = informers.NewSharedInformerFactory(kubelet.kubeClient, resyncPeriod)
		csiDriverInformer := informerFactory.Storage().V1().CSIDrivers()
		csiDriverLister = csiDriverInformer.Lister()
		csiDriversSynced = csiDriverInformer.Informer().HasSynced

	} else {
		klog.InfoS("KubeClient is nil. Skip initialization of CSIDriverLister")
	}
	//这段代码主要实现了在调用InitPlugins之前初始化csiDriverLister的功能。
	//它首先定义了一个informers.SharedInformerFactory,一个storagelisters.CSIDriverLister和一个cache.InformerSynced,
	//并将resyncPeriod设置为0。
	//然后,它检查kubelet.kubeClient是否为nil,
	//如果不为nil,则通过informers.NewSharedInformerFactory创建一个informers.SharedInformerFactory,
	//并从中获取csiDriverLister和csiDriversSynced。如果kubelet.kubeClient为nil,
	//则通过klog.InfoS打印一条日志,表示跳过CSIDriverLister的初始化。

	kvh := &kubeletVolumeHost{
		kubelet:                   kubelet,
		volumePluginMgr:           volume.VolumePluginMgr{},
		secretManager:             secretManager,
		configMapManager:          configMapManager,
		tokenManager:              tokenManager,
		clusterTrustBundleManager: clusterTrustBundleManager,
		informerFactory:           informerFactory,
		csiDriverLister:           csiDriverLister,
		csiDriversSynced:          csiDriversSynced,
		exec:                      utilexec.New(),
	}

	if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
		return nil, fmt.Errorf(
			"could not initialize volume plugins for KubeletVolumePluginMgr: %v",
			err)
	}

	return &kvh.volumePluginMgr, nil
}

//这段代码定义了一个kubeletVolumeHost结构体,并通过给其成员变量赋值进行初始化。
//接着,调用kvh.volumePluginMgr.InitPlugins方法初始化volumePluginMgr成员变量,如果初始化失败,则返回错误信息。
//最后返回volumePluginMgr成员变量和nil。

// Compile-time check to ensure kubeletVolumeHost implements the VolumeHost interface
var _ volume.VolumeHost = &kubeletVolumeHost{}
var _ volume.KubeletVolumeHost = &kubeletVolumeHost{}

func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string {
	return kvh.kubelet.getPluginDir(pluginName)
}

type kubeletVolumeHost struct {
	kubelet                   *Kubelet
	volumePluginMgr           volume.VolumePluginMgr
	secretManager             secret.Manager
	tokenManager              *token.Manager
	configMapManager          configmap.Manager
	clusterTrustBundleManager clustertrustbundle.Manager
	informerFactory           informers.SharedInformerFactory
	csiDriverLister           storagelisters.CSIDriverLister
	csiDriversSynced          cache.InformerSynced
	exec                      utilexec.Interface
}

//这段代码定义了一个名为kubeletVolumeHost的结构体,它实现了volume.VolumeHost和volume.KubeletVolumeHost接口。
//其中,kubeletVolumeHost包含了一些字段,如kubelet、volumePluginMgr等,用于管理Kubernetes节点上的卷。
//函数GetPluginDir接收一个pluginName参数,并返回该插件的目录路径。
//这段代码还定义了一些类型和变量,如kubeletVolumeHost、secret.Manager等,用于支持Kubernetes卷的管理和操作。

func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
	kvh.kubelet.runtimeState.setStorageState(err)
}

func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string {
	return kvh.kubelet.getVolumeDevicePluginDir(pluginName)
}

func (kvh *kubeletVolumeHost) GetPodsDir() string {
	return kvh.kubelet.getPodsDir()
}

func (kvh *kubeletVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
	dir := kvh.kubelet.getPodVolumeDir(podUID, pluginName, volumeName)
	if runtime.GOOS == "windows" {
		dir = util.GetWindowsPath(dir)
	}
	return dir
}

//- SetKubeletError:将错误信息设置到kubelet的运行时状态中。
//- GetVolumeDevicePluginDir:获取给定插件名称的卷设备插件目录。
//- GetPodsDir:获取pod目录。
//- GetPodVolumeDir:获取给定pod UID、插件名称和卷名称的卷目录。
//如果操作系统为Windows,则会使用util.GetWindowsPath函数处理目录路径。

func (kvh *kubeletVolumeHost) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
	return kvh.kubelet.getPodVolumeDeviceDir(podUID, pluginName)
}

func (kvh *kubeletVolumeHost) GetPodPluginDir(podUID types.UID, pluginName string) string {
	return kvh.kubelet.getPodPluginDir(podUID, pluginName)
}

func (kvh *kubeletVolumeHost) GetKubeClient() clientset.Interface {
	return kvh.kubelet.kubeClient
}

func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface {
	return kvh.kubelet.subpather
}

func (kvh *kubeletVolumeHost) GetHostUtil() hostutil.HostUtils {
	return kvh.kubelet.hostutil
}

func (kvh *kubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory {
	return kvh.informerFactory
}

func (kvh *kubeletVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
	return kvh.csiDriverLister
}

func (kvh *kubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
	return kvh.csiDriversSynced
}

//- GetPodVolumeDeviceDir:获取给定pod UID和插件名称的卷设备目录。'
//- GetPodPluginDir:获取给定pod UID和插件名称的插件目录。
//- GetKubeClient:获取kubelet的kubeClient接口,用于与Kubernetes API服务器通信。
//- GetSubpather:获取kubelet的subpather接口,用于处理挂载点的子路径。
//- GetHostUtil:获取kubelet的hostutil接口,提供与宿主机交互的功能。
//- GetInformerFactory:获取kubeletVolumeHost的informerFactory,用于创建和管理共享informer。
//- CSIDriverLister:返回csiDriverLister,用于列出CSI驱动。
//- CSIDriversSynced:返回一个函数,检查CSI驱动的缓存是否已同步。

// WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister
func (kvh *kubeletVolumeHost) WaitForCacheSync() error {
	if kvh.csiDriversSynced == nil {
		klog.ErrorS(nil, "CsiDriversSynced not found on KubeletVolumeHost")
		return fmt.Errorf("csiDriversSynced not found on KubeletVolumeHost")
	}

	synced := []cache.InformerSynced{kvh.csiDriversSynced}
	if !cache.WaitForCacheSync(wait.NeverStop, synced...) {
		klog.InfoS("Failed to wait for cache sync for CSIDriverLister")
		return fmt.Errorf("failed to wait for cache sync for CSIDriverLister")
	}

	return nil
}

//该函数用于等待kubelet卷主机的CSI驱动程序列表器的缓存同步。
//它首先检查kvh.csiDriversSynced是否为nil,如果是,则记录错误并返回。
//然后,它将kvh.csiDriversSynced作为缓存同步函数的切片传递给cache.WaitForCacheSync函数,并检查是否成功等待缓存同步。
//如果等待失败,则记录信息并返回错误。如果成功等待缓存同步,则返回nil表示成功。

func (kvh *kubeletVolumeHost) NewWrapperMounter(
	volName string,
	spec volume.Spec,
	pod *v1.Pod,
	opts volume.VolumeOptions) (volume.Mounter, error) {
	// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
	wrapperVolumeName := "wrapped_" + volName
	if spec.Volume != nil {
		spec.Volume.Name = wrapperVolumeName
	}

	return kvh.kubelet.newVolumeMounterFromPlugins(&spec, pod, opts)
}

//该函数是一个名为NewWrapperMounter的方法,它属于kubeletVolumeHost类型。该方法用于创建一个新的WrapperMounter实例。
//- 参数:
//- volName:表示卷的名称。
//- spec:表示卷的规范。
//- pod:表示Pod的实例。
//- opts:表示卷的选项。
//- 返回值:
//- volume.Mounter:表示挂载器接口。
//- error:表示错误信息。
//该方法首先将wrapperVolumeName设置为"wrapped_" + volName,即将卷的名称设置为"wrapped_"加上原始卷名称。
//然后,如果spec.Volume不为nil,则将spec.Volume.Name设置为wrapperVolumeName。
//最后,它通过调用kvh.kubelet.newVolumeMounterFromPlugins方法,使
//用提供的spec、pod和opts参数创建一个新的VolumeMounter实例,并将其与错误返回。

func (kvh *kubeletVolumeHost) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
	// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
	wrapperVolumeName := "wrapped_" + volName
	if spec.Volume != nil {
		spec.Volume.Name = wrapperVolumeName
	}

	plugin, err := kvh.kubelet.volumePluginMgr.FindPluginBySpec(&spec)
	if err != nil {
		return nil, err
	}

	return plugin.NewUnmounter(spec.Name(), podUID)
}

//该函数是一个方法,作用是创建一个unmounter对象用于卸载卷。
//首先,它通过将"wrapped_"前缀与传入的volName参数拼接,生成wrapper卷的名称。
//然后,如果spec.Volume不为空,将生成的wrapperVolumeName赋值给spec.Volume.Name。
//接下来,它通过调用kvh.kubelet.volumePluginMgr.FindPluginBySpec方法,根据spec参数找到对应的插件。
//最后,它调用找到的插件的NewUnmounter方法,传入spec.Name()和podUID参数,创建并返回一个Unmounter对象。
//如果在执行过程中出现错误,函数将返回nil和错误信息。

func (kvh *kubeletVolumeHost) GetCloudProvider() cloudprovider.Interface {
	return kvh.kubelet.cloud
}

func (kvh *kubeletVolumeHost) GetMounter(pluginName string) mount.Interface {
	return kvh.kubelet.mounter
}

func (kvh *kubeletVolumeHost) GetHostName() string {
	return kvh.kubelet.hostname
}

func (kvh *kubeletVolumeHost) GetHostIP() (net.IP, error) {
	hostIPs, err := kvh.kubelet.GetHostIPs()
	if err != nil {
		return nil, err
	}
	return hostIPs[0], err
}

//这段代码定义了kubeletVolumeHost类型的四个方法。
//- GetCloudProvider()方法返回kvh.kubelet.cloud,即kubelet的云提供商接口。
//- GetMounter(pluginName string)方法根据给定的插件名称返回kvh.kubelet.mounter,即kubelet的挂载接口。
//- GetHostName()方法返回kvh.kubelet.hostname,即kubelet的主机名。
//- GetHostIP()方法尝试获取kubelet的主机IP地址,并返回第一个IP地址和可能的错误。

func (kvh *kubeletVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) {
	node, err := kvh.kubelet.getNodeAnyWay()
	if err != nil {
		return nil, fmt.Errorf("error retrieving node: %v", err)
	}
	return node.Status.Allocatable, nil
}

func (kvh *kubeletVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
	if kvh.secretManager != nil {
		return kvh.secretManager.GetSecret
	}
	return func(namespace, name string) (*v1.Secret, error) {
		return nil, fmt.Errorf("not supported due to running kubelet in standalone mode")
	}
}

func (kvh *kubeletVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
	if kvh.configMapManager != nil {
		return kvh.configMapManager.GetConfigMap
	}
	return func(namespace, name string) (*v1.ConfigMap, error) {
		return nil, fmt.Errorf("not supported due to running kubelet in standalone mode")
	}
}

//### GetNodeAllocatable函数  该函数用于获取节点的可分配资源。
//- 首先,通过调用kvh.kubelet.getNodeAnyWay()方法获取节点信息。
//- 如果获取节点信息成功,则返回该节点的Status.Allocatable属性,否则返回错误。
//### GetSecretFunc函数  该函数用于获取一个用于获取Secret的函数。
//- 首先判断kvh.secretManager是否为空,如果不为空,则返回kvh.secretManager.GetSecret方法。
//- 如果为空,则返回一个匿名函数,该函数会返回错误信息"not supported due to running kubelet in standalone mode"。
//### GetConfigMapFunc函数  该函数用于获取一个用于获取ConfigMap的函数。
//- 首先判断kvh.configMapManager是否为空,如果不为空,则返回kvh.configMapManager.GetConfigMap方法。
//- 如果为空,则返回一个匿名函数,该函数会返回错误信息"not supported due to running kubelet in standalone mode"。

func (kvh *kubeletVolumeHost) GetServiceAccountTokenFunc() func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
	return kvh.tokenManager.GetServiceAccountToken
}

func (kvh *kubeletVolumeHost) DeleteServiceAccountTokenFunc() func(podUID types.UID) {
	return kvh.tokenManager.DeleteServiceAccountToken
}

func (kvh *kubeletVolumeHost) GetTrustAnchorsByName(name string, allowMissing bool) ([]byte, error) {
	return kvh.clusterTrustBundleManager.GetTrustAnchorsByName(name, allowMissing)
}

func (kvh *kubeletVolumeHost) GetTrustAnchorsBySigner(signerName string, labelSelector *metav1.LabelSelector, allowMissing bool) ([]byte, error) {
	return kvh.clusterTrustBundleManager.GetTrustAnchorsBySigner(signerName, labelSelector, allowMissing)
}

func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
	node, err := kvh.kubelet.GetNode()
	if err != nil {
		return nil, fmt.Errorf("error retrieving node: %v", err)
	}
	return node.Labels, nil
}

//- GetServiceAccountTokenFunc():返回一个函数,该函数用于获取ServiceAccount令牌。
//- DeleteServiceAccountTokenFunc():返回一个函数,该函数用于删除ServiceAccount令牌。
//- GetTrustAnchorsByName():根据名称获取集群信任锚点。
//- GetTrustAnchorsBySigner():根据签名者名称和标签选择器获取集群信任锚点。 - GetNodeLabels():获取节点的标签信息。

func (kvh *kubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
	node, err := kvh.kubelet.GetNode()
	if err != nil {
		return nil, fmt.Errorf("error retrieving node: %v", err)
	}
	attachedVolumes := node.Status.VolumesAttached
	result := map[v1.UniqueVolumeName]string{}
	for i := range attachedVolumes {
		attachedVolume := attachedVolumes[i]
		result[attachedVolume.Name] = attachedVolume.DevicePath
	}
	return result, nil
}

func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
	return kvh.kubelet.nodeName
}

func (kvh *kubeletVolumeHost) GetEventRecorder() record.EventRecorder {
	return kvh.kubelet.recorder
}

func (kvh *kubeletVolumeHost) GetExec(pluginName string) utilexec.Interface {
	return kvh.exec
}

举报

相关推荐

0 条评论