0
点赞
收藏
分享

微信扫一扫

【Go语言实战】 (16) gRPC 集成 ETCD 进行服务注册


文章目录

  • ​​写在前面​​
  • ​​1. 服务实例定义​​
  • ​​2. 服务实例注册​​
  • ​​3. 使用示例​​

写在前面

本文采用的 ETCD 版本库是 ​go.etcd.io/etcd/client/v3​​
采用的 gRPC 版本库是 ​​google.golang.org/grpc

在Go语言的RPC框架中,gRPC 是比较原生的,并没有集成 ETCD 服务发现的集成,需要我们去稍微封装一下。而像 micro 框架这种封装性比较好的就有集成 ETCD、consul 等等的服务发现功能,就直接调用就行了。

本文实战例子源码在 ​https://github.com/CocaineCong/gRPC-todoList

【Go语言实战】 (16) gRPC 集成 ETCD 进行服务注册_数据库

1. 服务实例定义

  • 定义我们所需要注入进ETCD的服务结构体

type Server struct {
Name string `json:"name"`
Addr string `json:"addr"` // 地址
Version string `json:"version"` // 版本
Weight int64 `json:"weight"` // 权重
}

Name:名字为服务的名字(用来进行服务的发现)
Addr:服务的地址(存储服务地址)
Version:服务的版本(方便服务的版本迭代)
Weight:服务的权重(后续用来降级熔断)

  • 定义服务名字前缀的函数

func BuildPrefix(server Server) string {
if server.Version == "" {
return fmt.Sprintf("/%s/", server.Name)
}
return fmt.Sprintf("/%s/%s/", server.Name, server.Version)
}

  • 定义注册的地址函数

func BuildRegisterPath(server Server) string {
return fmt.Sprintf("%s%s", BuildPrefix(server), server.Addr)
}

  • 将值反序列化成一个注册 Server 服务

func ParseValue(value []byte) (Server, error) {
server := Server{}
if err := json.Unmarshal(value, &server); err != nil {
return server, err
}

return server, nil
}

  • 分割路径,后续用作 Server 地址的更新

func SplitPath(path string) (Server, error) {
server := Server{}
strs := strings.Split(path, "/")
if len(strs) == 0 {
return server, errors.New("invalid path")
}
server.Addr = strs[len(strs)-1]

return server, nil
}

  • 判断这个服务地址是否已经存在,防止服务访问冲突

func Exist(l []resolver.Address, addr resolver.Address) bool {
for i := range l {
if l[i].Addr == addr.Addr {
return true
}
}

return false
}

  • 移除服务

func Remove(s []resolver.Address, addr resolver.Address) ([]resolver.Address, bool) {
for i := range s {
if s[i].Addr == addr.Addr {
s[i] = s[len(s)-1]
return s[:len(s)-1], true
}
}
return nil, false
}

2. 服务实例注册

  • 定义服务实例的实例,用来存储全部的实例信息,并且维持各个服务之间的执行,防止宕机等情况

type Register struct {
EtcdAddrs []string
DialTimeout int

closeCh chan struct{}
leasesID clientv3.LeaseID
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse

srvInfo Server
srvTTL int64
cli *clientv3.Client
logger *logrus.Logger
}

  • 创建一个注册对象

func NewRegister(etcdAddrs []string, logger *logrus.Logger) *Register {
return &Register{
EtcdAddrs: etcdAddrs,
DialTimeout: 3,
logger: logger,
}
}

  • 注册服务到 ETCD 中

func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {
var err error

if strings.Split(srvInfo.Addr, ":")[0] == "" {
// 判断服务地址的正确性
return nil, errors.New("invalid ip address")
}
// 对服务进行注册
if r.cli, err = clientv3.New(clientv3.Config{
Endpoints: r.EtcdAddrs,
DialTimeout: time.Duration(r.DialTimeout) * time.Second,
}); err != nil {
return nil, err
}

r.srvInfo = srvInfo // 服务信息的注册
r.srvTTL = ttl // 服务的存活时间

if err = r.register(); err != nil {
return nil, err
}
// 初始化一个切片来判断这个服务连接是否关闭
r.closeCh = make(chan struct{})
// 异步进行心跳检测
go r.keepAlive()

return r.closeCh, nil
}

这里我们要先说明一个名词:租约

ETCD的 ​​Lease​​​ 租约,它类似 ​​TTL(Time To Live)​​,用于 etcd 客户端与服务端之间进行活性检测

在到达 TTL 时间之前,etcd 服务端不会删除相关租约上绑定的键值对;超过 TTL 时间,则会删除。因此我们需要在到达 TTL 时间之前续租,以实现客户端与服务端之间的保活。

func (r *Register) register() error {
//设置超时时间,访问etcd有超时控制
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
defer cancel()

// 注册一个新的租约
leaseResp, err := r.cli.Grant(ctx, r.srvTTL)
if err != nil {
return err
}

// 赋值租约的ID
r.leasesID = leaseResp.ID

// 对这个 cli 进行心跳检测
if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), r.leasesID); err != nil {
return err
}

data, err := json.Marshal(r.srvInfo)
if err != nil {
return err
}
// 将服务写到 ETCD 中
_, err = r.cli.Put(context.Background(), BuildRegisterPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))

return err
}

  • 关闭服务连接

func (r *Register) Stop() {
r.closeCh <- struct{}{}
}

  • 删除节点

func (r *Register) unregister() error {
_, err := r.cli.Delete(context.Background(), BuildRegisterPath(r.srvInfo))
return err
}

  • 存活检测

func (r *Register) keepAlive() {
ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)

for {
select {
case <-r.closeCh: // 是否存在这个服务
if err := r.unregister(); err != nil {
r.logger.Error("unregister failed, error: ", err)
}
// 撤销租约
if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil {
r.logger.Error("revoke failed, error: ", err)
}
case res := <-r.keepAliveCh:
if res == nil {
if err := r.register(); err != nil {
r.logger.Error("register failed, error: ", err)
}
}
case <-ticker.C:
if r.keepAliveCh == nil {
if err := r.register(); err != nil {
r.logger.Error("register failed, error: ", err)
}
}
}
}
}

  • 获取注册服务的信息

func (r *Register) GetServerInfo() (Server, error) {
resp, err := r.cli.Get(context.Background(), BuildRegisterPath(r.srvInfo))
if err != nil {
return r.srvInfo, err
}

server := Server{}
if resp.Count >= 1 {
if err := json.Unmarshal(resp.Kvs[0].Value, &server); err != nil {
return server, err
}
}

return server, err
}

3. 使用示例

  • 服务注册

etcdRegister := discovery.NewRegister(etcdAddress, logrus.New())

  • 定义一个Node存放服务信息

userNode := discovery.Server{
Name: viper.GetString("server.domain"),
Addr: grpcAddress,
}

  • 注册

if _, err := etcdRegister.Register(userNode, 10); err != nil {
panic(fmt.Sprintf("start server failed, err: %v", err))
}


举报

相关推荐

0 条评论