0
点赞
收藏
分享

微信扫一扫

Heka 的配置文件加载逻辑

Heka 使用的是 TOML 格式的配置文件, 有关 golang 加载 TOML 配置文件的技术请参看

Heka 使用的读取 TOML 文件的 Go 库是 https://github.com/bbangert/toml ,虽然与上面文章中介绍的不是同一个库,但用法基本相同。

 

hekad 主进程加载配置文件的逻辑如下:

  1. 在启动时,先读取hekad部分的配置内容,用于hekad的启动。
  2. 然后再去读其他配置节,把heka的管道启动起来。

github.com\mozilla-services\heka\cmd\hekad\main.go 文件中,我们摘要核心代码如下,就可以看到这个逻辑:

这里删除了一些跟这个逻辑过程无关的代码,方便阅读。

Heka 的配置文件加载逻辑_加载

加载 hekad 部分配置

代码实现如下。注意,这里还处理了配置文件内置的环境变量替换功能。

 

Heka 的配置文件加载逻辑_配置文件_02

对环境变量支持的文档如下:

Using Environment Variables

If you wish to use environmental variables in your config files as a way to configure values, you can simply use %ENV[VARIABLE_NAME] and the text will be replaced with the value of the environmental variable VARIABLE_NAME.

Example:

[AMQPInput]
url = "amqp://%ENV[USER]:%ENV[PASSWORD]@rabbitmq/"
exchange = "testout"
exchangeType = "fanout"

 

加载插件相关的配置逻辑

加载插件配置的逻辑主要是下面代码:

Heka 的配置文件加载逻辑_TOML_03

每个Heka的TOML配置节,对应的实体类如下:

// The TOML spec for plugin configuration options that will be pulled out by
// Heka itself before the config is passed to the Plugin.Init method. Not all
// options apply to all plugin types.
//
// loadPluginGlobals fills one of these per config section: it pre-populates
// Retries with a default policy, decodes the section on top of it, and then
// falls back to the section name when Typ is left empty.
type PluginGlobals struct {
	// Plugin type name, read from the "type" key. When empty,
	// loadPluginGlobals substitutes the section name; the final value must be
	// a key of AvailablePlugins.
	Typ        string `toml:"type"`
	// Value of the "ticker_interval" key.
	Ticker     uint   `toml:"ticker_interval"`
	Matcher    string `toml:"message_matcher"` // Filter and Output only.
	Signer     string `toml:"message_signer"`  // Filter and Output only.
	// Retry policy. loadPluginGlobals defaults it to MaxDelay "30s",
	// Delay "250ms", MaxRetries -1 before decoding the section.
	Retries    RetryOptions
	Encoder    string // Output only.
	// NOTE(review): presumably a pointer so that an unset key can be told
	// apart from an explicit false — confirm against the code that reads it.
	UseFraming *bool  `toml:"use_framing"` // Output only.
	// Value of the "can_exit" key; nil when the key is absent (same
	// assumption as UseFraming — confirm).
	CanExit    *bool  `toml:"can_exit"`
}

对应的解析时的逻辑如下:

 

func (self *PipelineConfig) loadPluginGlobals(section *ConfigSection) (err error) {
  
    // Set up default retry policy.     pGlobals := new(PluginGlobals)
  
    pGlobals.Retries = RetryOptions{

  
        MaxDelay:   "30s",

  
        Delay:      "250ms",

  
        MaxRetries: -1,

  
    }     if err = toml.PrimitiveDecode(section.tomlSection, pGlobals); err != nil {
  
        err = fmt.Errorf("Unable to decode config for plugin '%s': %s",

  
            section.name, err)

  
        return

  
    }     if pGlobals.Typ == "" {
  
        pGlobals.Typ = section.name

  
    }     if _, ok := AvailablePlugins[pGlobals.Typ]; !ok {
  
        err = fmt.Errorf("No registered plugin type: %s", pGlobals.Typ)

  
    } else {

  
        section.globals = pGlobals

  
    }

  
    return

  
}

补充说明如下:

  • 如果配置节中没有设置 type(为空),则使用该配置节的名字作为 type
  • 插件要起作用,需要 RegisterPlugin , 参看 http://hekad.readthedocs.org/en/v0.8.0/developing/plugin.html 

插件的类型共有 5 种,从配置节的名字或者 type 的后缀就可以看出来。判断插件类型的代码如下:

// PluginTypeRegex matches a plugin type name that ends with one of the five
// plugin categories and captures that trailing category.
var PluginTypeRegex = regexp.MustCompile("(Decoder|Encoder|Filter|Input|Output)$")

// getPluginCategory returns the category suffix of pluginType ("Decoder",
// "Encoder", "Filter", "Input" or "Output"), or the empty string when the
// name does not end with any of them.
func getPluginCategory(pluginType string) string {
	// Index 0 is the whole match; index 1 is the captured category.
	if match := PluginTypeRegex.FindStringSubmatch(pluginType); len(match) >= 2 {
		return match[1]
	}
	return ""
}

加载配置文件的代码位于 LoadFromConfigFile 函数中。它主要就做了上面这两件事情(加载插件的全局配置、判断插件类型),外加对不同类型插件的特殊处理。

// Loads all plugin configuration from a TOML configuration file. The
  
