0
我使用卡夫卡作为我的输入,并把它在elasticsearch(输出)(logstash)指数只有特定的elasticsearch从卡夫卡输入
input {
kafka {
topics =>["maxwell"]
codec => json
}
}
filter {
}
output {
stdout { codec => rubydebug }
elasticsearch {
index => 'test_kafka'
document_type => "%{table}"
hosts => 'localhost:9200'
}
}
当这个运行数据,它输出以下JSON
{
"database": "my_db",
"xid": 88935,
"@timestamp": "2016-11-14T12:00:13.763Z",
"data": {
"contact_country_code": null,
"contact_type_id": 1,
"created": "2014-10-03 12:24:36",
"modified_by": null,
"modified": "2014-10-03 12:24:36",
"contact_id": 1,
"is_default": 0,
"created_by": null,
"contact_number": "1241222232"
},
"old": {
"contact_number": "1241222"
},
"commit": true,
"@version": "1",
"type": "update",
"table": "contact",
"ts": 1479124813
}
我的问题是,我怎么能只提取与动态DOCUMENT_TYPE数据键elasticsearch实现这一
{
"_index": "test_kafka",
"_type": "contact",
"_id": "AVhitY804rvpX8qdVt9d",
"_score": 1,
"_source": {
"contact_country_code": null,
"contact_type_id": 1,
"created": "2014-10-03 12:24:36",
"modified_by": null,
"modified": "2014-10-03 12:24:36",
"contact_id": 1,
"is_default": 0,
"created_by": null,
"contact_number": "1241222232"
}
}
的'event'没有发生变异,顺便说一句,我使用logstash 5.0 – scireon
我也尝试使用此代码 ''' 事件= event.get( '数据')替换事件 ''' 但没有按预期工作 – scireon
好的,的确,这是Logstash 2.x的ruby代码。让我为Logstash 5修复它。 – Val