0
点赞
收藏
分享

微信扫一扫

influxdb源码解析-Inmem Index

前言

这是一个分析inlfuxdb源码的系列。在上一章分析了series 模块。对于series,influxdb提供了series file来作为series 逻辑的统一封装。series 作为influxdb 里面的一个重要模块,被很多其他模块依赖。本章内容分析的是influxdb 的index模块。

  • influxdb安装和使用
  • influxdb概念详解1
  • influxdb概念详解2
  • influxdb源码编译
  • influxdb启动分析
  • influxdb源码分析-meta部分
  • infludb源码分析-数据写入
  • influxdb数据写入细节
  • influxdb源码分析-series

Index的介绍

如果上来就开始介绍索引,可能有些唐突。因为毕竟在这之前都没有提到过索引的概念。索引(Index)是数据库中的一个非常重要的模块,几乎所有数据库都会有索引。索引的作用就是加速查询。在influxdb 中也不例外。还是先看一下influxdb 的索引位于那个模块下,或者被什么模块依赖了。

Store

在之前的介绍中,提到了这个模块。Store在influxdb中是存储的抽象。TSDBStore 是influxdb 存储模块。这个结构在tsdb/store.go 下面。

type Store struct {
mu sync.RWMutex
shards map[uint64]*Shard
databases map[string]*databaseState
sfiles map[string]*SeriesFile
SeriesFileMaxSize int64 // Determines size of series file mmap. Can be altered in tests.
path string

// shared per-database indexes, only if using "inmem".
indexes map[string]interface{}

// Maintains a set of shards that are in the process of deletion.
// This prevents new shards from being created while old ones are being deleted.
pendingShardDeletes map[uint64]struct{}

// Epoch tracker helps serialize writes and deletes that may conflict. It
// is stored by shard.
epochs map[uint64]*epochTracker

EngineOptions EngineOptions

baseLogger *zap.Logger
Logger *zap.Logger

closing chan struct{}
wg sync.WaitGroup
opened bool
}

store 中有多个shard结构,这个结构之前也简单提到过,每个shard就是对应的influxdb一个存储的实际分片。每个database下面有多个shard,具体数量取决于retention policy。这里就不在细讲,不太清楚的可以去回顾一下meta部分,那里面详细的介绍了。

shard在influxdb里面是真正存储数据的结构。一个shard可以理解为是一个TSM 引擎,管理多个tsm file。这个后面会展开说说,首先看一下shard的具体组成:

type Shard struct {
path string
walPath string
id uint64

database string
retentionPolicy string

sfile *SeriesFile
options EngineOptions

mu sync.RWMutex
_engine Engine
index Index
enabled bool

// expvar-based stats.
stats *ShardStatistics
defaultTags models.StatisticTags

baseLogger *zap.Logger
logger *zap.Logger

EnableOnOpen bool

// CompactionDisabled specifies the shard should not schedule compactions.
// This option is intended for offline tooling.
CompactionDisabled bool
}

在这个结构里面可以看到 shard里面主要有以下几个结构:

  • path和wal path。path 指的是tsm file所在的位置,wal path指的是wal所在的位置。
  • database。shard所属的database
  • sfile。注意series 是database的概念的,也就是一个database下面所有的shard是共享这个数据结构的。这一点在存储的结构上也能看出来。
  • engine。tsm engine,负责存储的主要功能。
  • index 索引。

其他的就不再详细说了。这里就发现了今天的主角:Index .这里可以看出来,Index是被Shard依赖了的一个数据结构,主要帮助Shard 快速查询数据。

Index 整体设计

influxdb index模块有一个顶层的interface,定义了index都有哪些行为。在tsdb/index.go 模块下的Index结构中。这里我简化一下具体的结构:

type Index interface {
Open() error
Database() string
MeasurementExists(name []byte) (bool, error)
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(seriesID uint64, key []byte, cascade bool) error
DropSeriesList(seriesID []uint64, key [][]byte, cascade bool) error
DropMeasurementIfSeriesNotExist(name []byte) (bool, error)

HasTagKey(name, key []byte) (bool, error)
HasTagValue(name, key, value []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
TagKeyCardinality(name, key []byte) int

// InfluxQL system iterators
MeasurementIterator() (MeasurementIterator, error)
TagKeyIterator(name []byte) (TagKeyIterator, error)
TagValueIterator(name, key []byte) (TagValueIterator, error)
MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error)
TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error)
TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error)

// Sets a shared fieldset from the engine.
FieldSet() *MeasurementFieldSet
SetFieldSet(fs *MeasurementFieldSet)

// Size of the index on disk, if applicable.
DiskSizeBytes() int64
// To be removed w/ tsi1.
SetFieldName(measurement []byte, name string)

