0
点赞
收藏
分享

微信扫一扫

[k8s源码分析][kubelet] devicemanager 之 device-plugin向kubelet注册

1. 前言

2. pluginapi

// k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1

// constants.go
const (
    // Healthy is the Health value carried by a device that can be used.
    Healthy = "Healthy"
    // Unhealthy is the Health value carried by a device that must not be used.
    Unhealthy = "Unhealthy"
    // Version is the device plugin API version a plugin sends when it registers.
    Version = "v1beta1"
    // DevicePluginPath is the directory prefix under which the plugin and
    // kubelet unix sockets are placed.
    DevicePluginPath = "/var/lib/kubelet/device-plugins/"
    // KubeletSocket is the unix socket path plugins dial to register with the kubelet.
    KubeletSocket = DevicePluginPath + "kubelet.sock"
    // KubeletPreStartContainerRPCTimeoutInSecs is presumably the timeout, in
    // seconds, applied to the PreStartContainer RPC; its use is not shown in this excerpt.
    KubeletPreStartContainerRPCTimeoutInSecs = 30
)
// SupportedVersions lists the API versions a registration request may carry.
var SupportedVersions = [...]string{"v1beta1"}

// api.proto
// DevicePlugin is the gRPC service a device plugin serves; the kubelet's
// device manager acts as its client.
service DevicePlugin {
    // GetDevicePluginOptions returns the plugin's DevicePluginOptions.
    rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}
    // ListAndWatch returns a server-side stream of ListAndWatchResponse messages.
    rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
    // Allocate takes an AllocateRequest and returns an AllocateResponse.
    rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
    // PreStartContainer takes a PreStartContainerRequest and returns a PreStartContainerResponse.
    rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}

3. 启动device plugin

// k8s-device-plugin/main.go
// main drives the plugin's start/restart loop. The body is abridged in this
// excerpt; every "..." stands for code that is not shown.
func main() {
    ...
L:
    // Loop labelled L; presumably the elided code leaves it on shutdown —
    // confirm against the full source.
    for {
        // When a (re)start has been requested, build a fresh plugin and serve it.
        if restart {
            ...
            devicePlugin = NewNvidiaDevicePlugin()
            if err := devicePlugin.Serve(); err != nil {
                ...
            } else {
                // Serve succeeded, so the restart request is cleared.
                restart = false
            }
        }
        ...
    }
}

// k8s-device-plugin/server.go
// serverSock is the unix socket path on which the plugin serves its gRPC API.
var serverSock = pluginapi.DevicePluginPath + "nvidia.sock"

// NewNvidiaDevicePlugin returns a plugin populated with the devices reported
// by getDevices, bound to serverSock, with fresh stop and health channels.
func NewNvidiaDevicePlugin() *NvidiaDevicePlugin {
    plugin := new(NvidiaDevicePlugin)
    plugin.devs = getDevices()
    plugin.socket = serverSock
    plugin.stop = make(chan interface{})
    plugin.health = make(chan *pluginapi.Device)
    return plugin
}

