0
点赞
收藏
分享

微信扫一扫

ETCD数据库源码分析——三大主流程启动流程


ETCD数据库源码分析——三大主流程启动流程_etcd


在《ETCD数据库源码分析——初始化总览》中介绍的StartEtcd函数用于启动EtcdServer,它最后调用的是`e.Server.Start()`,即server/etcdserver/server.go文件中的Start函数,代码如下所示:

// Start performs any initialization of the Server necessary for it to
// begin serving requests. It must be called before Do or Process.
// Start must be non-blocking; any long-running server functionality
// should be implemented in goroutines.
func (s *EtcdServer) Start() {
	// Finish initializing the server and launch its run goroutine.
	s.start()

	// Long-running background tasks. Each one is handed to GoAttach,
	// in exactly this order, so it runs in its own goroutine.
	background := []func(){
		func() { s.adjustTicks() },
		func() { s.publishV3(s.Cfg.ReqTimeout()) },
		s.purgeFile,
		func() { monitorFileDescriptor(s.Logger(), s.stopping) },
		s.monitorClusterVersions,
		s.monitorStorageVersion,
		s.linearizableReadLoop,
		s.monitorKVHash,
		s.monitorDowngrade,
	}
	for _, task := range background {
		s.GoAttach(task)
	}
}

raftNode模块中的start协程和EtcdServer的run协程都源自`s.start()`:`s.start()`直接启动run协程,run协程再通过`s.r.start(rh)`启动raftNode的start协程;而linearizableReadLoop协程是由`s.GoAttach(s.linearizableReadLoop)`启动的。EtcdServer.start()方法会初始化EtcdServer实例中剩余的未初始化字段,然后启动后台协程来执行EtcdServer.run()方法(run方法是EtcdServer运行的核心)。

// Excerpt from EtcdServer.start() (the enclosing function is elided by the
// article): apply config defaults, initialize the remaining runtime fields,
// then launch the main run loop.

// No snapshot count configured: log the change and use DefaultSnapshotCount.
if s.Cfg.SnapshotCount == 0 {
lg.Info("updating snapshot-count to default",zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),)
s.Cfg.SnapshotCount = DefaultSnapshotCount
}
// No snapshot catch-up entry count configured: log the change and use
// DefaultSnapshotCatchUpEntries.
if s.Cfg.SnapshotCatchUpEntries == 0 {
lg.Info("updating snapshot catch-up entries to default",zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),)
s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
}
// Wait helpers created via the wait package.
s.w = wait.New()
s.applyWait = wait.NewTimeList()
// Lifecycle channels; stopping has a buffer of 1.
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.stopping = make(chan struct{}, 1)
// Server-wide context, cancellable through s.cancel.
s.ctx, s.cancel = context.WithCancel(context.Background())
// Read-path signalling: readwaitc has a buffer of 1.
s.readwaitc = make(chan struct{}, 1)
s.readNotifier = newNotifier()
s.leaderChanged = notify.NewNotifier()
// TODO: if this is an empty log, writes all peer infos into the first entry
// Started last: run reads fields initialized above (for example it selects
// on s.stop), so they must be set before the goroutine begins.
go s.run()

EtcdServer.run函数会先启动raftNode的start协程,然后进入for循环,使用select监听多个channel。其中`s.r.apply()`就是图中的applyc管道:如果有raft log需要应用,就用`schedule.NewJob`把s.applyAll包装成一个任务,再交给调度器sched的Schedule方法调度执行。

// run is the EtcdServer main loop (excerpt; "..." marks code elided by the
// article). It starts the raftNode goroutine, then multiplexes the server's
// event channels until an error arrives on s.errorc or s.stop is signalled.
func (s *EtcdServer) run() {
...
s.r.start(rh) // the raftNode start goroutine is launched here
...
for {
select {
// An item arrived on the raftNode apply channel: wrap s.applyAll in a
// scheduler job instead of applying it inline in this loop.
case ap := <-s.r.apply():
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
// A batch of expired leases to revoke.
case leases := <-expiredLeaseC:
s.revokeExpiredLeases(leases)
// A fatal server error: log it and leave the main loop.
case err := <-s.errorc:
lg.Warn("server error", zap.Error(err))
lg.Warn("data-dir used by this member must be removed")
return
// Sync tick: only sync when the v2 store holds keys with a TTL.
case <-getSyncC():
if s.v2store.HasTTLKeys() {
s.sync(s.Cfg.ReqTimeout())
}
// Stop requested: leave the main loop.
case <-s.stop:
return
}
}
}

raft.Node的run协程是在StartNode和RestartNode中启动的(raft/node.go),而StartNode和RestartNode函数是在server/etcdserver/bootstrap.go文件中bootstrappedRaft的newRaftNode方法里调用的。newRaftNode方法在server/etcdserver/server.go的NewServer函数创建EtcdServer结构体时被调用,其返回值(*raftNode)用于初始化EtcdServer.r成员,也就是raftNode类型的变量。

// StartNode builds a Node from c, bootstraps it with the given initial peers
// and launches the node's run loop in its own goroutine.
//
// It panics when peers is empty (use RestartNode in that case) or when
// NewRawNode fails. A Bootstrap error does not abort startup; it is only
// logged as a warning.
func StartNode(c *Config, peers []Peer) Node {
	if len(peers) == 0 {
		panic("no peers given; use RestartNode instead")
	}

	raw, err := NewRawNode(c)
	if err != nil {
		panic(err)
	}

	if bootErr := raw.Bootstrap(peers); bootErr != nil {
		c.Logger.Warningf("error occurred during starting a new node: %v", bootErr)
	}

	nd := newNode(raw) // the raft.Node value handed back to the caller
	go nd.run()
	return &nd
}
// RestartNode builds a Node from c without bootstrapping any peers and
// launches the node's run loop in its own goroutine. It panics if
// NewRawNode fails.
func RestartNode(c *Config) Node {
	raw, err := NewRawNode(c)
	if err != nil {
		panic(err)
	}

	nd := newNode(raw) // the raft.Node value handed back to the caller
	go nd.run()
	return &nd
}
// newRaftNode creates the underlying raft.Node and wraps it in a *raftNode.
//
// With no bootstrap peers the node is restarted (raft.RestartNode); otherwise
// it is started with b.peers (raft.StartNode). The node's Status method is
// published to the package-level raftStatus while holding raftStatusMu. The
// returned raftNode carries the logger, a removed-member check backed by cl,
// the heartbeat setting, b.storage, and storage built from wal and ss.
func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *membership.RaftCluster) *raftNode {
	var n raft.Node
	switch {
	case len(b.peers) == 0:
		n = raft.RestartNode(b.config)
	default:
		n = raft.StartNode(b.config, b.peers)
	}

	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()

	// Configuration for the raftNode wrapper returned to the caller.
	cfg := raftNodeConfig{
		lg:          b.lg,
		isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
		Node:        n,
		heartbeat:   b.heartbeat,
		raftStorage: b.storage,
		storage:     serverstorage.NewStorage(b.lg, wal, ss),
	}
	return newRaftNode(cfg)
}


举报

相关推荐

0 条评论