After running the topology in a local cluster, I created a remote Storm cluster (with Nathan's storm-deploy). Before creating the runnable jar with "package required libraries", I removed the Storm jars from the Eclipse build path. My topology uses storm-kafka-0.9.0-wip16a-scala292.jar, which I first left on the build path and later also tried removing before creating the runnable jar (just trying to fix this issue...). When I use the command below:
./storm jar /home/ubuntu/Virtual/stormTopologia4.jar org.vicomtech.main.StormTopologia
it always fails with:
Exception in thread "main" java.lang.NoClassDefFoundError: OpaqueTridentKafkaSpout
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2451)
at java.lang.Class.getMethod0(Class.java:2694)
at java.lang.Class.getMethod(Class.java:1622)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: OpaqueTridentKafkaSpout
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
Since this topology runs fine as a runnable jar on a single AWS instance, I can't figure out what I'm missing here... This is the code in my main method:
Config conf = new Config();
try {
    // kafkadir (the ZooKeeper host for Kafka) is defined earlier in main
    OpaqueTridentKafkaSpout tridentSpout = crearSpout(kafkadir, "test");
    OpaqueTridentKafkaSpout logUpvSpout = crearSpout(kafkadir, "logsUpv");
    OpaqueTridentKafkaSpout logSnortSpout = crearSpout(kafkadir, "logsSnort");
    StormSubmitter.submitTopology("hackaton", conf,
            buildTopology(tridentSpout, logUpvSpout, logSnortSpout));
} catch (AlreadyAliveException | InvalidTopologyException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
} catch (TwitterException e) {
    e.printStackTrace();
}
}
private static OpaqueTridentKafkaSpout crearSpout(
        String testKafkaBrokerHost, String topic) {
    KafkaConfig.ZkHosts hosts = new ZkHosts(testKafkaBrokerHost, "/brokers");
    TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic);
    config.forceStartOffsetTime(-2); // -2 = start from the earliest available offset
    config.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new OpaqueTridentKafkaSpout(config);
}
public static StormTopology buildTopology(OpaqueTridentKafkaSpout tridentSpout,
        OpaqueTridentKafkaSpout logUpvSpout,
        OpaqueTridentKafkaSpout logSnortSpout) throws IOException, TwitterException {
    TridentTopology topology = new TridentTopology();
topology.newStream("tweets2", tridentSpout)
.each(new Fields("str"), new OnlyEnglishSpanish())
.each(new Fields("str"), new WholeTweetToMongo())
.each(new Fields("str"), new TextLangExtracter(),
new Fields("text", "lang")).parallelismHint(6)
.project(new Fields("text", "lang"))
.partitionBy(new Fields("lang"))
.each(new Fields("text", "lang"), new Analisis(),
new Fields("result")).parallelismHint(6)
.each(new Fields("result"), new ResultToMongo());
return topology.build();
}
Is there any way to make OpaqueTridentKafkaSpout available? Thanks in advance.
I hope it's not a silly question; I'm very new to this field.
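For reference, one common way to get storm-kafka classes into the submitted jar is to build an uberjar with the Maven Shade plugin, while keeping Storm itself out of it (the cluster already provides it). This is only a sketch, assuming a Maven build rather than the Eclipse "runnable jar" export described above:

```xml
<!-- Sketch: bundle storm-kafka (and other dependencies) into the topology jar.
     Storm itself should be declared with <scope>provided</scope> so the
     cluster's copy is used instead of a bundled one. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.2</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```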
Thanks @Svend, I'll try your solution! By the way, you have great tutorials on these topics, very useful! – user3410473
Thanks, glad you like them :) – Svend