1. 前言
2. pluginapi
// k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1
// constants.go

// Device health states carried in pluginapi.Device.Health.
const (
	Healthy   = "Healthy"
	Unhealthy = "Unhealthy"
)

// Version is the device plugin API version a plugin sends when registering.
const Version = "v1beta1"

// Node-local paths: every device plugin socket lives under DevicePluginPath,
// and the kubelet's registration socket sits in the same directory.
const (
	DevicePluginPath = "/var/lib/kubelet/device-plugins/"
	KubeletSocket    = DevicePluginPath + "kubelet.sock"
)

// KubeletPreStartContainerRPCTimeoutInSecs is a timeout in seconds, presumably
// applied to the PreStartContainer RPC (call site not shown — TODO confirm).
const KubeletPreStartContainerRPCTimeoutInSecs = 30

// SupportedVersions lists the API versions the device manager accepts in a
// RegisterRequest.
var SupportedVersions = [...]string{"v1beta1"}
// api.proto
// DevicePlugin is the gRPC service that each device plugin serves on its own
// unix socket under DevicePluginPath; the kubelet's device manager is the
// client that dials it after the plugin registers.
service DevicePlugin {
// Returns the plugin's DevicePluginOptions.
rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}
// Server-streaming RPC: the plugin sends its device list (each device with a
// Health value) and re-sends the list when device health changes. The device
// manager consumes this stream in endpointImpl.run.
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
// Presumably reached through endpoint.allocate when devices are assigned to a
// container (handler not shown here — TODO confirm).
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
// Presumably reached through endpoint.preStartContainer, likely bounded by
// KubeletPreStartContainerRPCTimeoutInSecs (TODO confirm).
rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}
3. 启动 device plugin
// k8s-device-plugin/main.go
// main (excerpt; "..." marks code elided from this listing) loops forever and,
// whenever restart is set, builds a fresh NVIDIA device plugin and serves it.
func main() {
...
// Label for the outer loop; presumably targeted by a break/continue in the
// elided code (TODO confirm).
L:
for {
if restart {
...
// Create a new plugin instance, then start its gRPC server and register
// it with the kubelet (both done inside Serve).
devicePlugin = NewNvidiaDevicePlugin()
if err := devicePlugin.Serve(); err != nil {
...
} else {
// Serving and registration succeeded, so stop restarting until the
// elided code sets restart again.
restart = false
}
}
...
}
}
// k8s-device-plugin/server.go

// serverSock is the unix socket on which the NVIDIA device plugin serves its
// DevicePlugin gRPC service (/var/lib/kubelet/device-plugins/nvidia.sock).
var serverSock = pluginapi.DevicePluginPath + "nvidia.sock"

// NewNvidiaDevicePlugin returns a plugin that advertises the devices from
// getDevices, listens on serverSock, and owns fresh stop and health channels.
func NewNvidiaDevicePlugin() *NvidiaDevicePlugin {
	plugin := new(NvidiaDevicePlugin)
	plugin.devs = getDevices()
	plugin.socket = serverSock
	plugin.stop = make(chan interface{})
	plugin.health = make(chan *pluginapi.Device)
	return plugin
}
// k8s-device-plugin/nvidia.go

// getDevices returns the fixed device list this plugin advertises: ten
// devices with IDs "<resourceName>-0" through "<resourceName>-9", all
// initially Healthy. No hardware is queried.
func getDevices() []*pluginapi.Device {
	// Number of devices to advertise; named rather than a bare literal.
	const deviceCount = 10

	// The final length is known, so allocate the backing array once instead
	// of growing it through repeated appends.
	devs := make([]*pluginapi.Device, 0, deviceCount)
	for i := 0; i < deviceCount; i++ {
		devs = append(devs, &pluginapi.Device{
			ID:     fmt.Sprintf("%v-%v", resourceName, i),
			Health: pluginapi.Healthy,
		})
	}
	return devs
}
Serve 方法
// k8s-device-plugin/server.go

// Serve starts the plugin's DevicePlugin gRPC server and then registers the
// plugin with the kubelet's device manager. If registration fails, m.Stop is
// called before the registration error is returned.
func (m *NvidiaDevicePlugin) Serve() error {
	// Serve the DevicePlugin service on m.socket; the device manager dials
	// this socket after we register.
	if err := m.Start(); err != nil {
		log.Printf("Could not start device plugin: %s", err)
		return err
	}
	log.Println("Starting to serve on", m.socket)

	// Talking to the kubelet here means talking to its device manager, which
	// serves the Registration service on pluginapi.KubeletSocket.
	if err := m.Register(pluginapi.KubeletSocket, resourceName); err != nil {
		log.Printf("Could not register device plugin: %s", err)
		m.Stop()
		return err
	}

	log.Println("Registered device plugin with Kubelet")
	return nil
}
Start
// k8s-device-plugin/server.go

