2017-02-16 493 views
1

我有一种ELK堆栈,使用fluentd而不是logstash,作为Kubernetes集群上的DaemonSet运行,并将所有容器中的所有日志以logstash格式发送到Elasticsearch服务器。Kibana - 如何从现有的Kubernetes日志中提取字段

走出Kubernetes集群上运行的许多容器的一些是nginx的容器中以下列格式的输出日志:在Kibana

121.29.251.188 - [16/Feb/2017:09:31:35 +0000] host="subdomain.site.com" req="GET /data/schedule/update?date=2017-03-01&type=monthly&blocked=0 HTTP/1.1" status=200 body_bytes=4433 referer="https://subdomain.site.com/schedule/2589959/edit?location=23092&return=monthly" user_agent="Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0" time=0.130 hostname=webapp-3188232752-ly36o 

字段可见是按照此屏幕截图:

kibana nginx log

是否有可能从这种类型的日志中提取字段后,它被编入索引?

的fluentd集电极被配置为与下面的源,它处理所有的容器,所以强制执行该阶段的格式是不可能的,因为从不同的容器中的非常不同的输出:

<source> 
    type tail 
    path /var/log/containers/*.log 
    pos_file /var/log/es-containers.log.pos 
    time_format %Y-%m-%dT%H:%M:%S.%NZ 
    tag kubernetes.* 
    format json 
    read_from_head true 
</source> 

在理想情况下,我想用“主”,“请求”,“状态”等元数据字段在上面的屏幕截图中丰富可见的字段。

回答

0

在经过几天的研究并习惯于EFK stack之后,我到达了EFK特定的解决方案,与Darth_Vader的答案相反,这只能在ELK堆栈中实现。

所以总结一下,我使用Fluentd代替Logstash,因此,如果您还安装Fluentd Grok Plugin,我决定不这样做,因为任何神交的解决方案将工作:

事实证明,Fluentd有其通过使用parser filters自己的字段提取功能。为了解决这个问题,我的问题,<match **>前行吧,等以后日志行对象是已经与kubernetes元数据字段和标签充实,我增加了以下内容:

<filter kubernetes.var.log.containers.webapp-**.log> 
    type parser 
    key_name log 
    reserve_data yes 
    format /^(?<ip>[^-]*) - \[(?<datetime>[^\]]*)\] host="(?<hostname>[^"]*)" req="(?<method>[^ ]*) (?<uri>[^ ]*) (?<http_version>[^"]*)" status=(?<status_code>[^ ]*) body_bytes=(?<body_bytes>[^ ]*) referer="(?<referer>[^"]*)" user_agent="(?<user_agent>[^"]*)" time=(?<req_time>[^ ]*)/ 
</filter> 

为了解释:

<filter kubernetes.var.log.containers.webapp-**.log> - 在与此标签匹配的所有行上应用该块;在我的情况下,网络服务器组件的容器称为webapp- {}东西

type parser - 告诉fluentd申请解析器过滤器

key_name log - 仅在日志行的log财产应用模式,不整条生产线,这是一个JSON字符串

reserve_data yes - 非常重要的,如果没有指定整个日志行对象只从format提取属性所取代,因此,如果您已经有其他属性,如那些由加kubernetes_metadata过滤器,当不添加时会删除这些过滤器个选项

format - 这是在log键的值加一个正则表达式来提取命名属性

请注意,我使用Fluentd 1.12,所以这个语法是不符合新的1.14完全兼容语法,但该原则将适用于解析器声明的微小调整。

0

为了提取日志行到字段中,则可能必须使用grok过滤器。你可以做的是有一个正则表达式模式,以匹配你需要的日志行的确切部分。 神交过滤器可以是这个样子:

grok { 
    patterns_dir => ["pathto/patterns"] 
    match => { "message" => "^%{LOGTIMESTAMP:logtimestamp}%{GREEDYDATA:data}" }   
}             ^-----------------------^ are the fields you would see in ES when log is being indexed 

--------------------------------- -------------------^LOGTIMESTAMP应该在你的模式文件中定义是这样的:

LOGTIMESTAMP %{YEAR}%{MONTHNUM}%{MONTHDAY} %{TIME} 

一旦你拥有了匹配字段,那么你可以简单地使用他们为filtering的目的,或者你仍然可以保持原样,如果主要导致它从日志行提取字段。

if "something" in [message]{ 
    mutate { 
     add_field => { "new_field" => %{logtimestamp} } 
    }   
} 

以上只是一个示例,以便您可以重现它以满足您的需求。你可以使用this工具,以测试你的模式以及你想要匹配的字符串!

Blog post,可能会得心应手!希望这可以帮助。

+0

感谢您的回答,@ Darth_vader。我知道这可以用logstash来完成,但我使用fluentd进行日志收集,请参阅我的问题中的配置。你知道这可以通过开箱即可完成吗? – bedeabza

相关问题