2014-03-12 101 views
1

在本地集群中运行拓扑之后,我创建了一个远程风暴集群(storm-deploy Nathan)。在使用“包依赖”创建可运行jar之前,我已经从eclipse中的构建路径中删除了Storm jars。我的拓扑结构使用storm-kafka-0.9.0-wip16a-scala292.jar,我在构建路径时将其留在构建路径中,并在创建可运行jar之前从构建路径中删除(仅尝试解决此问题..)。当我使用下面的命令:storm-deploy提交拓扑结构java.lang.NoClassDefFoundError

./storm jar /home/ubuntu/Virtual/stormTopologia4.jar org.vicomtech.main.StormTopologia 

它总是回复:

Exception in thread "main" java.lang.NoClassDefFoundError: OpaqueTridentKafkaSpout 
    at java.lang.Class.getDeclaredMethods0(Native Method) 
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2451) 
    at java.lang.Class.getMethod0(Class.java:2694) 
    at java.lang.Class.getMethod(Class.java:1622) 
    at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494) 
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486) 
Caused by: java.lang.ClassNotFoundException: OpaqueTridentKafkaSpout 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:423) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:356) 

由于这种拓扑在单个实例上运行良好的AWS作为运行的JAR,我不能图什么i'm在这里失去了... 是代码我的主要方法中:

 Config conf = new Config(); 

     OpaqueTridentKafkaSpout tridentSpout = crearSpout(
       kafkadir, "test"); 


     OpaqueTridentKafkaSpout logUpvSpout = crearSpout(kafkadir, 
       "logsUpv"); 

     OpaqueTridentKafkaSpout logSnortSpout = crearSpout(
       kafkadir, "logsSnort"); 

     try { 
      StormSubmitter.submitTopology(
        "hackaton", 
        conf, 
        buildTopology(tridentSpout, logUpvSpout, 
          logSnortSpout)); 
     } catch (AlreadyAliveException | InvalidTopologyException e) { 

      e.printStackTrace(); 
     } 



    } catch (IOException e) { 
     e.printStackTrace(); 
    } catch (TwitterException e) { 
     e.printStackTrace(); 
    } 

} 

private static OpaqueTridentKafkaSpout crearSpout(
     String testKafkaBrokerHost, String topic) { 
    KafkaConfig.ZkHosts hosts = new ZkHosts(testKafkaBrokerHost, "/brokers"); 

    TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic); 
    config.forceStartOffsetTime(-2); 

    config.scheme = new SchemeAsMultiScheme(new StringScheme()); 
    return new OpaqueTridentKafkaSpout(config); 
} 


public static StormTopology buildTopology(OpaqueTridentKafkaSpout tridentSpout, 
     OpaqueTridentKafkaSpout logUpvSpout, 
     OpaqueTridentKafkaSpout logSnortSpout 
     ) throws IOException, 
     TwitterException { 

    TridentTopology topology = new TridentTopology(); 



    topology.newStream("tweets2", tridentSpout) 
      .each(new Fields("str"), new OnlyEnglishSpanish()) 
      .each(new Fields("str"), new WholeTweetToMongo()) 
      .each(new Fields("str"), new TextLangExtracter(), 
        new Fields("text", "lang")).parallelismHint(6) 
      .project(new Fields("text", "lang")) 
      .partitionBy(new Fields("lang")) 
      .each(new Fields("text", "lang"), new Analisis(), 
        new Fields("result")).parallelismHint(6) 
      .each(new Fields("result"), new ResultToMongo()); 


    return topology.build(); 

} 

有没有什么办法可以让OpaqueTridentKafkaSpout可用? 预先感谢您

希望it's不是一个愚蠢的cuestion,我非常新手到这个领域

回答

1

我们可以保持在构建路径风暴罐子当你生成JAR-具有依赖性,我们只需要告诉行家不捆绑它,像这样(见“规定”的范围,这意味着罐子由运行时环境提供的,因此没有必要捆绑):但是

<dependency> 
    <groupId>storm</groupId> 
    <artifactId>storm</artifactId> 
    <version>0.9.0-rc2</version> 
    <scope>provided</scope> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 

卡夫卡喷出必须包含在jar-with-dependencies中,所以它的maven声明如下所示:

<dependency> 
     <groupId>storm</groupId> 
     <artifactId>storm-kafka</artifactId> 
     <version>0.9.0-wip16a-scala292</version> 
</dependency> 

为了验证东西,您可以随时解压缩生成的jar,并手动检查必要的类是否存在/不存在,因为它们应该在部署到风暴之前。

+0

谢谢@Svend,我会试试你的解决方案!顺便说一下,你在这些领域有很棒的教程,非常有用! – user3410473

+0

谢谢,很高兴你喜欢他们:) – Svend