2016-11-15 72 views
0

我使用卡夫卡作为我的输入,并把它在elasticsearch(输出)(logstash)指数只有特定的elasticsearch从卡夫卡输入

input { 
    kafka { 
     topics =>["maxwell"] 
     codec => json 
    } 
} 
filter { 
} 
output { 
    stdout { codec => rubydebug } 
    elasticsearch { 
     index => 'test_kafka' 
     document_type => "%{table}" 
     hosts => 'localhost:9200' 
    } 
} 

当这个运行数据,它输出以下JSON

{ 
    "database": "my_db", 
    "xid": 88935, 
    "@timestamp": "2016-11-14T12:00:13.763Z", 
    "data": { 
    "contact_country_code": null, 
    "contact_type_id": 1, 
    "created": "2014-10-03 12:24:36", 
    "modified_by": null, 
    "modified": "2014-10-03 12:24:36", 
    "contact_id": 1, 
    "is_default": 0, 
    "created_by": null, 
    "contact_number": "1241222232" 
    }, 
    "old": { 
    "contact_number": "1241222" 
    }, 
    "commit": true, 
    "@version": "1", 
    "type": "update", 
    "table": "contact", 
    "ts": 1479124813 
} 

我的问题是,我怎么能只提取与动态DOCUMENT_TYPE数据键elasticsearch实现这一

{ 
    "_index": "test_kafka", 
    "_type": "contact", 
    "_id": "AVhitY804rvpX8qdVt9d", 
    "_score": 1, 
    "_source": { 
    "contact_country_code": null, 
    "contact_type_id": 1, 
    "created": "2014-10-03 12:24:36", 
    "modified_by": null, 
    "modified": "2014-10-03 12:24:36", 
    "contact_id": 1, 
    "is_default": 0, 
    "created_by": null, 
    "contact_number": "1241222232" 
    } 
} 

回答

1

您可以添加一个ruby过滤器来按摩您的活动,如下所示。它的作用是首先将table字段保存在@metadata字段中,以便您可以在elasticsearch输出中引用它。然后删除除data之外的所有字段。然后它复制根级别data字段中的所有字段,最后删除data字段。

input { 
    kafka { 
     topics =>["maxwell"] 
     codec => json 
    } 
} 
filter { 
    mutate { 
    add_field => { "[@metadata][type]" => "%{table}" } 
    } 
    ruby { 
    code => " 
     # Ruby code for Logstash 2.x 
     event.to_hash.delete_if {|k, v| k != 'data'} 
     event.to_hash.update(event['data'].to_hash) 
     event.to_hash.delete_if {|k, v| k == 'data'} 

     # Ruby code for Logstash 5.x 
     event.to_hash.delete_if {|k, v| k != 'data'}    
     event.to_hash.update(event.get('data').to_hash) 
     event.to_hash.delete_if {|k, v| k == 'data'} 
    " 
    }   
} 
output { 
    stdout { codec => rubydebug } 
    elasticsearch { 
     hosts => 'localhost:9200' 
     index => 'test_kafka' 
     document_type => "%{[@metadata][type]}" 
    } 
} 
+0

的'event'没有发生变异,顺便说一句,我使用logstash 5.0 – scireon

+0

我也尝试使用此代码 ''' 事件= event.get( '数据')替换事件 ''' 但没有按预期工作 – scireon

+0

好的,的确,这是Logstash 2.x的ruby代码。让我为Logstash 5修复它。 – Val