// Start serves the DevicePlugin gRPC service on m.socket
// (/var/lib/kubelet/device-plugins/nvidia.sock) and waits until the socket
// accepts a connection. On success it also launches the health-check
// goroutine.
//
// If the readiness dial fails, the gRPC server that was just started is
// stopped before the error is returned. Otherwise it would keep serving on a
// socket that no caller will register, while the caller retries with a new
// plugin instance.
func (m *NvidiaDevicePlugin) Start() error {
	// Remove the socket left by a previous instance so a new one can be
	// bound at the same path.
	if err := m.cleanup(); err != nil {
		return err
	}

	// Plugin and kubelet are processes on the same node, so a unix socket
	// is used.
	sock, err := net.Listen("unix", m.socket)
	if err != nil {
		return err
	}

	// Register m as the DevicePlugin service implementation.
	server := grpc.NewServer([]grpc.ServerOption{}...)
	pluginapi.RegisterDevicePluginServer(server, m)
	m.server = server

	go func() {
		// Serve returns nil after Stop; any other return is a real failure
		// worth recording rather than silently dropping.
		if err := server.Serve(sock); err != nil {
			log.Printf("Device plugin gRPC server on %s exited: %v", m.socket, err)
		}
	}()

	// Wait for the server to come up by opening a blocking connection.
	conn, err := dial(m.socket, 5*time.Second)
	if err != nil {
		// Tear down the half-started server so its goroutine and listener
		// do not leak.
		server.Stop()
		return err
	}
	conn.Close()

	go m.healthcheck()
	return nil
}
Register
// k8s-device-plugin/server.go

