我有一个是从一个MQTT经纪人接收数据的拓扑结构,我希望有一个喷口这样的表现:创建一个Apache风暴口发出的元组每X秒
发出批次元组(或每x秒一个字符串列表)。我如何实现这一目标?我读了一些关于Storm Trident的文章,但是它的
IBatchSpout
似乎不允许我在特定的时间间隔内批量发送元组。如果没有新数据进来,喷嘴应该做什么?它不能阻止线程,因为它是Storm的主线程,对吧?
我有一个是从一个MQTT经纪人接收数据的拓扑结构,我希望有一个喷口这样的表现:创建一个Apache风暴口发出的元组每X秒
发出批次元组(或每x秒一个字符串列表)。我如何实现这一目标?我读了一些关于Storm Trident的文章,但是它的IBatchSpout
似乎不允许我在特定的时间间隔内批量发送元组。
如果没有新数据进来,喷嘴应该做什么?它不能阻止线程,因为它是Storm的主线程,对吧?
您可以实现自己的MQTT喷口。举个例子,看看MongoSpout。
重要的部分是nextTuple
方法。
当这种方法被调用时,Storm正在请求Spout向输出采集器发出 元组。 这种方法应该是非阻塞的,所以 如果Spout没有元组发出,应该返回这个方法。 nextTuple,ack和fail都在喷口任务中的单个线程中紧密循环调用。当没有元组发出时,它很有礼貌地在下一次短暂的时间内进行三次睡眠(如单个毫秒),以免浪费太多的CPU。
你不能一次等待指定的时间,但你可以实现nextTuple
,以便它偶尔发出一个元组。
private static final EMISSION_PERIOD = 2000; // 2 seconds
private long lastEmission;
@Override
public void nextTuple() {
if (lastEmission == null ||
lastEmission + EMISSION_PERIOD >= System.currentMillis()) {
List<Object> tuple = pollMQTT();
if (tuple != null) {
this.collector.emit(tuple);
return;
}
}
Utils.sleep(50);
}
请注意,我找到了一个开源的MQTT spout。它看起来没有生产准备,但可以用它作为起点。
除了Christian,我发现this implementation是Storm的MQTT客户端。前面提到的链接还没有开发。