我正在测试简单的拓扑结构来检查卡夫卡喷嘴的性能。 它包含kafka喷口和螺栓acknoledge每个元组。 博尔特execute方法:卡夫卡喷嘴性能不佳
public void execute(Tuple input) {
collector.ack(input);
}
拓扑结构是这样的:
protected void configureTopology(TopologyBuilder topologyBuilder) {
configureKafkaCDRSpout(topologyBuilder);
configureKafkaSpoutBandwidthTesterBolt(topologyBuilder);
}
private void configureKafkaCDRSpout(TopologyBuilder builder) {
KafkaSpout kafkaSpout = new KafkaSpout(createKafkaCDRSpoutConfig());
int spoutCount = Integer.valueOf(topologyConfig.getProperty("kafka.cboss.cdr.spout.thread.count"));
builder.setSpout(KAFKA_CDR_SPOUT_ID, kafkaSpout, spoutCount)
.setNumTasks(Integer.valueOf(topologyConfig.getProperty(KAFKA_CDR_SPOUT_NUM_TASKS)));
}
private SpoutConfig createKafkaCDRSpoutConfig() {
BrokerHosts hosts = new ZkHosts(topologyConfig.getProperty("kafka.zookeeper.broker.host"));
String topic = topologyConfig.getProperty("kafka.cboss.cdr.topic");
String zkRoot = topologyConfig.getProperty("kafka.cboss.cdr.zkRoot");
String consumerGroupId = topologyConfig.getProperty("kafka.cboss.cdr.consumerId");
SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new CbossCdrScheme());
kafkaSpoutConfig.ignoreZkOffsets = true;
kafkaSpoutConfig.fetchSizeBytes = Integer.valueOf(topologyConfig.getProperty("kafka.fetchSizeBytes"));
kafkaSpoutConfig.bufferSizeBytes = Integer.valueOf(topologyConfig.getProperty("kafka.bufferSizeBytes"));
return kafkaSpoutConfig;
}
public void configureKafkaSpoutBandwidthTesterBolt(TopologyBuilder topologyBuilder) {
SimpleAckerBolt b = new SimpleAckerBolt();
topologyBuilder.setBolt(SPOUT_BANDWIDTH_TESTER_BOLT_ID, b, Integer.valueOf(topologyConfig.getProperty(CFG_SIMPLE_ACKER_BOLT_PARALLELISM)))
.setNumTasks(Integer.valueOf(topologyConfig.getProperty(SPOUT_BANDWIDTH_TESTER_BOLT_NUM_TASKS)))
.localOrShuffleGrouping(KAFKA_CDR_SPOUT_ID);
}
其他拓扑结构设置:
topology.max.spout.pending=250
topology.executor.receive.buffer.size=1024
topology.executor.send.buffer.size=1024
topology.receiver.buffer.size=8
topology.transfer.buffer.size=1024
topology.acker.executors=1
我发动我的拓扑1名工人1卡夫卡脱粒机和1个简单阿克尔螺栓。 这就是我在风暴UI获得:
欧凯我在10分钟1.5kk元组。螺栓的冲击力约为0.5。所以我的逻辑很简单:如果我将喷嘴和螺栓平行提示 - 我会得到双倍的性能。 下一个测试是1个工人2卡夫卡喷口,2个简单的阿克尔螺栓和topology.acker.executors = 2。下面是结果:
所以,我得到增加parallelizm提示更糟更流畅。为什么它会发生?我如何每秒处理元组?实际上任何测试与喷口并行性暗示大于2表示比1喷口执行器更糟糕的结果。
我已经检查过:
1)这不是卡夫卡故障。主题有2个经纪人有20个分区。 4名工作人员的拓扑结构可获得x4性能。
2)这不是服务器故障。服务器有40个核心和32Gb RAM。运行拓扑时,它消耗大约1/8的CPU,而几乎没有RAM。
3)改变topology.max.spout.pending参数没有帮助。 4)增加Bolt或Acker并行性暗示甚至更不利于。
你只用一个worker运行两个测试,如果你添加了另一个worker,该怎么办?因此,与两名工人一起进行第二次测试。 – morganw09dev
谢谢你的回复,摩根。你说得对。越来越多的工人给我比例的结果。有2个工人2个喷嘴,每秒我的元组数增加一倍。但是这个测试的想法是确定1名工人的最佳表现。我能得到的最好的结果是每10分钟1,5kk元组,或每秒2500元组。我猜想在40核心,32GB内存和10Gb/s网络的节点上,我可以做得更好。 – f1sherox
1名员工,但40个核心并不合理。无论如何,每个工作人员都是单线程的,所以这意味着您的服务器有能力容纳40名员工。您现在可以在单个核心服务器上获得完全相同的性能。每个线程2500元/ s并不是很好,但也不算太差。 – C4stor