2016-09-16 73 views
0

过去两天,我一直在绑定在我们的拓扑结构中实现KafkaSpout。这里 是一些重要的信息。风暴 - KafkaSpout未能打开()

这三个服务都在同一个实例上运行。卡夫卡的经纪人默认使用9092 端口,advertised.listeners设置为PLAINTEXT://localhost:9092。 Zookeeper使用默认的 客户端端口2181.而Storm Nimbus主机名也已设置为localhost。

定制卡夫卡生产者创建日志消息成功,而使用zkCli 动物园管理员剧本我已经看到了使用/经纪人路径时,分区及其他相关信息 正确保存。

但是,我在激活时不断收到错误,并在事后监视拓扑结构。 这里是风暴拓扑结构的源代码,我实现:

BrokerHosts hosts = new ZkHosts("127.0.0.1:2181"); 

SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/kafkastorm/", "bytes" + UUID.randomUUID().toString()); 
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
spoutConfig.zkServers = Arrays.asList("127.0.0.1"); 
spoutConfig.zkPort = 2181; 

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 

TopologyBuilder builder = new TopologyBuilder(); 
builder.setSpout("bytes", kafkaSpout); 
builder.setBolt("byteSize", new KafkaByteProcessingBolt()).shuffleGrouping("bytes"); 

StormTopology topology = builder.createTopology(); 

Config config = new Config(); 

StormSubmitter.submitTopology("topology", config, topology); 

但是,该错误消息我一直在执行bin/storm monitor <topology_name> -m bytes时得到如下:

Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found 
at org.apache.storm.utils.Monitor.metrics(Monitor.java:223) 
at org.apache.storm.utils.Monitor.metrics(Monitor.java:159) 
at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36) 
at clojure.lang.RestFn.applyTo(RestFn.java:137) 
at org.apache.storm.command.monitor.main(Unknown Source) 

而通过检查日志(worker.log文件),我认为 KafkaSpout在open()方法上失败。

java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy 
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[storm-kafka-1.0.2.jar:1.0.2] 
at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604) ~[storm-core-1.0.2.jar:1.0.2] 
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) [storm-core-1.0.2.jar:1.0.2] 
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101] 
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy 
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_101] 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_101] 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_101] 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_101] 
... 5 more 

有人能解释什么可能是在KafkaSpout对 open()方法失败的原因是什么?

我真的很感谢您的帮助!

+0

你正在使用什么版本的Kafka和Storm?也看看风暴卡夫卡的版本。你在使用HDP群集吗? –

回答