I have a CSV file that I want to feed into Logstash, with the following header:

"PacketId","MACAddress","Date","PacketLength","SourceIP","SourcePort","DestIP","DestPort" 

I want to index this data into Elasticsearch using Logstash, but I cannot work out how to write the filter for it.

filter { 
    grok { 
        match => { "message" => "%{IP:SourceIP}" } 
    } 
} 

The above filter extracts the SourceIP field nicely, but how do I write grok patterns to extract all of the fields?

Answer


Given the following CSV file:

1,00-14-22-01-23-45,13/09/2015,32,128.248.1.43,9980,128.248.23.13,9880 
1,01-74-02-84-13-98,14/09/2015,64,128.248.1.94,9280,128.248.13.84,9380 

Here is the Logstash configuration you need to set up:

input { 
    file { 
     path => "/path/of/your/csv/test.csv" 
     sincedb_path => "/path/of/your/csv/test.idx" 
     start_position => "beginning" 
    } 
} 

filter { 
    csv { 
     separator => "," 
     columns => ["PacketId","MACAddress","Date","PacketLength","SourceIP","SourcePort","DestIP","DestPort"] 
    } 
} 

output { 
    stdout { 
     codec => rubydebug  
    } 
} 
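
Since your end goal is to index the data into Elasticsearch rather than just print it, you can add an elasticsearch output next to stdout. The block below is only a minimal sketch: the hosts value and the index name are placeholders to adapt to your own cluster, and depending on your Logstash version the option may be called host instead of hosts.

output { 
    stdout { 
        codec => rubydebug 
    } 
    elasticsearch { 
        hosts => ["localhost:9200"]   # placeholder, point this at your own cluster 
        index => "packets"            # hypothetical index name 
    } 
} 

Run Logstash against the configuration file with bin/logstash -f test.conf from your Logstash installation directory.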

You will get this in the output:

{ 
     "message" => [ 
     [0] "1,00-14-22-01-23-45,13/09/2015,32,128.248.1.43,9980,128.248.23.13,9880" 
    ], 
     "@version" => "1", 
     "@timestamp" => "2015-09-14T20:11:28.976Z", 
      "host" => "MyHost.local", 
      "path" => "/path/of/your/csv/test.csv", 
     "PacketId" => "1", 
     "MACAddress" => "00-14-22-01-23-45", 
      "Date" => "13/09/2015", 
    "PacketLength" => "32", 
     "SourceIP" => "128.248.1.43", 
     "SourcePort" => "9980", 
      "DestIP" => "128.248.23.13", 
     "DestPort" => "9880" 
} 
{ 
     "message" => [ 
     [0] "1,01-74-02-84-13-98,14/09/2015,64,128.248.1.94,9280,128.248.13.84,9380" 
    ], 
     "@version" => "1", 
     "@timestamp" => "2015-09-14T20:11:28.978Z", 
      "host" => "MyHost.local", 
      "path" => "/path/of/your/csv/test.csv", 
     "PacketId" => "1", 
     "MACAddress" => "01-74-02-84-13-98", 
      "Date" => "14/09/2015", 
    "PacketLength" => "64", 
     "SourceIP" => "128.248.1.94", 
     "SourcePort" => "9280", 
      "DestIP" => "128.248.13.84", 
     "DestPort" => "9380" 
} 
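
Note that every column is extracted as a string. If you also want PacketLength and the port numbers indexed as integers, and the Date column parsed into @timestamp, one option is to extend the filter block with the csv filter's convert option and the standard date filter. This is a sketch of what that could look like:

filter { 
    csv { 
        separator => "," 
        columns => ["PacketId","MACAddress","Date","PacketLength","SourceIP","SourcePort","DestIP","DestPort"] 
        # convert numeric columns from strings to integers 
        convert => { "PacketLength" => "integer" "SourcePort" => "integer" "DestPort" => "integer" } 
    } 
    # parse the dd/MM/yyyy Date column into @timestamp 
    date { 
        match => ["Date", "dd/MM/yyyy"] 
    } 
} 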

Regards, Alain