// PipelineConfig should be already initialized via the Init function before

  
// this method is called.

  
func (self *PipelineConfig) LoadFromConfigFile(filename string) (err error) {

  
    var configFile ConfigFile

  
    // 更新配置文件中,自定义变量(环境变量)

  
    contents, err := ReplaceEnvsFile(filename)

  
    if err != nil {

  
        return err

  
    }     // TOML 解析成 configFile
  
    if _, err = toml.Decode(contents, &configFile); err != nil {

  
        return fmt.Errorf("Error decoding config file: %s", err)

  
    }     var (
  
        errcnt              uint

  
        protobufDRegistered bool

  
        protobufERegistered bool

  
    )

  
    sectionsByCategory := make(map[string][]*ConfigSection)     // Load all the plugin globals and file them by category.
  
    for name, conf := range configFile {

  
        if name == HEKA_DAEMON {

  
            continue

  
        }

  
        log.Printf("Pre-loading: [%s]\n", name)

  
        section := &ConfigSection{

  
            name:        name,

  
            tomlSection: conf,

  
        }         // 加载插件配置文件, 这里面做了插件注册的检查
  
        if err = self.loadPluginGlobals(section); err != nil {

  
            self.log(err.Error())

  
            errcnt++

  
            continue

  
        }         // 获取插件的类型
  
        category := getPluginCategory(section.globals.Typ)

  
        if category == "" {

  
            self.log(fmt.Sprintf("Type doesn't contain valid plugin name: %s\n",

  
                section.globals.Typ))

  
            errcnt++

  
            continue

  
        }         // 特殊插件类型的处理
  
        section.category = category

  
        if section.globals.Typ == "MultiDecoder" {

  
            // Special case MultiDecoders so we can make sure they get

  
            // registered *after* all possible subdecoders.

  
            sectionsByCategory["MultiDecoder"] = append(sectionsByCategory["MultiDecoder"],

  
                section)

  
        } else {

  
            sectionsByCategory[category] = append(sectionsByCategory[category], section)

  
        }

  
        if name == "ProtobufDecoder" {

  
            protobufDRegistered = true

  
        }

  
        if name == "ProtobufEncoder" {

  
            protobufERegistered = true

  
        }

  
    }     // Make sure ProtobufDecoder is registered.
  
    if !protobufDRegistered {

  
        var configDefault ConfigFile

  
        toml.Decode(protobufDecoderToml, &configDefault)

  
        log.Println("Pre-loading: [ProtobufDecoder]")

  
        section := &ConfigSection{

  
            name:        "ProtobufDecoder",

  
            category:    "Decoder",

  
            tomlSection: configDefault["ProtobufDecoder"],

  
        }

  
        if err = self.loadPluginGlobals(section); err != nil {

  
            // This really shouldn't happen.

  
            self.log(err.Error())

  
            errcnt++

  
        } else {

  
            sectionsByCategory["Decoder"] = append(sectionsByCategory["Decoder"],

  
                section)

  
        }

  
    }     // Make sure ProtobufEncoder is registered.
  
    if !protobufERegistered {

  
        var configDefault ConfigFile

  
        toml.Decode(protobufEncoderToml, &configDefault)

  
        log.Println("Pre-loading: [ProtobufEncoder]")

  
        section := &ConfigSection{

  
            name:        "ProtobufEncoder",

  
            category:    "Encoder",

  
            tomlSection: configDefault["ProtobufEncoder"],

  
        }

  
        if err = self.loadPluginGlobals(section); err != nil {

  
            // This really shouldn't happen.

  
            self.log(err.Error())

  
            errcnt++

  
        } else {

  
            sectionsByCategory["Encoder"] = append(sectionsByCategory["Encoder"],

  
                section)

  
        }

  
    }     multiDecoders := make([]multiDecoderNode, len(sectionsByCategory["MultiDecoder"]))
  
    multiConfigs := make(map[string]*ConfigSection)     for i, section := range sectionsByCategory["MultiDecoder"] { 
        multiConfigs[section.name] = section 
        multiDecoders[i] = newMultiDecoderNode(section.name, subsFromSection(section.tomlSection)) 
    }
  
    multiDecoders, err = orderDependencies(multiDecoders)

  
    if err != nil {

  
        return err

  
    }

  
    for i, d := range multiDecoders {

  
        sectionsByCategory["MultiDecoder"][i] = multiConfigs[d.name]

  
    }     // Append MultiDecoders to the end of the Decoders list.
  
    sectionsByCategory["Decoder"] = append(sectionsByCategory["Decoder"],

  
        sectionsByCategory["MultiDecoder"]...)     // Force decoders and encoders to be registered before the other plugin
  
    // types are initialized so we know they'll be there for inputs and

  
    // outputs to use during initialization.

  
    order := []string{"Decoder", "Encoder", "Input", "Filter", "Output"}

  
    for _, category := range order {

  
        for _, section := range sectionsByCategory[category] {

  
            log.Printf("Loading: [%s]\n", section.name)

  
            if err = self.loadSection(section); err != nil {

  
                self.log(err.Error())

  
                errcnt++

  
            }

  
        }

  
    }     if errcnt != 0 {
  
        return fmt.Errorf("%d errors loading plugins", errcnt)

  
    }     return
  
}

举报

相关推荐

0 条评论