Type() string
// Returns a unique reference ID to the index instance.
// For inmem, returns a reference to the backing Index, not ShardIndex.
UniqueReferenceID() uintptr

Rebuild()
}

简化之后的结构大致如上,可以从顶层的定义中看到index的大致功能。

  • 提供一些基本数据信息的查询,注意,这里可不是meta info。这里指的是measurement,series,tagk,tagv等。
  • influxQL 的支持。
  • field set shared
  • Type()和Rebuild()

其中前三个都很好理解,第四个是Type和Rebuild。这个单独拿出来了,这里说明,index是支持多种类型的,Type()函数正是标注了具体的类型。

Index的注册和获取

上面提到了顶层的Index模块提供了Type函数来标注当前实现的Index类型。这也是influxdb 可以拓展的地方,如果我们要自己开发一个Index,那么只需要实现这些接口。

在顶层的index模块中,提供了Register函数,来注册对应的Index,具体是怎么做的呢?

// NewIndexFunc creates a new index.
type NewIndexFunc func(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index

// newIndexFuncs is a lookup of index constructors by name.
var newIndexFuncs = make(map[string]NewIndexFunc)

// RegisterIndex registers a storage index initializer by name.
func RegisterIndex(name string, fn NewIndexFunc) {
if _, ok := newIndexFuncs[name]; ok {
panic("index already registered: " + name)
}
newIndexFuncs[name] = fn
}

这段代码展示了具体的注册逻辑。首先是NewIndexFunc 这是Index的构造函数,所有的Index在注册的时候只需要调用RegisterIndex注册一个NewIndexFunc的实现,这个实现会被保存的map里面。那么Shard是怎么使用的呢?

在influxdb启动的时候,会load 所有的shard,并且新建里面的所有数据结构。在NewShard的时候,会新建Shard相关的索引。

func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) (Index, error) {
// 这里默认使用的是inmem index
format := options.IndexVersion

// Use default format unless existing directory exists.
_, err := os.Stat(path)
if os.IsNotExist(err) {
// nop, use default
} else if err != nil {
return nil, err
} else if err == nil {
format = TSI1IndexName
}

// Lookup index by format.
fn := newIndexFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid index format: %q", format)
}
return fn(id, database, path, seriesIDSet, sfile, options), nil
}

在启动的时候,会指定EngineOptions,这里会指明索引的类型format。然后使用查找这个类型的NewIndexFunc函数,调用这个函数来把索引构建起来。

到这里我们已经基本知道了这样几个事实:

  • 索引Index是有顶层的抽象接口的,所有具体实现需要实现这个接口。
  • 具体的实现,需要主动调用顶层接口提供的Register函数,注册一个符合当前实现的NewIndexFunc到map中。
  • Shard启动时,会从启动参数里面选择具体的索引类型,并且找到这个索引注册的NewIndexFunc来构建索引。

那么,接下来就看一下。具体的实现。

Inmem Index

influxdb 提供了两种索引的实现,在tsdb/index 模块下。这两种索引分别是Inmem Index和tsi Index。上面说到了,索引的具体的实现需要自己注册,这两种索引在init函数里面实现了注册逻辑。

Inmem Index:

// IndexName is the name of this index.
const IndexName = tsdb.InmemIndexName

