0
点赞
收藏
分享

微信扫一扫

ETCD数据库源码分析——ServerV2接口实现


ETCD数据库源码分析——ServerV2接口实现_初始化

我们知道EtcdServer实现了Server接口、ServerV2接口和ServerV3接口,这里我们首先介绍一下ServerV2接口(ServerV2接口是用于操作v2 store的API)。如上图中的ServerV2代码所示,其提供了Leader()、Do()和ClientCertAuthEnabled()函数定义。EtcdServer结构体需要实现ServerV2接口,以提供操作v2 store的能力。

Do函数

如下所示是Do函数的实现,首先使用reqIDGen产生一个请求id,设置到pb.Request的ID成员中;然后初始化reqV2HandlerEtcdServer结构体(上图右侧部分的结构体);然后调用Request(RequestV2被定义为Request)提供的Handle函数;最后将节点当前任期和commit raft log pos设置到响应中。

func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
r.ID = s.reqIDGen.Next()
h := &reqV2HandlerEtcdServer{
reqV2HandlerStore: reqV2HandlerStore{ store: s.v2store, applier: s.applyV2, }, s: s,
}
rp := &r
resp, err := ((*RequestV2)(rp)).Handle(ctx, h)
resp.Term, resp.Index = s.Term(), s.CommittedIndex()
return resp, err
}
func (r *RequestV2) Handle(ctx context.Context, v2api RequestV2Handler) (Response, error) {
if r.Method == "GET" && r.Quorum { r.Method = "QGET" }
switch r.Method {
case "POST": return v2api.Post(ctx, r)
case "PUT": return v2api.Put(ctx, r)
case "DELETE": return v2api.Delete(ctx, r)
case "QGET": return v2api.QGet(ctx, r)
case "GET": return v2api.Get(ctx, r)
case "HEAD": return v2api.Head(ctx, r)
}
return Response{}, errors.ErrUnknownMethod
}

Handle函数 对Request进行解析,根据 Request.Method 等字段执行对应的操作。 如果 Request.Method 是“POST”、“PUT”、“DELETE”或 Quorum为true 的“QGET”,则Request 将在执行其相应操作之前通过共识发送,Do函数将阻塞,直到执行操作或出现错误。

reqV2HandlerEtcdServer

reqV2HandlerEtcdServer结构体包含了reqV2HandlerStore和EtcdServer两个成员,reqV2HandlerStore包含了v2store.Store和ApplierV2两个部分,从上述Do函数可以看到,这两个成员就是EtcdServer的v2store和applyv2,也就是后续reqV2HandlerStore结构体实现的接口函数,其实就是对EtcdServer的v2store和applyv2这两个成员的操作。

type reqV2HandlerEtcdServer struct {
reqV2HandlerStore
s *EtcdServer
}

reqV2HandlerEtcdServer结构体实现了RequestV2Handler接口的所有接口函数。如下可以看出这些Post、Put、Delete和QGet方法都是通过调用processRaftRequest实现的,在processRaftRequest方法中会将请求交给etcd-raft模块进行处理,并且阻塞等待请求处理结束。

type RequestV2Handler interface {
Post(ctx context.Context, r *RequestV2) (Response, error)
Put(ctx context.Context, r *RequestV2) (Response, error)
Delete(ctx context.Context, r *RequestV2) (Response, error)
QGet(ctx context.Context, r *RequestV2) (Response, error)
Get(ctx context.Context, r *RequestV2) (Response, error)
Head(ctx context.Context, r *RequestV2) (Response, error)
}
func (a *reqV2HandlerEtcdServer) Post(ctx context.Context, r *RequestV2) (Response, error) { return a.processRaftRequest(ctx, r) }
func (a *reqV2HandlerEtcdServer) Put(ctx context.Context, r *RequestV2) (Response, error) { return a.processRaftRequest(ctx, r) }
func (a *reqV2HandlerEtcdServer) Delete(ctx context.Context, r *RequestV2) (Response, error) { return a.processRaftRequest(ctx, r) }
func (a *reqV2HandlerEtcdServer) QGet(ctx context.Context, r *RequestV2) (Response, error) { return a.processRaftRequest(ctx, r) }
func (a *reqV2HandlerEtcdServer) processRaftRequest(ctx context.Context, r *RequestV2) (Response, error) {
data, err := ((*pb.Request)(r)).Marshal()
if err != nil { return Response{}, err }
ch := a.s.w.Register(r.ID)
start := time.Now()
a.s.r.Propose(ctx, data)
select {
case x := <-ch:
resp := x.(Response)
return resp, resp.Err
case <-ctx.Done():
proposalsFailed.Inc()
a.s.w.Trigger(r.ID, nil) // GC wait
return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start)
case <-a.s.stopping:
}
return Response{}, errors.ErrStopped
}

reqV2HandlerStore

type reqV2HandlerStore struct {
store v2store.Store
applier ApplierV2
}

reqV2HandlerStore结构体实现了RequestV2Handler接口的所有接口函数。从reqV2HandlerStore对这些函数的实现可以看出Post、Put、Delete、QGet其实都是调用的ApplierV2的相应函数;Get和Head调用的是v2store.Store相应的接口函数,Do函数针对Request.Method为Get和Head的操作就是调用如下的Get和Head函数,执行v2store.Store相应的接口函数获取数据

func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, error) { return a.applier.Post(r), nil }
func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) { return a.applier.Put(r, membership.ApplyBoth), nil }
func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) { return a.applier.Delete(r), nil }
func (a *reqV2HandlerStore) QGet(ctx context.Context, r *RequestV2) (Response, error) { return a.applier.QGet(r), nil }

func (a *reqV2HandlerStore) Get(ctx context.Context, r *RequestV2) (Response, error) {
if r.Wait {
wc, err := a.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
return Response{Watcher: wc}, err
}
ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted)
return Response{Event: ev}, err
}
func (a *reqV2HandlerStore) Head(ctx context.Context, r *RequestV2) (Response, error) {
ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted)
return Response{Event: ev}, err
}

去掉v2sotre,这里需要删掉ServerV2的Do函数实现和updateClusterVersionV2函数,删除server/etcdserver/apply.v2.go和server/etcdserver/v2.server.go文件。


举报

相关推荐

0 条评论