// k8s-device-plugin/nvidia.go
// getDevices returns a fixed list of ten devices, all marked Healthy.
// Device IDs have the form "<resourceName>-<index>" with index 0 through 9.
func getDevices() []*pluginapi.Device {
    // Number of devices advertised by this plugin.
    const deviceCount = 10

    // The final length is known, so allocate the backing array once.
    devs := make([]*pluginapi.Device, 0, deviceCount)
    for i := uint(0); i < deviceCount; i++ {
        devs = append(devs, &pluginapi.Device{
            ID:     fmt.Sprintf("%v-%v", resourceName, i),
            Health: pluginapi.Healthy,
        })
    }
    return devs
}
Serve方法
// k8s-device-plugin/server.go
// Serve starts the plugin's own gRPC server and then registers the plugin
// with the kubelet. If registration fails the server is stopped again.
// The first error encountered is logged and returned.
func (m *NvidiaDevicePlugin) Serve() error {
    // Bring up this plugin as the pluginapi server that the device manager calls.
    if err := m.Start(); err != nil {
        log.Printf("Could not start device plugin: %s", err)
        return err
    }
    log.Println("Starting to serve on", m.socket)

    // Registering with the kubelet means sending a request to its device manager.
    if err := m.Register(pluginapi.KubeletSocket, resourceName); err != nil {
        log.Printf("Could not register device plugin: %s", err)
        m.Stop()
        return err
    }
    log.Println("Registered device plugin with Kubelet")

    return nil
}
Start
// k8s-device-plugin/server.go
// Start brings up the plugin's gRPC server on m.socket and launches the
// health-check goroutine.
//
// It returns an error when the old socket cannot be cleaned up, when the unix
// socket cannot be listened on, or when the freshly started server cannot be
// reached within five seconds. In the last case the server is stopped before
// returning so that the listener and the Serve goroutine are not leaked.
func (m *NvidiaDevicePlugin) Start() error {
    // Remove the socket left by a previous run, since a new server is about to
    // be started on m.socket (/var/lib/kubelet/device-plugins/nvidia.sock).
    if err := m.cleanup(); err != nil {
        return err
    }

    // The plugin and the kubelet are processes on the same host, so a unix
    // socket is used.
    sock, err := net.Listen("unix", m.socket)
    if err != nil {
        return err
    }

    // Register m as the implementation of the DevicePlugin service.
    m.server = grpc.NewServer([]grpc.ServerOption{}...)
    pluginapi.RegisterDevicePluginServer(m.server, m)
    // Serve blocks, so it runs in its own goroutine.
    go m.server.Serve(sock)

    // Wait for the server to come up by opening a blocking connection to it.
    conn, err := dial(m.socket, 5*time.Second)
    if err != nil {
        // The server never became reachable: shut it down so the listener and
        // the Serve goroutine do not outlive this failed Start.
        m.server.Stop()
        return err
    }
    conn.Close()

    go m.healthcheck()
    return nil
}
Register
// k8s-device-plugin/server.go
// Register announces this plugin to the kubelet's device manager listening on
// kubeletEndpoint (/var/lib/kubelet/device-plugins/kubelet.sock), advertising
// resourceName. It returns any dial or RPC error.
func (m *NvidiaDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
    conn, err := dial(kubeletEndpoint, 5*time.Second)
    if err != nil {
        return err
    }
    defer conn.Close()

    // Only the socket's base name is sent as the endpoint.
    req := &pluginapi.RegisterRequest{
        Version:      pluginapi.Version,
        Endpoint:     path.Base(m.socket),
        ResourceName: resourceName,
    }

    // For the Registration service the device manager is the server and this
    // plugin is the client.
    registration := pluginapi.NewRegistrationClient(conn)
    _, err = registration.Register(context.Background(), req)
    return err
}
// k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1
// Registration is the gRPC service the kubelet's device manager serves;
// device plugins call it to register themselves.
service Registration {
    // Register takes a RegisterRequest and returns Empty.
    rpc Register(RegisterRequest) returns (Empty) {}
}

device-plugin 端总结

4. device manager 处理注册请求

启动 Start

// Start restores checkpointed state, clears the socket directory and starts
// the gRPC registration server that device plugins connect to.
//
// activePods and sourcesReady are stored on the manager for later use. A
// checkpoint read failure is only logged. Failure to create the socket
// directory or to listen on the socket is logged and returned.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
    klog.V(2).Infof("Starting Device Plugin manager")

    m.activePods = activePods
    m.sourcesReady = sourcesReady

    // Loads in allocatedDevices information from disk (the kubelet checkpoint file).
    err := m.readCheckpoint()
    if err != nil {
        klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
    }

    socketPath := filepath.Join(m.socketdir, m.socketname)
    // Without the directory the listen below cannot succeed, so report the
    // real cause here instead of dropping it.
    if err := os.MkdirAll(m.socketdir, 0755); err != nil {
        klog.Errorf("Fail to create socket directory %s: %v", m.socketdir, err)
        return err
    }

    // Removes all stale sockets in m.socketdir. Device plugins can monitor
    // this and use it as a signal to re-register with the new Kubelet.
    if err := m.removeContents(m.socketdir); err != nil {
        klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
    }

    // Start the server side on /var/lib/kubelet/device-plugins/kubelet.sock.
    s, err := net.Listen("unix", socketPath)
    if err != nil {
        klog.Errorf(errListenSocket+" %v", err)
        return err
    }

    m.wg.Add(1)
    m.server = grpc.NewServer([]grpc.ServerOption{}...)
    // The manager itself implements the Registration service.
    pluginapi.RegisterRegistrationServer(m.server, m)
    go func() {
        defer m.wg.Done()
        m.server.Serve(s)
    }()
    klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
    return nil
}

Register方法