func init() {
tsdb.NewInmemIndex = func(name string, sfile *tsdb.SeriesFile) (interface{}, error) { return NewIndex(name, sfile), nil }

tsdb.RegisterIndex(IndexName, func(id uint64, database, path string, seriesIDSet *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Index {
return NewShardIndex(id, seriesIDSet, opt)
})
}

tsi index:

// IndexName is the name of the index.
const IndexName = tsdb.TSI1IndexName

// ErrCompactionInterrupted is returned if compactions are disabled or
// an index is closed while a compaction is occurring.
var ErrCompactionInterrupted = errors.New("tsi1: compaction interrupted")

func init() {
if os.Getenv("INFLUXDB_EXP_TSI_PARTITIONS") != "" {
i, err := strconv.Atoi(os.Getenv("INFLUXDB_EXP_TSI_PARTITIONS"))
if err != nil {
panic(err)
}
DefaultPartitionN = uint64(i)
}

tsdb.RegisterIndex(IndexName, func(_ uint64, db, path string, _ *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Index {
idx := NewIndex(sfile, db,
WithPath(path),
WithMaximumLogFileSize(int64(opt.Config.MaxIndexLogFileSize)),
WithSeriesIDCacheSize(opt.Config.SeriesIDSetCacheSize),
)
return idx
})
}

这一小节首先看一下Inmem index.

Inmem index结构

上面Index只是定义了所有Index 实现的行为,但是成员还是需要实现自己定义。Inmem Index的结构:

// un-exported functions assume the caller will use the appropriate locks.
type Index struct {
mu sync.RWMutex

database string
sfile *tsdb.SeriesFile
fieldset *tsdb.MeasurementFieldSet

// In-memory metadata index, built on load and updated when new series come in
measurements map[string]*measurement // measurement name to object and index
series map[string]*series // map series key to the Series object

seriesSketch, seriesTSSketch estimator.Sketch
measurementsSketch, measurementsTSSketch estimator.Sketch

// Mutex to control rebuilds of the index
rebuildQueue sync.Mutex
}

这个结构还是很简洁的,主要是series file和field,measurement信息。后面两个Sketch是用来评估数量的,可以不管。先看一下NewIndex方法

func NewIndex(database string, sfile *tsdb.SeriesFile) *Index {
index := &Index{
database: database,
sfile: sfile,
measurements: make(map[string]*measurement),
series: make(map[string]*series),
}

index.seriesSketch = hll.NewDefaultPlus()
index.seriesTSSketch = hll.NewDefaultPlus()
index.measurementsSketch = hll.NewDefaultPlus()
index.measurementsTSSketch = hll.NewDefaultPlus()

return index
}

NewIndex只传入两个参数,database和series file。其他的都是新建的。这里多提一句,注意看那几个Sketch,都是hll 的默认实现。这里是通过HyperLogLog 算法来实现的基数统计,不太了解的可以去看看这个算法。

CreateSeriesListIfNotExists

为啥把这个函数单独拿出来说说呢,因为这个函数很重要,贯穿全局。不是很了解的,去要去看一下上一章:series部分。看一下inmem index这个函数式怎么实现的。这里的逻辑简化一下,有点长。

func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, measurements map[string]int,
keys, names [][]byte, tagsSlice []models.Tags, opt *tsdb.EngineOptions, ignoreLimits bool) error {
seriesIDs, err := i.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
if err != nil {
return err
}
i.mu.RLock()
seriesList := make([]*series, len(seriesIDs))
for j, key := range keys {
seriesList[j] = i.series[string(key)]
}
i.mu.RUnlock()
var hasNewSeries bool
for _, ss := range seriesList {
if ss == nil {
hasNewSeries = true
continue
}
}
if !hasNewSeries {
return nil
}
var newSeriesN int
for j, key := range keys {
if seriesList[j] != nil {
continue
}

ss := i.series[string(key)]
if ss == nil {
newSeriesN++
continue
}
seriesList[j] = ss
}
if newSeriesN == 0 {
return nil
}
for j, key := range keys {
if seriesList[j] != nil || i.series[string(key)] != nil {
continue
}
skey := string(key)
ss := newSeries(seriesIDs[j], mms[j], skey, tagsSlice[j].Clone())
i.series[skey] = ss
mms[j].AddSeries(ss)
}
return nil
}

这里省略的比较多,主要是measurement和加锁的逻辑,因为index是shard 共享的,所以需要加锁。这段代码的核心逻辑也是很清晰的,首先把create new series的任务委托给了SeriesFile,然后拿到seriesId,check新增的series,如果有新增的,那么添加到当前的结构里面。 这里相当于对series 做了一个cache 这段逻辑大致就是如此,也通过这个例子,说明了inmem 的作用。

CreateMeasurementIndexIfNotExists

上面看到inmem index的主要结构是series file和measurement。CreateMeasurementIndexIfNotExists是创建index对应的memsuremnet结构。measurement相关的信息不像series可以有其他模块提供。series 信息在启动的时候series file会加载存储的series 信息。measurement作为一个单独的模块,在重建索引时,会初始化。

Rebuild

rebuild是重建索引。在刚启动时,需要重建索引。
func (i *Index) Rebuild() {
// Only allow one rebuild at a time. This will cause all subsequent rebuilds
// to queue. The measurement rebuild is idempotent and will not be rebuilt if
// it does not need to be.
i.rebuildQueue.Lock()
defer i.rebuildQueue.Unlock()

i.ForEachMeasurementName(func(name []byte) error {
// Measurement never returns an error
m, _ := i.Measurement(name)
if m == nil {
return nil
}

i.mu.Lock()
nm := m.Rebuild()

i.measurements[string(name)] = nm
i.mu.Unlock()
return nil
})
}

rebuild index时,会初始化measurement相关信息。

总结

本篇文章介绍了influxdb index模块的基本结构和inmem 实现。 inmem 的index是启动默认的,比较简单,不支持持久化。为了解决这个问题,influxdb在tsi index里面,对index做了非常详细的设计,支持持久化,倒排索引等。下一节看一下这个。


举报

相关推荐

0 条评论