
Filebeat 5.0 output to multiple Kafka topics

I have Filebeat 5.0 installed on my application server, with 3 Filebeat prospectors; each prospector points at a different log path, and all of them output to a single Kafka topic called myapp_applog. Everything works fine.

My Filebeat output configuration to a single topic - working:

output.kafka: 
    # initial brokers for reading cluster metadata 
    hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"] 

    # message topic selection + partitioning 
    topic: 'myapp_applog' 
    partition.round_robin: 
      reachable_only: false 

    required_acks: 1 
    compression: gzip 
    max_message_bytes: 1000000 

What I want to do is send each log file to its own topic based on a condition; see the documentation section on topics. I have tried to configure this, but no data is sent to any of the topics. Does anyone know why my conditions don't match, or what the correct form is? I can't seem to find an example of how to use the conditional "topics" setting properly.

Here is my Kafka output configuration for multiple topics.

Not working:

output.kafka: 
    # initial brokers for reading cluster metadata 
    hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"] 

    # message topic selection + partitioning 
    topics: 
      - topic: 'myapp_applog' 
        when: 
          equals: 
            document_type: applog_myappapi 
      - topic: 'myapp_applog_stats' 
        when: 
          equals: 
            document_type: applog_myappapi_stats 
      - topic: 'myapp_elblog' 
        when: 
          equals: 
            document_type: elblog_myappapi 
    partition.round_robin: 
      reachable_only: false 

    required_acks: 1 
    compression: gzip 
    max_message_bytes: 1000000 

Here is the full filebeat.yml configuration file.

################### Filebeat Configuration Example ######################### 
############################# Filebeat ###################################### 
filebeat.prospectors: 
    # App logs - prospector 
    - input_type: log 
      paths: 
        - /myapp/logs/myapp.log 
      exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"] 
      exclude_files: [".gz$", ".tmp"] 
      fields: 
        api: myappapi 
        environment: STG 
      ignore_older: 24h 
      document_type: applog_myappapi 
      scan_frequency: 1s 

      # Multiline on timestamp, YYYY-MM-DD 
      # https://www.elastic.co/guide/en/beats/filebeat/master/multiline-examples.html 
      multiline: 
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}' 
        negate: true 
        match: after 
        max_lines: 500 
        timeout: 5s 

    # Server Stats - prospector 
    - input_type: log 
      paths: 
        - /myapp/logs/serverstats.log 

      # Exclude messages with log level 
      exclude_lines: [".+? ERROR[^*].+", ".+? DEBUG[^*].+"] 
      exclude_files: [".gz$", ".tmp"] 
      fields: 
        api: myappapi 
        environment: STG 
      ignore_older: 24h 
      document_type: applog_myappapi_stats 
      scan_frequency: 1s 

    # ELB prospector 
    - input_type: log 
      paths: 
        - /var/log/httpd/elasticbeanstalk-access_log 
      document_type: elblog_myappapi 
      fields: 
        api: myappapi 
        environment: STG 
      exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"] 
      exclude_files: [".gz$", ".tmp"] 
      ignore_older: 24h 

      # 0s, it is done as often as possible. Default: 10s 
      scan_frequency: 1s 

registry_file: /var/lib/filebeat/registry 

############################# Output ########################################## 
# Configure what outputs to use when sending the data collected by the beat. 
# Multiple outputs may be used. 
#----------------------------- Kafka output -------------------------------- 

output.kafka: 
    # initial brokers for reading cluster metadata 
    hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"] 

    # message topic selection + partitioning 
    topics: 
      - topic: 'myapp_applog' 
        when: 
          equals: 
            document_type: applog_myappapi 
      - topic: 'myapp_applog_stats' 
        when: 
          equals: 
            document_type: applog_myappapi_stats 
      - topic: 'myapp_elblog' 
        when: 
          equals: 
            document_type: elblog_myappapi 
    partition.round_robin: 
      reachable_only: false 

    required_acks: 1 
    compression: gzip 
    max_message_bytes: 1000000 

############################# Logging ######################################### 

# There are three options for the log output: syslog, file, stderr. 
# Under Windows systems, the log files are per default sent to the file output, 
# under all other systems per default to syslog. 
logging: 

    # Send all logging output to syslog. On Windows default is false, otherwise 
    # default is true. 
    to_syslog: true 

    # Write all logging output to files. Beats automatically rotate files if rotateeverybytes 
    # limit is reached. 
    to_files: true 

    # To enable logging to files, to_files option has to be set to true 
    files: 
      # The directory where the log files will be written to. 
      path: /var/log/ 

      # The name of the files where the logs are written to. 
      name: filebeats.log 

      # Configure log file size limit. If the limit is reached, the log file 
      # will be automatically rotated. 
      rotateeverybytes: 10485760 # = 10MB 

      # Number of rotated log files to keep. Oldest files will be deleted first. 
      keepfiles: 7 

    # Enable debug output for selected components. To enable all selectors use ["*"] 
    # Other available selectors are beat, publish, service 
    # Multiple selectors can be chained. 
    #selectors: ["*" ] 

    # Sets log level. The default log level is error. 
    # Available log levels are: critical, error, warning, info, debug 
    level: info 
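
As a side note, running Filebeat in the foreground with `filebeat -e -c /etc/filebeat/filebeat.yml -d "publish"` (the `publish` debug selector is one of those listed in the logging section above) prints each published event, which helps verify which topic conditions actually match.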

Answers


I had the same problem and solved it by defining the output as:

topics: 
    - topic: '%{[type]}' 
      use_type: true 

and in each input you only need to set document_type to your Kafka topic:

- input_type: log 
  paths: 
    - /path/to/log/file 
  document_type: "your_kafka_topic_1" 

- input_type: log 
  paths: 
    - /path/to/other/log/file 
  document_type: "your_other_kafka_topic" 
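
Putting the two snippets together, a minimal sketch of this approach (the paths and topic names are placeholders, not values from the original answer):

    filebeat.prospectors: 
        - input_type: log 
          paths: 
            - /path/to/log/file 
          # document_type sets the event's `type` field; the '%{[type]}' 
          # format string in the output below resolves it at publish time 
          document_type: your_kafka_topic_1 

    output.kafka: 
        hosts: ["broker.1.ip.address:9092"] 
        topics: 
          - topic: '%{[type]}' 
            use_type: true 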

A second approach is to set a custom field on each input and reference that field in the topic setting.

Inputs:

- type: log 
  fields: 
    kafka_topic: "my_topic_1" 

- type: log 
  fields: 
    kafka_topic: "my_topic_2" 

Output:

output.kafka: 
    hosts: ["mybroker:9092"] 
    topic: '%{[fields.kafka_topic]}' 

The example above shows two log inputs and two Kafka topic outputs.
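
If I remember the Beats format-string syntax correctly (the default-value form below is an assumption, not part of the original answer), the topic format string can also carry a fallback, so events missing the custom field still get routed somewhere:

    output.kafka: 
        hosts: ["mybroker:9092"] 
        # 'fallback_topic' is a hypothetical topic name; events that carry no 
        # fields.kafka_topic value would be routed to it 
        topic: '%{[fields.kafka_topic]:fallback_topic}' 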