2016-03-01 129 views
0

我正在测试简单的拓扑结构来检查卡夫卡喷嘴的性能。 它包含kafka喷口和螺栓acknoledge每个元组。 博尔特execute方法:卡夫卡喷嘴性能不佳

public void execute(Tuple input) { 
    collector.ack(input); 
} 

拓扑结构是这样的:

protected void configureTopology(TopologyBuilder topologyBuilder) { 
    configureKafkaCDRSpout(topologyBuilder); 
    configureKafkaSpoutBandwidthTesterBolt(topologyBuilder); 
} 

private void configureKafkaCDRSpout(TopologyBuilder builder) { 
    KafkaSpout kafkaSpout = new KafkaSpout(createKafkaCDRSpoutConfig()); 
    int spoutCount = Integer.valueOf(topologyConfig.getProperty("kafka.cboss.cdr.spout.thread.count")); 
    builder.setSpout(KAFKA_CDR_SPOUT_ID, kafkaSpout, spoutCount) 
      .setNumTasks(Integer.valueOf(topologyConfig.getProperty(KAFKA_CDR_SPOUT_NUM_TASKS))); 
} 
private SpoutConfig createKafkaCDRSpoutConfig() { 
    BrokerHosts hosts = new ZkHosts(topologyConfig.getProperty("kafka.zookeeper.broker.host")); 
    String topic = topologyConfig.getProperty("kafka.cboss.cdr.topic"); 
    String zkRoot = topologyConfig.getProperty("kafka.cboss.cdr.zkRoot"); 
    String consumerGroupId = topologyConfig.getProperty("kafka.cboss.cdr.consumerId"); 
    SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId); 
    kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new CbossCdrScheme()); 
    kafkaSpoutConfig.ignoreZkOffsets = true; 
    kafkaSpoutConfig.fetchSizeBytes = Integer.valueOf(topologyConfig.getProperty("kafka.fetchSizeBytes")); 
    kafkaSpoutConfig.bufferSizeBytes = Integer.valueOf(topologyConfig.getProperty("kafka.bufferSizeBytes")); 
    return kafkaSpoutConfig; 
} 

public void configureKafkaSpoutBandwidthTesterBolt(TopologyBuilder topologyBuilder) { 
    SimpleAckerBolt b = new SimpleAckerBolt(); 
    topologyBuilder.setBolt(SPOUT_BANDWIDTH_TESTER_BOLT_ID, b, Integer.valueOf(topologyConfig.getProperty(CFG_SIMPLE_ACKER_BOLT_PARALLELISM))) 
      .setNumTasks(Integer.valueOf(topologyConfig.getProperty(SPOUT_BANDWIDTH_TESTER_BOLT_NUM_TASKS))) 
      .localOrShuffleGrouping(KAFKA_CDR_SPOUT_ID); 
} 

其他拓扑结构设置:

topology.max.spout.pending=250 
topology.executor.receive.buffer.size=1024 
topology.executor.send.buffer.size=1024 
topology.receiver.buffer.size=8 
topology.transfer.buffer.size=1024 
topology.acker.executors=1 

我发动我的拓扑1名工人1卡夫卡脱粒机和1个简单阿克尔螺栓。 这就是我在风暴UI获得: Perfomance test 1

欧凯我在10分钟1.5kk元组。螺栓的冲击力约为0.5。所以我的逻辑很简单:如果我将喷嘴和螺栓平行提示 - 我会得到双倍的性能。 下一个测试是1个工人2卡夫卡喷口,2个简单的阿克尔螺栓和topology.acker.executors = 2。下面是结果:

Perfomance test 2

所以,我得到增加parallelizm提示更糟更流畅。为什么它会发生?我如何每秒处理元组?实际上任何测试与喷口并行性暗示大于2表示比1喷口执行器更糟糕的结果。

我已经检查过:
1)这不是卡夫卡故障。主题有2个经纪人有20个分区。 4名工作人员的拓扑结构可获得x4性能。
2)这不是服务器故障。服务器有40个核心和32Gb RAM。运行拓扑时,它消耗大约1/8的CPU,而几乎没有RAM。
3)改变topology.max.spout.pending参数没有帮助。 4)增加Bolt或Acker并行性暗示甚至更不利于。

+0

你只用一个worker运行两个测试,如果你添加了另一个worker,该怎么办?因此,与两名工人一起进行第二次测试。 – morganw09dev

+0

谢谢你的回复,摩根。你说得对。越来越多的工人给我比例的结果。有2个工人2个喷嘴,每秒我的元组数增加一倍。但是这个测试的想法是确定1名工人的最佳表现。我能得到的最好的结果是每10分钟1,5kk元组,或每秒2500元组。我猜想在40核心,32GB内存和10Gb/s网络的节点上,我可以做得更好。 – f1sherox

+0

1名员工,但40个核心并不合理。无论如何,每个工作人员都是单线程的,所以这意味着您的服务器有能力容纳40名员工。您现在可以在单个核心服务器上获得完全相同的性能。每个线程2500元/ s并不是很好,但也不算太差。 – C4stor

回答

0

所以看起来好像你对一名工人的表现感到极限。你只是让一个工人做很多事情,而且不能处理所有事情。

在这一点上,如果你想进一步提高你的系统的性能,你有两种选择。

  1. 添加更多工人。
  2. 提高你一个工人的工作能力。

如果你不想增加更多的工人,那么剩下的就是配置你的一个工人。然后,您应该调查一个工作人员的配置,为其提供更多的内存,更多的CPU等。您应该查看Storm的default configuration options,看看调整某些配置值是否会提高性能。一些配置看起来更有可能比其他配置更有帮助:

worker.heap.memory.mb: 
worker.childopts: 
supervisor.childopts: 
supervisor.memory.capacity.mb: 
supervisor.cpu.capacity: 
+0

我试图增加工人和主管的堆大小,但仍然没有任何变化。我已更改的设置:supervisor.childopts,worker.childopts。此外,我试图改变这个设置:topology.worker.shared.thread.pool.size,topology.worker.receiver.thread.count。这里完整的配置http://pastebin.com/TEB6d3Ve – f1sherox

+0

我从来没有真正做过很多的风暴优化,所以我不能在这方面做的很多。但除非你有一个合理的理由,否则我会建议增加工人的数量以获得更好的表现。 – morganw09dev