Etcd 相关参考资料
Etcd 的介绍与使用:Etcd 介绍与使用(入门篇)-CSDN博客
Etcd Raft 协议:Etcd Raft 协议(进阶篇)-CSDN博客
本文诣在使用 Go 客户端操作 Etcd,并实现元数据的写入(单条写、批量写)、读取(单挑读、前缀读)、监听(watch)、更新(update)!
注:数据是以 Protobuf 格式存储到 Etcd 的!
Protobuf 的介绍与使用:Protobuf 的介绍与使用(入门级)-CSDN博客
完整代码
./src/message/metadata.proto
syntax = "proto3";
package message;
option go_package = "/data/etcd_test/src/message";
message Metadata {
string Name = 1;
int64 ShardCount = 2;
}
./src/etcd/main.go
package main
import (
"errors"
"strings"
//"encoding/json"
"etcd_test/src/message"
"fmt"
"github.com/golang/protobuf/proto"
"go.etcd.io/etcd/client/v3"
"golang.org/x/net/context"
"time"
)
// 连接etcd集群
func connectToEtcd(args ...string) (*clientv3.Client, error) {
if len(args) == 0 {
return nil, errors.New("ip or port is nil")
}
var endpoints []string
for _, arg := range args {
endpoints = append(endpoints, arg)
}
// 连接etcd集群
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, errors.New("connect etcd fail")
}
return cli, nil
}
// 初始化元数据
func initMetadata() (map[string]*message.Metadata, error) {
// 初始化一个切片用于存储元数据
metadata := make(map[string]*message.Metadata)
// 初始化meta_tables实例
data1 := &message.Metadata{Name: "dist_by_mm", ShardCount: 12}
keyName1 := "meta_table"
metadata[keyName1] = data1
data2 := &message.Metadata{Name: "dist_by_mm", ShardCount: 14}
keyName2 := "meta_table1"
metadata[keyName2] = data2
if len(metadata) == 0 {
return nil, errors.New("metadata init error")
}
return metadata, nil
}
// 将元数据序列化为protobuf并存储到etcd(单条循环写入)
func putMetadataToEtcdSingle(cli *clientv3.Client, metadata map[string]*message.Metadata) error {
if len(metadata) == 0 {
return errors.New("metadata list is nil")
}
// 遍历元数据列表,逐个写入etcd
for k, v := range metadata {
// 将元数据序列化为protobuf
protoData, err := proto.Marshal(v)
if err != nil {
return errors.New("metadata to protobuf fail")
}
// 存储protobuf数据到etcd
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = cli.Put(ctx, k, string(protoData))
if err != nil {
return errors.New("metadata write fail")
}
}
return nil
}
// 将元数据序列化为protobuf并存储到etcd(批量写入)
func putMetadataToEtcdBatch(cli *clientv3.Client, metadata map[string]*message.Metadata) error {
// 创建事务
txn := cli.Txn(context.Background())
meta := make(map[string]string)
for k, v := range metadata {
// 将元数据序列化为protobuf
protoData, err := proto.Marshal(v)
if err != nil {
return errors.New("metadata to protobuf fail")
}
meta[k] = string(protoData)
}
// 组装多个写入操作
txn.If(clientv3.Compare(clientv3.CreateRevision("meta_table"), "=", 0)).
// 此处不允许相同的键重复写入
Then(
clientv3.OpPut("meta_table", meta["meta_table"]),
clientv3.OpPut("meta_table1", meta["meta_table1"]),
)
// 提交事务
resp, err := txn.Commit()
if err != nil {
// handle error!
fmt.Printf("txn commit failed, err: %v\n", err)
return nil
}
// 检查事务结果
if !resp.Succeeded {
fmt.Println("txn failed, some keys already exist")
} else {
fmt.Println("txn succeeded")
}
return nil
}
func getMetadataFromEtcd(cli *clientv3.Client, metadataMap map[string]*message.Metadata, args ...interface{}) error {
if len(args) == 0 {
return errors.New("not key need read")
}
// 遍历key并从etcd中读取对应的key-val
for _, arg := range args {
switch v := arg.(type) {
case map[string]*message.Metadata: // 以map的形式指定key(即读取刚写入的数据)
{
// map为空
if len(v) == 0 {
return errors.New("key map is nil")
} else {
for k, _ := range v {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
meta, err := cli.Get(ctx, k)
cancel()
if err != nil {
return errors.New("read matadata fail")
}
if len(meta.Kvs) == 0 {
return errors.New("no key-value found for the given key")
}
// 把protobuf解析为Metadata
var retrievedMetadata message.Metadata
err = proto.Unmarshal(meta.Kvs[0].Value, &retrievedMetadata)
if err != nil {
return errors.New("protobuf to Metadata fail")
}
metadataMap[k] = &retrievedMetadata
}
}
break
}
case string: // 单个读或批量读
{
if strings.Contains(v, "prefix") { // prefix读
str := strings.Split(v, ":")
key := str[0]
// 创建事务
txn := cli.Txn(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
meta, err := cli.Get(ctx, key, clientv3.WithPrefix())
cancel()
if err != nil {
return errors.New("read matadata error")
}
if len(meta.Kvs) == 0 {
return errors.New("no key-value found for the given key")
}
for _, m := range meta.Kvs {
// 把protobuf解析为Metadata
var val message.Metadata
err = proto.Unmarshal(m.Value, &val)
if err != nil {
return errors.New("protobuf to Metadata fail")
}
metadataMap[string(m.Key)] = &val
}
// 提交事务
resp, err := txn.Commit()
if err != nil {
return errors.New("txn commit failed")
}
// 检查事务结果
if !resp.Succeeded {
return errors.New("txn failed, some keys already exist")
} else {
return errors.New("txn succeeded")
}
} else { // 单条读
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
meta, err := cli.Get(ctx, v)
cancel()
if err != nil {
return errors.New("read matadata error")
}
if len(meta.Kvs) == 0 {
return errors.New("no key-value found for the given key")
}
var val message.Metadata
err = proto.Unmarshal(meta.Kvs[0].Value, &val)
if err != nil {
return errors.New("protobuf to Metadata fail")
}
metadataMap[string(meta.Kvs[0].Key)] = &val
}
break
}
default:
return errors.New("args is unknown data type")
}
}
return nil
}
// 监听etcd上指定key的变化,如果发生变化则更新缓存中的元数据
func watchEtcdAndUpdate(cli *clientv3.Client, metadataMap map[string]*message.Metadata, key string) error {
if key == "" {
return errors.New("not key need watch")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watch := cli.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV())
for w := range watch {
for _, ev := range w.Events {
fmt.Printf("Type: %s Key: %s Value: %s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
// 根据变化修改缓存中的元数据
switch ev.Type {
case clientv3.EventTypePut:
{
var val message.Metadata
err := proto.Unmarshal(ev.Kv.Value, &val)
if err != nil {
return errors.New("protobuf to Metadata fail")
}
metadataMap[string(ev.Kv.Key)] = &val
for k, v := range metadataMap {
fmt.Println("tableName:", k)
fmt.Println("metadata ShardCount:", v.GetShardCount())
}
break
}
case clientv3.EventTypeDelete:
{
if ev.PrevKv.Key != nil {
delete(metadataMap, string(ev.Kv.Key))
}
break
}
default:
return errors.New("watch error")
}
}
}
return nil
}
func main() {
// 初始化一个缓存,模拟其它kingproxy节点
metadataMap := make(map[string]*message.Metadata)
// 连接etcd集群
cli, err := connectToEtcd("120.92.144.250:2379")
if err != nil {
fmt.Println(err)
return
}
defer cli.Close()
// 初始化元数据
metadata, err := initMetadata()
if err != nil {
fmt.Println(err)
return
}
// 写入元数据
//err = putMetadataToEtcdSingle(cli, metadata)
//if err != nil {
// fmt.Println(err)
// return
//}
// 批量写入元数据
err = putMetadataToEtcdBatch(cli, metadata)
if err != nil {
fmt.Println(err)
return
}
// 读取元数据
getMetadataFromEtcd(cli, metadataMap, "meta_table:prefix")
// pre result
for k, v := range metadataMap {
fmt.Println("tableName:", k)
fmt.Println("metadata ShardCount:", v.GetShardCount())
}
fmt.Println("--------------------------------------------------------------------------------")
// watch key
//watchEtcdAndUpdate(cli, metadataMap, "meta_table")
return
}
完整 Go 工程
完整 Go 工程获取:https://download.csdn.net/download/weixin_47156401/89019290