// Register announces this plugin to the kubelet's device manager. It calls
// the Registration service listening on kubeletEndpoint (Serve passes
// pluginapi.KubeletSocket, i.e. /var/lib/kubelet/device-plugins/kubelet.sock).
// The request carries the API version, the file name of the plugin's own
// socket, and resourceName. The device manager later dials that socket back.
func (m *NvidiaDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
	conn, err := dial(kubeletEndpoint, 5*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()

	// In this exchange the plugin is the *client*: the device manager
	// implements the Registration service.
	registration := pluginapi.NewRegistrationClient(conn)
	request := &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,
		Endpoint:     path.Base(m.socket), // file name only, e.g. "nvidia.sock"
		ResourceName: resourceName,
	}

	_, err = registration.Register(context.Background(), request)
	return err
}
// k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1
// Registration is served by the kubelet's device manager on
// /var/lib/kubelet/device-plugins/kubelet.sock; device plugins are its
// clients and call Register to announce themselves.
service Registration {
// Register checks the request's Version and ResourceName. On success the
// device manager asynchronously dials the plugin's socket named by Endpoint.
rpc Register(RegisterRequest) returns (Empty) {}
}
device-plugin 端总结
4. device manager 处理注册请求
启动 Start
// Start launches the device manager's Registration gRPC server. In order it:
//   - reloads allocation state from the checkpoint (a read failure is only logged);
//   - ensures m.socketdir exists;
//   - removes stale contents from m.socketdir;
//   - serves on filepath.Join(m.socketdir, m.socketname), i.e.
//     /var/lib/kubelet/device-plugins/kubelet.sock in this setup, in a
//     goroutine tracked by m.wg.
//
// It returns an error if m.socketdir cannot be created or the socket cannot
// be listened on.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
	klog.V(2).Infof("Starting Device Plugin manager")

	m.activePods = activePods
	m.sourcesReady = sourcesReady

	// Loads in allocatedDevices information from the on-disk checkpoint.
	err := m.readCheckpoint()
	if err != nil {
		klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
	}

	socketPath := filepath.Join(m.socketdir, m.socketname)
	// Fail fast with the real cause. If this error were ignored, it would
	// only surface later as a less specific Listen failure.
	if err := os.MkdirAll(m.socketdir, 0755); err != nil {
		klog.Errorf("Failed to create device plugin socket directory %s: %v", m.socketdir, err)
		return err
	}

	// Removes all stale sockets in m.socketdir. Device plugins can monitor
	// this and use it as a signal to re-register with the new Kubelet.
	if err := m.removeContents(m.socketdir); err != nil {
		klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
	}

	s, err := net.Listen("unix", socketPath)
	if err != nil {
		klog.Errorf(errListenSocket+" %v", err)
		return err
	}

	m.wg.Add(1)
	m.server = grpc.NewServer([]grpc.ServerOption{}...)
	// Serve the Registration service that device plugins call into.
	pluginapi.RegisterRegistrationServer(m.server, m)
	go func() {
		defer m.wg.Done()
		m.server.Serve(s)
	}()

	klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
	return nil
}
Register 方法
// Register implements the Registration gRPC service and is called by a device
// plugin announcing itself. The request is rejected in two cases: r.Version
// is not in pluginapi.SupportedVersions, or r.ResourceName is not an extended
// resource name. Otherwise the device manager connects to the plugin
// asynchronously (addEndpoint) and the call reports success immediately.
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
	klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
	metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()

	// Only API versions listed in SupportedVersions (currently v1beta1) are accepted.
	var versionCompatible bool
	for _, v := range pluginapi.SupportedVersions {
		if r.Version == v {
			versionCompatible = true
			break
		}
	}
	if !versionCompatible {
		errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
		klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
		// errorString embeds plugin-supplied text. Pass it as an argument,
		// never as the format string, so a '%' in it cannot garble the error.
		return &pluginapi.Empty{}, fmt.Errorf("%s", errorString)
	}

	if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
		errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
		klog.Infof("Bad registration request from device plugin: %s", errorString)
		return &pluginapi.Empty{}, fmt.Errorf("%s", errorString)
	}

	// Dial the plugin in the background so the plugin's Register call is not
	// blocked on the manager connecting back to it.
	go m.addEndpoint(r)

	// Registration accepted.
	return &pluginapi.Empty{}, nil
}
addEndpoint
Endpoint
// endpoint is the device manager's handle on a single registered device plugin.
type endpoint interface {
	// Lifecycle: run blocks while the plugin's device stream is consumed
	// (see runEndpoint); stop tears the endpoint down afterwards.
	run()
	stop()
	isStopped() bool
	stopGracePeriodExpired() bool

	// Calls made into the plugin on behalf of containers; presumably forwarded
	// to the plugin's Allocate / PreStartContainer RPCs (TODO confirm).
	allocate(devs []string) (*pluginapi.AllocateResponse, error)
	preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)

	// Delivers a device list for resourceName to the manager.
	callback(resourceName string, devices []pluginapi.Device)
}
// endpointImpl is the device manager's connection to one registered device
// plugin: a gRPC client for the plugin's socket plus bookkeeping.
type endpointImpl struct {
// gRPC client used to call the device plugin's DevicePlugin service.
client pluginapi.DevicePluginClient
// Connection backing client.
clientConn *grpc.ClientConn
// Path of the device plugin's unix socket
// (e.g. /var/lib/kubelet/device-plugins/nvidia.sock).
socketPath string
// Resource name the device plugin registered with.
resourceName string
// Time the endpoint was stopped; presumably consulted by
// stopGracePeriodExpired (TODO confirm).
stopTime time.Time
// Presumably guards stopTime (TODO confirm).
mutex sync.Mutex
// Invoked (via callback) with every device list received from ListAndWatch.
cb monitorCallback
}
newEndpointImpl
// newEndpointImpl dials the device plugin listening on socketPath (for example
// /var/lib/kubelet/device-plugins/nvidia.sock) and wraps the resulting client
// in an endpointImpl. callback becomes the endpoint's cb, which receives every
// device list the plugin pushes. A dial failure is logged and returned.
func newEndpointImpl(socketPath, resourceName string, callback monitorCallback) (*endpointImpl, error) {
	client, conn, err := dial(socketPath)
	if err != nil {
		klog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
		return nil, err
	}

	e := &endpointImpl{
		socketPath:   socketPath,
		resourceName: resourceName,
		client:       client,
		clientConn:   conn,
		cb:           callback,
	}
	return e, nil
}
// dial (excerpt; "..." marks elided code) opens a gRPC connection to the device
// plugin listening on unixSocketPath and wraps it in a DevicePluginClient.
// grpc.WithBlock makes DialContext wait until the connection is up; ctx,
// built in the elided code, presumably bounds that wait with a timeout
// (TODO confirm).
func dial(unixSocketPath string) (pluginapi.DevicePluginClient, *grpc.ClientConn, error) {
...
c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
...
return pluginapi.NewDevicePluginClient(c), c, nil
}
addEndpoint
// addEndpoint connects to the plugin described by r, records it in
// m.endpoints, and starts a goroutine that consumes the plugin's device
// stream. If the plugin cannot be dialed, the failure is logged and the
// registration is dropped.
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
	// r.Endpoint is only the socket file name (e.g. "nvidia.sock"), so
	// resolve it inside the device plugin socket directory.
	socketPath := filepath.Join(m.socketdir, r.Endpoint)

	e, err := newEndpointImpl(socketPath, r.ResourceName, m.callback)
	if err != nil {
		klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
		return
	}

	m.registerEndpoint(r.ResourceName, r.Options, e)
	go m.runEndpoint(r.ResourceName, e)
}
// The RegisterRequest built by the NVIDIA plugin's Register method, repeated
// here to show what addEndpoint receives as r.
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version, // "v1beta1"
Endpoint: path.Base(m.socket), // file name only, "nvidia.sock"; addEndpoint joins it with m.socketdir
ResourceName: resourceName, // resource name the plugin advertises
}
registerEndpoint
// registerEndpoint stores e and its options in m.endpoints under
// resourceName, replacing any earlier entry, while holding m.mutex.
func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e endpoint) {
	info := endpointInfo{e: e, opts: options}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.endpoints[resourceName] = info
	klog.V(2).Infof("Registered endpoint %v", e)
}
runEndpoint
// pkg/kubelet/cm/devicemanager/manager.go

