2017-10-05 63 views
0

最终目标:连接Elasticsearch和kafka,并将ES索引中正在进行的更改事件接收到kafka。从卡夫卡,我有听众做进一步处理。ElasticSearch到Kafka事件 - 每次更改时使用Logstash

方法:我使用Logstash输入和输出插件。这是配置代码。

input { 
     elasticsearch { 
       hosts => ["localhost:9200"] 
       index => "liferay-20116" 
     } 
} 
output { 
     kafka { 
     topic_id => "elastic-topic-index" 
     codec => json 
     } 
} 

它正在工作,但有一个奇怪的问题。

当我收听kafka时,它会从ES读取所有文档,大约176个文档。

一旦它读取,它停止一段时间说2秒,然后再读取整个176文档!

我不知道是什么问题,这是由于Logstash行为还是卡夫卡行事怪异?

任何帮助,将不胜感激!

+0

你要发送,如果匹配查询收到的文件? 在这种情况下,看看https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-percolate-query.html –

回答

0

这是这个插件的标准行为 - 它将数据匹配到给定的查询。如果您只想更改文档,唯一的解决方法是建立关于自己改变内容的知识 - 例如,您需要有条目时间戳,然后将这些知识并入发送给ES的查询中。

+0

所以你的意思是它的ElasticSearch输入插件是推动数据,即每176 doc每隔2秒钟向kafka说?我认为这应该发生在ES有任何变化的情况下,而不是每2秒左右。 –

+0

我不知道为什么插件每2秒推一次,但插件只是执行查询 - 它不知道ES中发生了什么变化。 –

+0

谢谢。我其实已经明白了。这不是肯定会工作。在同一篇博客文章上撰写并分享:) –