1. 前言
2. kubelet_internal_checkpoint
{
"Data": {
"PodDeviceEntries": [
{
"PodUID": "pod1",
"ContainerName": "con1",
"ResourceName": "domain1.com/resource1",
"DeviceIDs": [
"dev1",
"dev2"
],
"AllocResp": "Eh0KDC9ob21lL3IxbGliMRILL3Vzci9yMWxpYjEYARofCgsvZGV2L3IxZGV2MRILL2Rldi9yMWRldjEaA21ydxofCgsvZGV2L3IxZGV2MhILL2Rldi9yMWRldjIaA21ydw=="
},
{
"PodUID": "pod2",
"ContainerName": "con1",
"ResourceName": "domain1.com/resource1",
"DeviceIDs": [
"dev4"
],
"AllocResp": "Eh0KDC9ob21lL3IxbGliMRILL3Vzci9yMWxpYjEYARofCgsvZGV2L3IxZGV2NBILL2Rldi9yMWRldjQaA21ydw=="
}
],
"RegisteredDevices": {
"domain1.com/resource1": [
"dev5",
"dev1",
"dev2",
"dev3",
"dev4"
],
"domain2.com/resource2": [
"dev1",
"dev2"
]
}
},
"Checksum": 3854436589
}
// pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go
// PodDevicesEntry is one element of the checkpoint's "PodDeviceEntries" list.
// Each entry records the devices of a single resource for one container of
// one pod, together with the serialized allocation response.
type PodDevicesEntry struct {
// PodUID is the UID of the pod the entry belongs to.
PodUID string
// ContainerName is the name of the container inside that pod.
ContainerName string
// ResourceName is the resource the devices belong to
// (e.g. "domain1.com/resource1" in the sample checkpoint).
ResourceName string
// DeviceIDs lists the IDs of the devices recorded for this entry.
DeviceIDs []string
// AllocResp is the marshaled allocation response. As a []byte it is
// rendered by encoding/json as a base64 string in the checkpoint file.
AllocResp []byte
}
// checkpointData is the payload stored under the "Data" key of the
// checkpoint; it is the value the checksum is computed over.
type checkpointData struct {
// PodDeviceEntries holds one entry per pod/container/resource combination.
PodDeviceEntries []PodDevicesEntry
// RegisteredDevices maps a resource name to a list of device IDs
// (see the "RegisteredDevices" object in the sample checkpoint).
RegisteredDevices map[string][]string
}
// Data is the top-level structure written to the checkpoint file: the
// payload plus a checksum of that payload.
type Data struct {
// Data is the checkpointed payload.
Data checkpointData
// Checksum is recomputed from Data by MarshalCheckpoint and checked
// against Data by VerifyChecksum.
Checksum checksum.Checksum
}
// pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go
// DeviceManagerCheckpoint extends checkpointmanager.Checkpoint with an
// accessor for the device manager's checkpointed payload.
type DeviceManagerCheckpoint interface {
checkpointmanager.Checkpoint
// GetData returns the pod device entries and the resource-name to
// device-ID-list map held by the checkpoint.
GetData() ([]PodDevicesEntry, map[string][]string)
}
// MarshalCheckpoint refreshes cp.Checksum from the current payload and then
// returns the JSON encoding of the whole checkpoint (payload and checksum).
func (cp *Data) MarshalCheckpoint() ([]byte, error) {
	// The checksum must be updated first so the encoded blob is self-consistent.
	cp.Checksum = checksum.New(cp.Data)
	blob, err := json.Marshal(*cp)
	return blob, err
}
// UnmarshalCheckpoint decodes the JSON blob into cp and returns the decoding
// error, if any. It does not verify the checksum.
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
	err := json.Unmarshal(blob, cp)
	return err
}
// VerifyChecksum checks the stored checksum against the current payload and
// returns whatever error the checksum's Verify method reports.
func (cp *Data) VerifyChecksum() error {
	err := cp.Checksum.Verify(cp.Data)
	return err
}
// GetData returns the checkpointed pod device entries and the resource-name
// to device-ID-list map. The returned slice and map are the ones held by cp,
// not copies.
func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) {
	payload := &cp.Data
	return payload.PodDeviceEntries, payload.RegisteredDevices
}
3. 结构
// deviceAllocateInfo is the per-resource record kept for one container: the
// device IDs it holds for that resource and the cached allocation response.
type deviceAllocateInfo struct {
// deviceIds contains the device IDs allocated to this container for the given resourceName.
deviceIds sets.String
// allocResp contains the cached rpc AllocateResponse. Other code in this
// file checks it for nil before use, so it may be unset.
allocResp *pluginapi.ContainerAllocateResponse
}
// resourceAllocateInfo holds the allocation record of every resource used by
// one container.
type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
// containerDevices holds the per-resource records of every container in one pod.
type containerDevices map[string]resourceAllocateInfo // Keyed by containerName.
// podDevices is the top-level index: pod UID -> container name -> resource
// name -> allocation record.
type podDevices map[string]containerDevices // Keyed by podUID.
4. 方法
insert, delete, pods, get
// insert stores the device set and allocation response for the given
// pod/container/resource triple, creating the intermediate maps on demand.
// An existing record for the same triple is overwritten.
func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) {
	byContainer, ok := pdev[podUID]
	if !ok {
		byContainer = make(containerDevices)
		pdev[podUID] = byContainer
	}

	byResource, ok := byContainer[contName]
	if !ok {
		byResource = make(resourceAllocateInfo)
		byContainer[contName] = byResource
	}

	byResource[resource] = deviceAllocateInfo{deviceIds: devices, allocResp: resp}
}
// delete removes every listed pod UID, along with all of its container and
// resource records, from pdev. UIDs that are not present are ignored.
func (pdev podDevices) delete(pods []string) {
	for i := range pods {
		// Inside this method body, delete still refers to the built-in function.
		delete(pdev, pods[i])
	}
}
get操作
// containerDevices returns the device IDs recorded for the given
// podUID/containerName/resourceName triple, or nil when the pod, the
// container, or the resource has no record.
func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.String {
	// Indexing a missing (nil) inner map is safe in Go and simply reports
	// "not found", so the three levels can be looked up in one expression.
	if info, found := pdev[podUID][contName][resource]; found {
		return info.deviceIds
	}
	return nil
}
// pods returns the set of all pod UIDs that currently have a record in pdev.
func (pdev podDevices) pods() sets.String {
	uids := make([]string, 0, len(pdev))
	for uid := range pdev {
		uids = append(uids, uid)
	}
	return sets.NewString(uids...)
}
// devices returns all devices in use, grouped by resource name only (pods and
// containers are not distinguished). Every resource name found in pdev gets
// an entry, but only records that carry an allocation response contribute
// their device IDs.
func (pdev podDevices) devices() map[string]sets.String {
	inUse := make(map[string]sets.String)
	for _, byContainer := range pdev {
		for _, byResource := range byContainer {
			for name, info := range byResource {
				current, seen := inUse[name]
				if !seen {
					current = sets.NewString()
				}
				if info.allocResp != nil {
					current = current.Union(info.deviceIds)
				}
				inUse[name] = current
			}
		}
	}
	return inUse
}
// containerDevices returns the device IDs recorded for the given
// podUID/containerName/resourceName triple, or nil when any of the three
// levels has no record.
// NOTE(review): a method with this name and signature also appears earlier
// in this file; declaring it twice in one package does not compile.
func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.String {
	byContainer, ok := pdev[podUID]
	if !ok {
		return nil
	}
	byResource, ok := byContainer[contName]
	if !ok {
		return nil
	}
	info, ok := byResource[resource]
	if !ok {
		return nil
	}
	return info.deviceIds
}
toCheckpointData 和 fromCheckpointData
// toCheckpointData flattens pdev into a list of checkpoint.PodDevicesEntry,
// one per pod/container/resource record. Records whose allocation response
// is missing or cannot be marshaled are logged and left out of the result.
func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
	var entries []checkpoint.PodDevicesEntry
	for uid, byContainer := range pdev {
		for container, byResource := range byContainer {
			for name, info := range byResource {
				if info.allocResp == nil {
					klog.Errorf("Can't marshal allocResp for %v %v %v: allocation response is missing", uid, container, name)
					continue
				}

				// Serialize the allocation response for storage in the entry.
				encoded, err := info.allocResp.Marshal()
				if err != nil {
					klog.Errorf("Can't marshal allocResp for %v %v %v: %v", uid, container, name, err)
					continue
				}

				entry := checkpoint.PodDevicesEntry{
					PodUID:        uid,
					ContainerName: container,
					ResourceName:  name,
					DeviceIDs:     info.deviceIds.UnsortedList(),
					AllocResp:     encoded,
				}
				entries = append(entries, entry)
			}
		}
	}
	return entries
}
// fromCheckpointData populates pdev from the passed in checkpoint entries.
// Each entry is logged, its allocation response is unmarshaled, and the
// result is stored via insert. Entries whose allocation response cannot be
// unmarshaled are logged and skipped.
func (pdev podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
	for _, entry := range data {
		klog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
			entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)

		resp := new(pluginapi.ContainerAllocateResponse)
		if err := resp.Unmarshal(entry.AllocResp); err != nil {
			klog.Errorf("Can't unmarshal allocResp for %v %v %v: %v", entry.PodUID, entry.ContainerName, entry.ResourceName, err)
			continue
		}

		// Re-insert the restored record exactly as a fresh allocation would be.
		ids := sets.NewString(entry.DeviceIDs...)
		pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, ids, resp)
	}
}
deviceRunContainerOptions
// deviceRunContainerOptions combines the resource information of one
// container of one pod into a single DeviceRunContainerOptions.
// A container may use several resources, and the allocResp of each resource
// carries its own envs, mounts, and so on. This method gathers the envs of
// all resources together, the mounts together, the annotations together,
// etc., into one new DeviceRunContainerOptions.
// It returns nil when pdev has no record for the pod or for the container.
// Only the env handling is shown in this listing; the rest of the loop body
// is elided ("...").
func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {
containers, exists := pdev[podUID]
if !exists {
return nil
}
resources, exists := containers[contName]
if !exists {
return nil
}
opts := &DeviceRunContainerOptions{}
// Maps to detect duplicate settings.
// devsMap, mountsMap and annotationsMap are not used in the visible part of
// the listing; presumably the elided section uses them.
devsMap := make(map[string]string)
mountsMap := make(map[string]string)
envsMap := make(map[string]string)
annotationsMap := make(map[string]string)
// Loops through AllocationResponses of all cached device resources.
for _, devices := range resources {
// NOTE(review): resp is dereferenced below without a nil check, while
// devices() and toCheckpointData() both guard against a nil allocResp.
// Confirm that allocResp cannot be nil here.
resp := devices.allocResp
for k, v := range resp.Envs {
// The first value seen for an env name wins. A later occurrence is
// skipped, and logged as an error if its value differs.
if e, ok := envsMap[k]; ok {
klog.V(4).Infof("Skip existing env %s %s", k, v)
if e != v {
klog.Errorf("Environment variable %s has conflicting setting: %s and %s", k, e, v)
}
continue
}
klog.V(4).Infof("Add env %s %s", k, v)
envsMap[k] = v
opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v})
}
...
}
return opts
}
// pkg/kubelet/cm/devicemanager/types.go
// DeviceRunContainerOptions is the merged, per-container result built by
// podDevices.deviceRunContainerOptions from the allocation responses of all
// resources the container uses.
type DeviceRunContainerOptions struct {
// The environment variables list.
Envs []kubecontainer.EnvVar
// The mounts for the container.
Mounts []kubecontainer.Mount
// The host devices mapped into the container.
Devices []kubecontainer.DeviceInfo
// The annotations for the container.
Annotations []kubecontainer.Annotation
}