最终目标:连接Elasticsearch和kafka,并将ES索引中正在进行的更改事件接收到kafka。从卡夫卡,我有听众做进一步处理。ElasticSearch到Kafka事件 - 每次更改时使用Logstash
方法:我使用Logstash输入和输出插件。这是配置代码。
input {
elasticsearch {
hosts => ["localhost:9200"]
index => "liferay-20116"
}
}
output {
kafka {
topic_id => "elastic-topic-index"
codec => json
}
}
它正在工作,但有一个奇怪的问题。
当我收听kafka时,它会从ES读取所有文档,大约176个文档。
一旦它读取,它停止一段时间说2秒,然后再读取整个176文档!
我不知道是什么问题,这是由于Logstash行为还是卡夫卡行事怪异?
任何帮助,将不胜感激!
你要发送,如果匹配查询收到的文件? 在这种情况下,看看https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-percolate-query.html –