// runEndpoint drives e for its whole lifetime.
//   - It blocks in e.run() until the plugin's device stream ends, then calls
//     e.stop().
//   - If e is still the endpoint registered for resourceName (i.e. it has
//     not been replaced by a newer registration), it marks that resource
//     unhealthy.
func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
	// Not an endless loop: for endpointImpl, run returns once opening or
	// reading the ListAndWatch stream fails.
	e.run()
	e.stop()

	m.mutex.Lock()
	defer m.mutex.Unlock()

	registered, found := m.endpoints[resourceName]
	if found && registered.e == e {
		m.markResourceUnhealthy(resourceName)
	}
	klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
}
// pkg/kubelet/cm/devicemanager/endpoint.go

// run opens the plugin's ListAndWatch stream. For every response it copies the
// device pointers into a slice of values and hands that list to e.callback.
// It logs and returns as soon as opening the stream or receiving from it
// fails; it does not retry.
func (e *endpointImpl) run() {
	stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
	if err != nil {
		klog.Errorf(errListAndWatch, e.resourceName, err)
		return
	}

	for {
		// Blocks until the plugin pushes its next device list.
		resp, recvErr := stream.Recv()
		if recvErr != nil {
			klog.Errorf(errListAndWatch, e.resourceName, recvErr)
			return
		}
		klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)

		var devices []pluginapi.Device
		for _, dev := range resp.Devices {
			devices = append(devices, *dev)
		}
		e.callback(e.resourceName, devices)
	}
}
// k8s-device-plugin/server.go

// ListAndWatch implements the DevicePlugin streaming RPC.
//   - It sends the full device list once.
//   - Each time a device arrives on m.health, it marks that device Unhealthy
//     and sends the full list again.
//
// It returns when any of these happens:
//   - the plugin is stopped (m.stop);
//   - the kubelet side of the stream goes away;
//   - a send fails.
//
// The last two exits keep a dead handler from continuing to drain m.health
// and swallowing events that a newer ListAndWatch stream should deliver.
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
	if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
		return err
	}

	for {
		select {
		case <-m.stop:
			return nil
		case <-s.Context().Done():
			// The client closed this stream (e.g. the kubelet restarted).
			return s.Context().Err()
		case d := <-m.health:
			// FIXME: there is no way to recover from the Unhealthy state.
			d.Health = pluginapi.Unhealthy
			if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
				// d presumably points into m.devs (healthcheck is not shown),
				// so the next ListAndWatch call would still report it.
				return err
			}
		}
	}
}
genericDeviceUpdateCallback
// pkg/kubelet/cm/devicemanager/endpoint.go

// callback forwards a device list received from the plugin to the monitor
// callback that was supplied when the endpoint was created.
func (e *endpointImpl) callback(resourceName string, devices []pluginapi.Device) {
	notify := e.cb
	notify(resourceName, devices)
}
// pkg/kubelet/cm/devicemanager/manager.go

// genericDeviceUpdateCallback replaces the healthy and unhealthy device-ID sets
// for resourceName with the contents of devices, then writes a checkpoint.
// A device counts as healthy only if its Health equals pluginapi.Healthy;
// every other value is treated as unhealthy.
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
	healthy := sets.NewString()
	unhealthy := sets.NewString()
	for i := range devices {
		switch dev := &devices[i]; dev.Health {
		case pluginapi.Healthy:
			healthy.Insert(dev.ID)
		default:
			unhealthy.Insert(dev.ID)
		}
	}

	m.mutex.Lock()
	m.healthyDevices[resourceName] = healthy
	m.unhealthyDevices[resourceName] = unhealthy
	m.mutex.Unlock()

	m.writeCheckpoint()
}
总结
5. 整体总结