2016-11-25 77 views
1

我有以下情形:问题与logstash与卡夫卡场条件句----> FileBeat勘探者

FileBeat ---->卡夫卡-----> Logstash ----->弹性----> Kibana

在Filebeat我有2个探矿中YML文件,,。并添加一些字段来识别日志数据。 但是,问题是:在Logstash 我还没有能够验证这个领域。

的配置文件是:

1. filebeat.yml

filebeat.prospectors: 
- input_type: log 
    paths: 
    - /opt/jboss/server.log* 
    tags: ["log_server"] 
    fields: 
    environment: integracion 
    log_type: log_server 

    document_type: log_server 
    fields_under_root: true 


- input_type: log 
    paths: 
    - /var/todo1_apps/ebanTX.log* 
    tags: ["log_eban"] 
    fields: 
    environment: integracion 
    log_type: log_ebanking 

    document_type: log_ebanking 
    fields_under_root: true 

output.kafka: 
    enabled: true 
    hosts: ["192.168.105.68:9092"] 
    topic: "sve_logs" 
    timeout: 30s 

2. logstash.conf

input { 
    kafka { 
    bootstrap_servers => "192.xxx.xxx.xxx:9092" 
    group_id => "sve_banistmo" 
    topics => ["sve_logs"] 
    decorate_events => true 
    codec => "plain" 
    } 
    } 

filter { 
if [type] == "log_ebanking" { 
    grok { 
     patterns_dir => ["patterns/patterns"] 
     match => { "message" => "%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{LOGLEVEL:level}%{SPACE}\[%{DATA:thread}]%{SPACE}-%{SPACE}%{GREEDYDATA:message_log}" } 
     } 
    } 
} 

output { 
if [type] == "log_ebanking" { 
     elasticsearch { 
     hosts => ["192.168.105.67:9200"] 
     index => "sve-banistmo-ebanking-%{+YYYY.MM.dd}" 
     } 
     stdout { codec => json} 
    } 
} 

的问题是在条件滤波器输出部分。我试过

@[metadata][type] 
@metadata][type] 
@metadata.type 
metadata.type 
[type] 

同时具有type和log_type变量。没有什么作品:S 如果我不放条件,数据流没有问题。我的意思是,不是一个连接问题。

请帮帮我。我已经审查了所有相关的信息,但在我的情况下,条件不起作用。

预先感谢

的Darioř

+0

已被弃用。当你不加条件是有场'type'与事件的值'log_ebanking'? –

+0

您好,是的,这是Logstash中的消息示例: {“@timestamp”:“2016-11-25T18:57:48.569Z”,“beat”:{“hostname”:“inrhas14”,“命名 “:” inrhas14" , “版本”: “5.0.0”}, “环境”: “integracion”, “INPUT_TYPE”: “日志”,** “log_type”: “log_ebanking” **, “消息”: “2016-11-25 13:36:53,508错误[http-192.168.105.79:10080-3] - USER_ID:NONE”,“offset”:216302,“source”:“/ var/todo1_apps/SVE-BSMO/logs /ebankingTX.log","tags":["log_ebanking"],**"type":"log_ebanking"**} –

+0

您可以使用'--debug'标志运行logstash管道并提供输出吗? – Val

回答

0

的问题是从卡夫卡消息未被解码。 Logstash会将filebeat报告的整个json消息作为消息。 您可以添加json过滤器来解码json格式的消息。

filter { 
    json { 
    source => "message" 
    } 
} 

该字段将被解码。消息字段将被真正的消息替代,而不是整个json字符串。

然后你可以使用[类型]在你的条件块。 当使用kafka作为输出时,filemet会不报告@metadata。所以你看不到@metadata。

1

使用codec => "json"logstash.conf kafka输入conf中提取消息中的所有字段。

0

卡夫卡之前通过这一去过! 下面是我做的步骤,使其工作:

  1. 更新您的卡夫卡输入插件 CD的/ usr /共享/ logstash/bin中 然后./logstash-plugin更新logstash输入,卡夫卡
  2. 在你的LS配置文件中添加这个到你的Kakfa输入插件codec => "json"
  3. 让你暂时过滤空白,因为首先你需要确保你在Elasticsearch上接收JSON数据。如果你有Kibana检查那里或运行ES查询,如果你不。
  4. 那么你应该可以访问LS配置文件中任何地方的任何字段。

现在为您的输出,我看到你正在添加“log_type”到你的事件filebeat,然后我会建议你的LS输出插件你做if "log_ebanking" == [log_type]

字段[type]默认为filebeat的“logs”,所有事件的metricbeat默认为“metricsets”。

不知道什么是你的Filebeat版本,但在这个看起来document_type在5.5 https://www.elastic.co/guide/en/beats/filebeat/current/configuration-filebeat-options.html#filebeat-document-type