// Register handles a registration request from a device plugin.
//
// The request is rejected with an error when its API version is not listed in
// pluginapi.SupportedVersions or when its resource name is not an extended
// resource name. Otherwise the endpoint is set up in a separate goroutine and
// success is returned straight away.
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
    klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
    metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()

    // Check whether the plugin speaks a supported API version (v1beta1).
    var versionCompatible bool
    for _, v := range pluginapi.SupportedVersions {
        if r.Version == v {
            versionCompatible = true
            break
        }
    }
    // Unsupported version: report the problem back to the plugin.
    if !versionCompatible {
        errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
        klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
        // errorString embeds plugin-supplied text, so it must not be used as a
        // format string: a '%' in it would corrupt the message.
        return &pluginapi.Empty{}, fmt.Errorf("%s", errorString)
    }

    if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
        errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
        klog.Infof("Bad registration request from device plugin: %s", errorString)
        // Same reason as above: pass the message as an argument, not as the format.
        return &pluginapi.Empty{}, fmt.Errorf("%s", errorString)
    }

    go m.addEndpoint(r)
    // A nil error tells the plugin that registration was accepted.
    return &pluginapi.Empty{}, nil
}
addEndpoint
Endpoint
// endpoint is the device manager's handle on one registered device plugin.
type endpoint interface {
    // run is called first by ManagerImpl.runEndpoint; the endpointImpl version
    // returns only once the plugin's ListAndWatch stream fails.
    run()
    // stop is called by ManagerImpl.runEndpoint after run returns.
    stop()
    // allocate presumably forwards an Allocate request for the given device
    // IDs to the plugin; the implementation is not shown in this excerpt.
    allocate(devs []string) (*pluginapi.AllocateResponse, error)
    // preStartContainer presumably forwards a PreStartContainer request for
    // the given device IDs to the plugin; the implementation is not shown here.
    preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
    // callback hands the latest device list for resourceName to the
    // registered monitor callback.
    callback(resourceName string, devices []pluginapi.Device)
    // isStopped presumably reports whether stop has been called — confirm
    // against the implementation.
    isStopped() bool
    // stopGracePeriodExpired presumably reports whether a grace period after
    // the stop time has elapsed — confirm against the implementation.
    stopGracePeriodExpired() bool
}

// endpointImpl is the endpoint implementation backed by a gRPC connection to
// a device plugin.
type endpointImpl struct {
    // client is the gRPC client used to call the device plugin.
    client     pluginapi.DevicePluginClient
    // clientConn is the underlying connection returned by dial together with client.
    clientConn *grpc.ClientConn
    // socketPath is the address (unix socket path) of the device plugin.
    socketPath   string
    // resourceName is the resource name advertised by the device plugin.
    resourceName string
    // stopTime is the time at which the endpoint was stopped.
    stopTime     time.Time
    // mutex presumably guards the mutable fields above; its users are not
    // shown in this excerpt.
    mutex sync.Mutex
    // cb is the callback invoked with each device list received from the plugin.
    cb    monitorCallback
}
newEndpointImpl
// newEndpointImpl connects to the device plugin listening on socketPath
// (for example /var/lib/kubelet/device-plugins/nvidia.sock) and returns an
// endpoint for resourceName that reports device updates through callback.
// A dial failure is logged and returned.
func newEndpointImpl(socketPath, resourceName string, callback monitorCallback) (*endpointImpl, error) {
    pluginClient, conn, err := dial(socketPath)
    if err != nil {
        klog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
        return nil, err
    }

    ep := &endpointImpl{
        client:       pluginClient,
        clientConn:   conn,
        socketPath:   socketPath,
        resourceName: resourceName,
        cb:           callback,
    }
    return ep, nil
}
// dial opens a gRPC connection to the device plugin at unixSocketPath and
// returns both a DevicePluginClient built on it and the raw connection.
// The body is abridged in this excerpt; every "..." stands for code that is
// not shown (including the remaining dial options and the error handling).
func dial(unixSocketPath string) (pluginapi.DevicePluginClient, *grpc.ClientConn, error) {
    ...
    // WithBlock makes the dial wait until the connection is established.
    c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
        ...
    return pluginapi.NewDevicePluginClient(c), c, nil
}
addEndpoint
// addEndpoint connects to the plugin named in registration request r, records
// the resulting endpoint under the plugin's resource name and starts running
// it in a new goroutine. A connection failure is logged and nothing is registered.
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
    // r.Endpoint is resolved inside the manager's socket directory.
    pluginSocket := filepath.Join(m.socketdir, r.Endpoint)
    ep, err := newEndpointImpl(pluginSocket, r.ResourceName, m.callback)
    if err != nil {
        klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
        return
    }

    // Record the endpoint first, then run it in the background.
    m.registerEndpoint(r.ResourceName, r.Options, ep)
    go func() {
        m.runEndpoint(r.ResourceName, ep)
    }()
}
reqt := &pluginapi.RegisterRequest{
        Version:      pluginapi.Version,  // v1beta1
        Endpoint:     path.Base(m.socket), // base name only, i.e. "nvidia.sock"; addEndpoint joins it with the manager's socket directory
        ResourceName: resourceName, // name of the advertised resource
    }
