0
点赞
收藏
分享

微信扫一扫

Logstash grok正则调试

1.为Logstash添加一个配置文件dnsquery.conf,如下

input {
kafka {
add_field => { "es_index_name" => "dns-query" }
bootstrap_servers => "10.0.8.1:9092"
topics => "dns-query"
consumer_threads => 9
codec => json {
charset => "UTF-8"
}
}
}

filter {
if [es_index_name] == "ex-iis" {
grok {
match => { "message" => "%{DATA:date_time}\s*%{IPV4:s_ip}\s*%{WORD:cs_method}\s*%{URIPATH:cs_uri_stem}\s*(?<cs_uri_query>([=-_;&\S+]*))\s*%{BASE10NUM:s_port}\s*(?<cs_username>([.\S+]*))\s*%{IPV4:c_ip}\s*(?<cs_user_agent>([=-_;&/\S+]*))\s*(?<cs_referer>([=-_;&/\S+]*))\s*%{BASE10NUM:cs_status}\s*%{BASE10NUM:cs_substatus}\s*%{BASE10NUM:cs_win32_status}\s*%{BASE10NUM:time_taken}" }
}
mutate {
remove_field => ["agent.ephemeral_id", "agent.id", "_score", "_id", "agent.type", "agent.version", "log.offset"]
remove_field => ["message"]
}
}

else if [es_index_name] == "dns-query" {
grok {
match => { "message" => "(?<date_time>\d{4}/\d{1,2}/\d{1,2}\s*[0-9:]{7,8})\s*(?<ThreatID>[0-9A-Za-z]{4})\s*%{NOTSPACE:Context}\s*(?<PacketId>\S+)\s*%{NOTSPACE:UDPTCP}\s*%{NOTSPACE:SendReceive}\s*%{IP:ClientIP}\s*(?<Xid>\S+)\s*(?<QR>\S+)\s*(?<Opcode>\S+)\s*(?<Flags>(\[.*?\]))\s*(?<ResponseCode>\S+)\s*(?<QueryContent>\S+)" }
}
mutate {
gsub => ["QueryContent","\(.*?\)",".","QueryContent","^.","","QueryContent",".$",""] #多次替换,先替换所有小括号为点,然后替换开头的点为空,再替换结尾的点为空
remove_field => ["agent.ephemeral_id", "agent.id", "_score", "_id", "agent.type", "agent.version", "log.offset"]
remove_field => ["message"]
}
}


else {
mutate {
remove_field => ["beat", "@version"]
}
}
}

output {
# 调试控制台输出
stdout { codec => rubydebug { metadata => true } }
}

2. 启动一个logstash进程,指定该配置文件 /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/dnsquery.conf ,在前台运行

3.客户端正常收集到日志后,logstash前台会有日志显示

grok正则调试:

Logstash grok正则调试_正则

 

举报

相关推荐

0 条评论