0
点赞
收藏
分享

微信扫一扫

logstash多配置文件的使用和常见问题


背景

假设现在给Logstash的pipeline配置了2个conf,也就是2个输入源。如果不做任何处理,那么所有的Filter和Output都会同时触发,如下图:

logstash多配置文件的使用和常见问题_elasticsearch

这显然跟我们期望的不同,我们希望Logstash按以下的方式来处理,也就是各自区分,独立处理:

logstash多配置文件的使用和常见问题_elasticsearch_02

 

解决方案

官方提供有 type 和tags 配置项进行区分,type 和 tags 是 logstash 事件中两个特殊的字段。

通常来说我们会在输入区段中通过 type 来标记事件类型 —— 我们肯定是提前能知道这个事件属于什么类型的。而tags 则是在数据处理过程中,由具体的插件来添加或者删除的。

如下所示:


input {         jdbc {                 jdbc_connection_string => "jdbc:mysql://localhost:3306/igp?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"                         jdbc_user => "es_igper"                         jdbc_password => "password"                         jdbc_driver_library => "/usr/local/logstash/tools/mysql-connector-java-8.0.23.jar"                         jdbc_driver_class => "com.mysql.jdbc.Driver"                         jdbc_paging_enabled => "true"                         jdbc_page_size => "10000"                         plugin_timezone => "local"                         clean_run => false                         use_column_value => true                         tracking_column => update_time                         tracking_column_type => "timestamp"                         record_last_run => true                         last_run_metadata_path => "/usr/local/logstash/config/igp/run/live_v4_run"                         statement => "select live.* from live where live.update_time>:sql_last_value order by live.update_time asc,live.id asc"                         schedule => "* * * * *"                         type => "live"         } } filter {         if [type]=="live" {                 mutate {                         remove_field => ["username","passwd","wx_openid"]                 }         } } output {         if [type]=="live" {                 elasticsearch {                         hosts => ["localhost:9200"]                                 index => "live_v4"                                 document_id => "%{id}"                                 user => "elastic"                                 password => "password"                 }                 stdout {                         codec => json_lines                 }         } }


 

以上是通过type区分,传入的数据中带有type字段/tags字段,会覆盖logstash配置文件设置的type属性值,这是个大坑,注意


input {         jdbc {                 jdbc_connection_string => "jdbc:mysql://localhost:3306/igp?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"                         jdbc_user => "es_igper"                         jdbc_password => "password"                         jdbc_driver_library => "/usr/local/logstash/tools/mysql-connector-java-8.0.23.jar"                         jdbc_driver_class => "com.mysql.jdbc.Driver"                         jdbc_paging_enabled => "true"                         jdbc_page_size => "10000"                         plugin_timezone => "local"                         clean_run => false                         use_column_value => true                         tracking_column => update_time                         tracking_column_type => "timestamp"                         record_last_run => true                         last_run_metadata_path => "/usr/local/logstash/config/igp/run/live_v4_run"                         statement => "select live.* from live where live.update_time>:sql_last_value order by live.update_time asc,live.id asc"                         schedule => "* * * * *"                         tags=> ["live"]         } } filter {         if "live" in [tags]{                 mutate {                         remove_field => ["username","passwd","wx_openid"]                 }         } } output {        if "live" in [tags]{                 elasticsearch {                         hosts => ["localhost:9200"]                                 index => "live_v4"                                 document_id => "%{id}"                                 user => "elastic"                                 password => "password"                 }                 stdout {                         codec => json_lines                 }         }          if "user" in [tags]{                 elasticsearch {                         hosts => ["localhost:9200"]                                 index => "user_v4"                                 document_id => "%{id}"                                 user => "elastic"                                 password => "password"                 }                 stdout {                         codec => json_lines                 }         } }


 

通过以上方式配置,就能将pipeline各自区分开。大家可以试下,如果把type参数去掉,最后的elasticsearch数据会混或者数据重复等奇怪问题

 

原因

logstash启动后会把多个配置文件自动合并成一个,比如有两个conf文件,每个都有 filter、ouput,写入一条数据,会执行两次filter和output,重复写入

 

 

举报

相关推荐

0 条评论