registerEndpoint
// registerEndpoint stores endpoint e and its plugin options under resourceName
// in the manager's endpoint map, replacing any previous entry. The update is
// made while holding m.mutex.
func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e endpoint) {
    info := endpointInfo{e: e, opts: options}

    m.mutex.Lock()
    defer m.mutex.Unlock()

    m.endpoints[resourceName] = info
    klog.V(2).Infof("Registered endpoint %v", e)
}
runEndpoint
// pkg/kubelet/cm/devicemanager/manager.go
// runEndpoint runs endpoint e until its run method returns, stops it, and then
// marks resourceName unhealthy, but only if e is still the endpoint registered
// under that name.
func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
    // run blocks; for endpointImpl it loops until the ListAndWatch stream fails.
    e.run()
    e.stop()

    m.mutex.Lock()
    defer m.mutex.Unlock()

    // A newer endpoint may have replaced e in the meantime; leave its state alone.
    current, registered := m.endpoints[resourceName]
    if registered && current.e == e {
        m.markResourceUnhealthy(resourceName)
    }
    klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
}
// pkg/kubelet/cm/devicemanager/endpoint.go

// run opens the plugin's ListAndWatch stream and forwards every device list
// received on it to the endpoint's callback. It returns, after logging, as
// soon as the stream cannot be opened or a receive fails.
func (e *endpointImpl) run() {
    stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
    if err != nil {
        klog.Errorf(errListAndWatch, e.resourceName, err)
        return
    }

    for {
        // Recv blocks until the plugin pushes a new device list.
        response, recvErr := stream.Recv()
        if recvErr != nil {
            klog.Errorf(errListAndWatch, e.resourceName, recvErr)
            return
        }

        received := response.Devices
        klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)

        // Copy the devices by value before handing them to the callback.
        var snapshot []pluginapi.Device
        for i := range received {
            snapshot = append(snapshot, *received[i])
        }
        e.callback(e.resourceName, snapshot)
    }
}
// k8s-device-plugin/server.go
// ListAndWatch sends the current device list on stream s, then sends it again
// each time a device is reported on m.health, after marking that device
// Unhealthy.
//
// It returns nil when m.stop fires. It returns the send error as soon as a
// Send fails, so that a handler for a broken stream exits instead of lingering
// and consuming health events.
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
        return err
    }

    for {
        select {
        case <-m.stop:
            return nil
        case d := <-m.health:
            // FIXME: there is no way to recover from the Unhealthy state.
            d.Health = pluginapi.Unhealthy
            if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
                return err
            }
        }
    }
}
genericDeviceUpdateCallback
// pkg/kubelet/cm/devicemanager/endpoint.go
// callback forwards the device list for resourceName to the monitor callback
// (e.cb) supplied when the endpoint was created.
func (e *endpointImpl) callback(resourceName string, devices []pluginapi.Device) {
    e.cb(resourceName, devices)
}
// pkg/kubelet/cm/devicemanager/manager.go
// genericDeviceUpdateCallback replaces the healthy and unhealthy device ID sets
// recorded for resourceName with the ones derived from devices, then writes a
// checkpoint. The sets are swapped while holding m.mutex; the checkpoint is
// written after the lock has been released.
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
    m.mutex.Lock()
    healthy, unhealthy := sets.NewString(), sets.NewString()
    for _, dev := range devices {
        // Any state other than Healthy counts as unhealthy.
        if dev.Health != pluginapi.Healthy {
            unhealthy.Insert(dev.ID)
            continue
        }
        healthy.Insert(dev.ID)
    }
    m.healthyDevices[resourceName] = healthy
    m.unhealthyDevices[resourceName] = unhealthy
    m.mutex.Unlock()

    m.writeCheckpoint()
}

总结

5. 整体总结

举报

相关推荐

0 条评论