2015-09-28 67 views
1

我们目前在集群拓扑模式下使用Apache Storm 0.9.5来处理Amazon Kinesis记录(喷出)并将它们存储到Redshift数据仓库(螺栓)中。我们的Storm集群部署在AWS中,由1个nimbus + UI节点,1个zookeeper节点和3个supervisor + logviewer节点组成。我们的拓扑配置支持处理多的Kinesis流和每一个数据流,它包括:Apache Storm一次性处理

  • 一个室壁运动流口监听传入记录
  • 一个红移螺栓插入记录到数据仓库

拓扑:

final TopologyBuilder topologyBuilder = new TopologyBuilder(); 

// for every configured kinesis stream 
final List<KinesisStreamSpout> kinesisStreamSpouts = kinesisStreamService.getKinesisStreamSpouts(); 
for (final KinesisStreamSpout kinesisStreamSpout : kinesisStreamSpouts) { 
    final String spoutId = kinesisStreamSpout.getSpoutId(); 
    topologyBuilder.setSpout(spoutId, kinesisStreamSpout.getKinesisSpout()); 

    // set the corresponding redshift bolt 
    final String streamName = kinesisStreamSpout.getStreamName(); 
    final RedshiftBolt redshiftBolt = new RedshiftBolt(streamName); 
    topologyBuilder.setBolt(redshiftBolt.getId(), 
     redshiftBolt, stormProperties.getNumberOfWorkersPerStream()).shuffleGrouping(spoutId); 
} 

return topologyBuilder.createTopology(); 

该系统的一个bugbear一直无法保证一次只能处理输入消息导致具有相同商业密钥的多条记录插入到目标数据库中。为了解问题的严重程度,我们运行了一个受控测试,发现大约三分之一的输入记录已提交处理多次。

根据this thread(目前尚未得到答复),我们也考虑使用Trident来保证一次只能处理,但也得出结论认为系统内置幂原理更重要至少一次语义)而不是增加复杂性,降低性能并生成状态,因为这个other article建议。

我们现在正在寻求关于在支持群集的方式下在现有拓扑中实现幂等性的最佳方式的建议。到目前为止,我们倾向于引入一个可以通过元组消息ID键值的RedisBolt。有没有现成的模式来实现这个使用Apache Storm?

+0

你确定,你在Kinesis里面没有重复吗? 1/3值对我来说看起来非常高... –

+0

重复提交分析基于Amazon Kinesis spout组件从ShardId构造的唯一messageId:SequenceNumber值。换句话说,同一业务有效载荷的多个Kinesis记录将被系统分配一个不同的元组messageId。 –

+0

好吧,那么你的拓扑结构就会出现一些问题......你有很多失败的元组吗? –

回答

0

如果您不想使用Trident,您可能需要阅读以下关于“事务拓扑”的文章。这是Trident背后的概念,您仍然可以“手动”应用它。这似乎是你的用例很好的模式:https://storm.apache.org/documentation/Transactional-topologies.html

此外,我想补充一点,风暴(像任何其他系统,如Apache Flink [声明:我是Flink的提交者]和Apache Spark Streaming)只能确保一次处理该系统。如果数据被转发到外部系统,那么只有当且仅当外部系统可以支持幂等操作时才能实现一次。

相关问题