2014-10-30 79 views

JSON input to logstash: config issues?

I have the following JSON input that I want to dump into logstash (and eventually search and dashboard in elasticsearch/kibana).

{"vulnerabilities":[ 
    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"}, 
    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"}, 
    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"} 
]} 

I'm using the following logstash configuration:

input {
    file {
        path => "/tmp/logdump/*"
        type => "assets"
        codec => "json"
    }
}
output { 
    stdout { codec => rubydebug } 
    elasticsearch { host => localhost } 
} 

I get the following output:

{ 
     "message" => "{\"vulnerabilities\":[\r", 
     "@version" => "1", 
    "@timestamp" => "2014-10-30T23:41:19.788Z", 
      "type" => "assets", 
      "host" => "av12612sn00-pn9", 
      "path" => "/tmp/logdump/stack3.json" 
} 
{ 
     "message" => "{\"ip\":\"10.1.1.30\",\"dns\":\"z.acme.com\",\"vid\":\"12345\"},\r", 
     "@version" => "1", 
    "@timestamp" => "2014-10-30T23:41:19.838Z", 
      "type" => "assets", 
      "host" => "av12612sn00-pn9", 
      "path" => "/tmp/logdump/stack3.json" 
} 
{ 
     "message" => "{\"ip\":\"10.1.1.31\",\"dns\":\"y.acme.com\",\"vid\":\"12345\"},\r", 
     "@version" => "1", 
    "@timestamp" => "2014-10-30T23:41:19.870Z", 
      "type" => "shellshock", 
      "host" => "av1261wag2sn00-pn9", 
      "path" => "/tmp/logdump/stack3.json" 
} 
{ 
      "ip" => "10.1.1.32", 
      "dns" => "x.acme.com", 
      "vid" => "12345", 
     "@version" => "1", 
    "@timestamp" => "2014-10-30T23:41:19.884Z", 
      "type" => "assets", 
      "host" => "av12612sn00-pn9", 
      "path" => "/tmp/logdump/stack3.json" 
} 

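Clearly logstash is treating each line as a separate event, so it thinks {"vulnerabilities":[ is an event on its own. I'm guessing the trailing commas on the two subsequent nodes break the parsing, while the last node, which has no trailing comma, comes out correct. How do I tell logstash to parse the events inside the vulnerabilities array and to ignore the commas at the end of each line?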
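
To make the guess above concrete, here is a minimal Python sketch (not logstash itself) that decodes the sample input one line at a time, which is roughly what a line-oriented input combined with a JSON decoder ends up doing. The helper name `try_parse` is made up for illustration.

```python
import json

# The sample input, split into lines the way a line-oriented reader would see it.
lines = [
    '{"vulnerabilities":[',
    '    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}',
    ']}',
]


def try_parse(line):
    """Return the decoded object, or None if the line is not valid JSON by itself."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return None


# One result per line; None marks a line that cannot be decoded on its own.
results = [try_parse(line) for line in lines]
for line, parsed in zip(lines, results):
    print("parsed " if parsed is not None else "failed ", line.strip())
```

Only the fourth line decodes, because it is the only one that is a complete JSON value with nothing after the closing brace. That matches the rubydebug output, where only the last node was split into ip, dns and vid fields.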

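Update 2014-11-05: Following Magnus's suggestion, I added the json filter and it works perfectly. However, it does not parse the last line of the JSON correctly unless I specify start_position => "beginning" in the file input block. Any ideas why not? I know it parses bottom-up by default, but I would have expected the mutate/gsub to handle this smoothly.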

input {
    file {
        path => "/tmp/logdump/*"
        type => "assets"
        start_position => "beginning"
    }
}
filter {
    if [message] =~ /^\[?{"ip":/ {
        mutate {
            gsub => [
                "message", "^\[{", "{",
                "message", "},?\]?$", "}"
            ]
        }
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
}
output { 
    stdout { codec => rubydebug } 
    elasticsearch { host => localhost } 
} 

Answer


You could skip the json codec and use a multiline filter to join the lines into a single string that you can then feed to the json filter.

filter {
    multiline {
        pattern => '^{"vulnerabilities":\['
        negate => true
        what => "previous"
    }
    json {
        source => "message"
    }
}

However, this produces the following, probably unwanted, result:

{ 
      "message" => "<omitted for brevity>", 
      "@version" => "1", 
     "@timestamp" => "2014-10-31T06:48:15.589Z", 
       "host" => "name-of-your-host", 
       "tags" => [ 
     [0] "multiline" 
    ], 
    "vulnerabilities" => [ 
     [0] { 
      "ip" => "10.1.1.1", 
      "dns" => "z.acme.com", 
      "vid" => "12345" 
     }, 
     [1] { 
      "ip" => "10.1.1.2", 
      "dns" => "y.acme.com", 
      "vid" => "12345" 
     }, 
     [2] { 
      "ip" => "10.1.1.3", 
      "dns" => "x.acme.com", 
      "vid" => "12345" 
     } 
    ] 
} 

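Unless the vulnerabilities array has a fixed number of elements, I don't think there's much more we can do with this (without resorting to a ruby filter).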
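
For illustration, this is the transformation such a filter would have to perform, sketched in plain Python rather than as logstash configuration: turn one event that holds an array into one event per element. The function name `explode` and the choice to copy the remaining top-level fields into every new event are assumptions made for this sketch, not something logstash does here.

```python
import json


def explode(event, field="vulnerabilities"):
    """Yield one flat event per element of event[field].

    Every other top-level field (type, host, ...) is copied into each new event.
    """
    shared = {key: value for key, value in event.items() if key != field}
    for item in event.get(field, []):
        yield {**shared, **item}


# The joined document from the multiline step, plus one shared field.
joined = json.loads("""
{"vulnerabilities":[
    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},
    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},
    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}
]}
""")
joined["type"] = "assets"

events = list(explode(joined))
for event in events:
    print(event)
```

The sketch works for any array length, which is the part that is awkward to express with the filters used so far.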

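How about applying the json filter only to the lines that look like what we want and dropping the rest? Your question doesn't make it clear whether all of the logs look like this, so this may not be that useful.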

filter {
    if [message] =~ /^\s+{"ip":/ {
        # Remove trailing commas
        mutate {
            gsub => ["message", ",$", ""]
        }
        json {
            source => "message"
            remove_field => ["message"]
        }
    } else {
        drop {}
    }
}
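
The same per-line logic can be tried outside logstash. Below is a rough Python sketch of it under a couple of assumptions: `filter_line` is a made-up name, returning None stands in for drop {}, and the comma pattern is widened to `,\s*$` so that it also tolerates the trailing "\r" visible in the question's output, which the plain `,$` in the config above may not cover.

```python
import json
import re

# Same test as the `if` condition: leading whitespace, then {"ip":
WANTED = re.compile(r'^\s+\{"ip":')
# Trailing comma plus any trailing whitespace such as "\r" (wider than the config's ",$").
TRAILING_COMMA = re.compile(r',\s*$')


def filter_line(message):
    """Return the decoded event, or None when the line should be dropped."""
    if not WANTED.search(message):
        return None  # corresponds to drop {}
    cleaned = TRAILING_COMMA.sub("", message)
    return json.loads(cleaned)


lines = [
    '{"vulnerabilities":[',
    '    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}',
    ']}',
]

# Keep only the lines that survived the filter.
events = [event for event in map(filter_line, lines) if event is not None]
for event in events:
    print(event)
```

The wrapper lines are dropped and each remaining line becomes its own event, which is the shape the question asks for.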