0
点赞
收藏
分享

微信扫一扫

parca 对于frostdb 的使用简单说明

parca 使用badger 存储元数据,使用frostdb 列存,存储采样数据

简单集成使用说明

frostdb 官方文档提供了一些简单的集成demo,值得学习参考
参考代码:pkg/parca/parca.go、pkg/scrape/manager.go、pkg/scrape/scrape.go。如果需要开启持久化,就需要使用对象存储,frostdb 支持通过对象存储进行持久化
因为parca 很多地方参考了prometheus,所以对于frostdb的集成是包装为了一个prometheus 风格的服务

  • 初始化
    pkg/parca/parca.go 以及pkg/scrape/manager.go
    初始化

 

col, err := frostdb.New(frostdbOptions...)

if err != nil {

level.Error(logger).Log("msg", "failed to initialize storage", "err", err)

return err

}

 

if err := col.ReplayWALs(context.Background()); err != nil {

level.Error(logger).Log("msg", "failed to replay WAL", "err", err)

return err

}

 

colDB, err := col.DB(ctx, "parca")

if err != nil {

level.Error(logger).Log("msg", "failed to load database", "err", err)

return err

}

 

schema, err := parcacol.Schema()

if err != nil {

level.Error(logger).Log("msg", "failed to get schema", "err", err)

return err

}

 

table, err := colDB.Table("stacktraces", frostdb.NewTableConfig(schema))

if err != nil {

level.Error(logger).Log("msg", "create table", "err", err)

return err

}

 

s := profilestore.NewProfileColumnStore(

logger,

tracerProvider.Tracer("profilestore"),

metastore,

table,

schema,

flags.StorageDebugValueLog,

)

在 Manager 中使用

func (m *Manager) reload() {

m.mtxScrape.Lock()

defer m.mtxScrape.Unlock()

var wg sync.WaitGroup

level.Debug(m.logger).Log("msg", "Reloading scrape manager")

for setName, groups := range m.targetSets {

var sp *scrapePool

existing, ok := m.scrapePools[setName]

if !ok {

scrapeConfig, ok := m.scrapeConfigs[setName]

if !ok {

level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)

return

}

// 此方法是核心,进行pull 模式需要依赖列存,具体数据写入参考数据写入部分

sp = newScrapePool(scrapeConfig, m.store, log.With(m.logger, "scrape_pool", setName), m.externalLabels, &scrapePoolMetrics{

targetIntervalLength:          m.targetIntervalLength,

targetReloadIntervalLength:    m.targetReloadIntervalLength,

targetSyncIntervalLength:      m.targetSyncIntervalLength,

targetScrapePoolSyncsCounter:  m.targetScrapePoolSyncsCounter,

targetScrapeSampleLimit:       m.targetScrapeSampleLimit,

targetScrapeSampleDuplicate:   m.targetScrapeSampleDuplicate,

targetScrapeSampleOutOfOrder:  m.targetScrapeSampleOutOfOrder,

targetScrapeSampleOutOfBounds: m.targetScrapeSampleOutOfBounds,

})

m.scrapePools[setName] = sp

else {

sp = existing

}

 

wg.Add(1)

// Run the sync in parallel as these take a while and at high load can't catch up.

go func(sp *scrapePool, groups []*targetgroup.Group) {

sp.Sync(groups)

wg.Done()

sp, groups)

}

wg.Wait()

}

  • 数据写入操作
    pkg/scrape/scrape.go

 

