2014-10-27 81 views
4

我有一个是从一个MQTT经纪人接收数据的拓扑结构,我希望有一个喷口这样的表现:创建一个Apache风暴口发出的元组每X秒

  1. 发出批次元组(或每x秒一个字符串列表)。我如何实现这一目标?我读了一些关于Storm Trident的文章,但是它的IBatchSpout似乎不允许我在特定的时间间隔内批量发送元组。

  2. 如果没有新数据进来,喷嘴应该做什么?它不能阻止线程,因为它是Storm的主线程,对吧?

回答

2

您可以实现自己的MQTT喷口。举个例子,看看MongoSpout

重要的部分是nextTuple方法。

当这种方法被调用时,Storm正在请求Spout向输出采集器发出 元组。 这种方法应该是非阻塞的,所以 如果Spout没有元组发出,应该返回这个方法。 nextTuple,ack和fail都在喷口任务中的单个线程中紧密循环调用。当没有元组发出时,它很有礼貌地在下一次短暂的时间内进行三次睡眠(如单个毫秒),以免浪费太多的CPU。

你不能一次等待指定的时间,但你可以实现nextTuple,以便它偶尔发出一个元组。

private static final EMISSION_PERIOD = 2000; // 2 seconds 
private long lastEmission; 

@Override 
public void nextTuple() { 
    if (lastEmission == null || 
      lastEmission + EMISSION_PERIOD >= System.currentMillis()) { 
     List<Object> tuple = pollMQTT(); 
     if (tuple != null) { 
      this.collector.emit(tuple); 
      return; 
     } 
    } 
    Utils.sleep(50); 
} 

请注意,我找到了一个开源的MQTT spout。它看起来没有生产准备,但可以用它作为起点。

1

除了Christian,我发现this implementation是Storm的MQTT客户端。前面提到的链接还没有开发。