2017-06-06 78 views
0

我尝试流数据写入到与Apache的nifi.putElasticSearch处理器elasticsearch,Apache Nifi PutElasticsearch可以永远等待填满批量大小?

PutElasticSearch有一个名为“批量大小”,当我这个值设置为1的所有事件都写入尽快elasticsearch财产。

但是如此低的“批量大小”在负载高时显然不起作用。因此,为了有一个合理的吞吐量,我需要将它设置为1000.

我的问题是,PutElasticSearch是否等待可用事件的批量大小。如果是,则在处理器上等待999个事件时可以等待数小时。

我在寻找理解logstash如何在elasticsearch输出插件上做同样的工作。可能会有一些基于时间实施的冲洗逻辑(如果事件正在等待~2秒的冲洗事件,以弹性搜索)..

你有什么想法?


编辑:我刚刚发现logstash实现这个https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-idle_flush_time :)

我怎样才能做到在nifi

回答

3

相同的功能根据代码的batch size参数是从传入FlowFiles的最大数量队列。

例如在值batch size = 1000情况下:

1 /如果传入队列1001个包含流文件 - 只有1000将在一个事务中作出。

2 /如果传入队列包含999个流文件 - 999将在一个事务中进行。

一旦进入队列中有东西,并且在nifi中有可用的线程,一切都将被处理。

引用:

PutElasticsearch.java

ProcessSession.java