func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {

select {

case <-time.After(sl.scraper.offset(interval)):

// Continue after a scraping offset.

case <-sl.scrapeCtx.Done():

close(sl.stopped)

return

}

 

var last time.Time

 

ticker := time.NewTicker(interval)

defer ticker.Stop()

 

mainLoop:

for {

select {

case <-sl.ctx.Done():

close(sl.stopped)

return

case <-sl.scrapeCtx.Done():

break mainLoop

default:

}

 

start := time.Now()

 

// Only record after the first scrape.

if !last.IsZero() {

sl.intervalLength.WithLabelValues(interval.String()).Observe(

time.Since(last).Seconds(),

)

}

 

b := sl.buffers.Get(sl.lastScrapeSize).([]byte)

buf := bytes.NewBuffer(b)

 

var profileType string

for _, l := range sl.target.labels {

if l.Name == ProfileName {

profileType = l.Value

break

}

}

 

scrapeCtx, cancel := context.WithTimeout(sl.ctx, timeout)

scrapeErr := sl.scraper.scrape(scrapeCtx, buf, profileType)

cancel()

 

if scrapeErr == nil {

b = buf.Bytes()

// NOTE: There were issues with misbehaving clients in the past

// that occasionally returned empty results. We don't want those

// to falsely reset our buffer size.

if len(b) > 0 {

sl.lastScrapeSize = len(b)

}

 

tl := sl.target.Labels()

tl = append(tl, labels.Label{Name: "__name__", Value: profileType})

for _, l := range sl.externalLabels {

tl = append(tl, labels.Label{

Name:  l.Name,

Value: l.Value,

})

}

level.Debug(sl.l).Log("msg", "appending new sample", "labels", tl.String())

 

protolbls := &profilepb.LabelSet{

Labels: []*profilepb.Label{},

}

for _, l := range tl {

protolbls.Labels = append(protolbls.Labels, &profilepb.Label{

Name:  l.Name,

Value: l.Value,

})

}

// 数据写入

_, err := sl.store.WriteRaw(sl.ctx, &profilepb.WriteRawRequest{

Tenant: "",

Series: []*profilepb.RawProfileSeries{

{

Labels: protolbls,

Samples: []*profilepb.RawSample{

{

RawProfile: buf.Bytes(),

},

},

},

},

})

if err != nil {

switch errc {

case nil:

level.Error(sl.l).Log("msg", "WriteRaw failed for scraped profile", "err", err)

default:

errc <- err

}

}

 

sl.target.health = HealthGood

sl.target.lastScrapeDuration = time.Since(start)

sl.target.lastError = nil

else {

level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())

if errc != nil {

errc <- scrapeErr

}

 

sl.target.health = HealthBad

sl.target.lastScrapeDuration = time.Since(start)

sl.target.lastError = scrapeErr

}

 

sl.buffers.Put(b)

last = start

 

sl.target.lastScrape = last

 

select {

case <-sl.ctx.Done():

close(sl.stopped)

return

case <-sl.scrapeCtx.Done():

break mainLoop

case <-ticker.C:

}

}

 

close(sl.stopped)

}

  • 数据查询部分
    frostdb 的查询引擎与存储是分离的,因此查询时需要基于列存的 TableProvider 单独创建查询引擎
    pkg/parca/parca.go

 

q := queryservice.NewColumnQueryAPI(

logger,

tracerProvider.Tracer("query-service"),

sharepb.NewShareClient(conn),

parcacol.NewQuerier(

tracerProvider.Tracer("querier"),

query.NewEngine(

memory.DefaultAllocator,

colDB.TableProvider(),

query.WithTracer(tracerProvider.Tracer("query-engine")),

),

"stacktraces",

metastore,

),

)

查询接口服务

parcaserver := server.NewServer(reg, version)

gr.Add(

func() error {

return parcaserver.ListenAndServe(

ctx,

logger,

flags.Port,

flags.CORSAllowedOrigins,

flags.PathPrefix,

server.RegisterableFunc(func(ctx context.Context, srv *grpc.Server, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error {

debuginfopb.RegisterDebugInfoServiceServer(srv, dbgInfo)

profilestorepb.RegisterProfileStoreServiceServer(srv, s)

querypb.RegisterQueryServiceServer(srv, q)

scrapepb.RegisterScrapeServiceServer(srv, m)

 

if err := debuginfopb.RegisterDebugInfoServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {

return err

}

 

if err := profilestorepb.RegisterProfileStoreServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {

return err

}

 

if err := querypb.RegisterQueryServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {

return err

}

 

if err := scrapepb.RegisterScrapeServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {

return err

}

 

return nil

}),

)

},

func(_ error) {

ctx, cancel := context.WithTimeout(ctx, 30*time.Second) // TODO make this a graceful shutdown config setting

defer cancel()

 

level.Debug(logger).Log("msg", "server shutting down")

err := parcaserver.Shutdown(ctx)

if err != nil && !errors.Is(err, context.Canceled) {

level.Error(logger).Log("msg", "error shutting down server", "err", err)

}

 

// Close the columnstore after the parcaserver has shutdown to ensure no more writes occur against it.

if err := col.Close(); err != nil {

level.Error(logger).Log("msg", "error closing columnstore", "err", err)

}

},

)

说明

因为frostdb包装的比较方便,parca 对于frostdb 的使用没有太多复杂的东西,基本是直接使用,frostdb 内部机制还是值得学习的

参考资料

https://github.com/polarsignals/frostdb
https://www.parca.dev/docs/storage
https://github.com/polarsignals/frostdb/blob/main/examples/simple.go

举报

相关推荐

0 条评论