2013-03-21

How to use the Hadoop FS API in a Storm bolt in Java

I want to store the data emitted by a Storm spout in HDFS. I have added Hadoop FS API code to the bolt class, but it throws an error at runtime.

Here is the Storm bolt class:

package bolts;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class DataNormalizer extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] process = sentence.split(" ");
        String source = "/root/data/top_output.csv";
        String dest = "hdfs://localhost:9000/user/root/nishu/top_output/top_output_1.csv";

        Configuration conf = new Configuration();
        try {
            // Resolve the file system from the hdfs:// URI; FileSystem.get(conf) alone
            // returns the local file system unless core-site.xml is on the classpath.
            FileSystem fileSystem = FileSystem.get(URI.create(dest), conf);
            System.out.println(fileSystem);
            Path srcPath = new Path(source);
            Path dstPath = new Path(dest);
            try {
                if (!fileSystem.exists(dstPath)) {
                    // Stream the local file into a new HDFS file.
                    FSDataOutputStream out = fileSystem.create(dstPath, true);
                    InputStream in = new BufferedInputStream(
                            new FileInputStream(new File(source)));
                    byte[] b = new byte[1024];
                    int numBytes;
                    while ((numBytes = in.read(b)) > 0) {
                        out.write(b, 0, numBytes);
                    }
                    in.close();
                    out.close();
                } else {
                    fileSystem.copyFromLocalFile(srcPath, dstPath);
                }
            } finally {
                fileSystem.close();
            }
        } catch (IOException e) {
            // Avoid System.exit() in a bolt: it would kill the whole worker JVM.
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt only writes to HDFS and emits no tuples.
    }
}

I have also added the Hadoop jars to the CLASSPATH. Here is the value of the classpath:

$STORM_HOME/storm-0.8.1.jar:$JAVA_HOME/lib/:$HADOOP_HOME/hadoop-core-1.0.4.jar:$HADOOP_HOME/lib/:$STORM_HOME/lib/ 

I also copied the Hadoop libraries hadoop-core-1.0.4.jar, commons-collections-3.2.1.jar and commons-cli-1.2.jar into the Storm/lib directory.

When I build and run this project, it throws the following error:

3006 [Thread-16] ERROR backtype.storm.daemon.executor - 
java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration 
     at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<init>(DefaultMetricsSystem.java:37) 
     at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<clinit>(DefaultMetricsSystem.java:34) 
     at org.apache.hadoop.security.UgiInstrumentation.create(UgiInstrumentation.java:51) 
     at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:216) 
     at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:184) 
     at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:236) 
     at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:466) 
     at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:452) 
     at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:1494) 
     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1395) 
     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254) 
     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:123) 
     at bolts.DataNormalizer.execute(DataNormalizer.java:67) 
     at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:32) 
     ...................... 

Answer


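The error message tells you that Apache Commons Configuration is missing as well. You have to add it to the classpath.

More generally, you should add all of Hadoop's dependencies to your classpath. You can find them with a dependency manager (Maven, Ivy, Gradle, etc.) or by looking in /usr/lib/hadoop/lib on a machine where Hadoop is installed.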
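A quick way to check which classes the worker JVM can actually see is to probe for them by name. The sketch below is a hypothetical diagnostic helper (`ClasspathProbe` and its methods are illustrative, not part of Storm or Hadoop); the class names in `main` are taken from the stack trace in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical diagnostic helper: reports whether each named class can be
// loaded by this class's loader, without running its static initializers.
public class ClasspathProbe {

    public static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only locate and load the class
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static Map<String, Boolean> probe(String... classNames) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String name : classNames) {
            result.put(name, isOnClasspath(name));
        }
        return result;
    }

    public static void main(String[] args) {
        // Classes that appear in the NoClassDefFoundError stack trace.
        probe("org.apache.hadoop.fs.FileSystem",
              "org.apache.commons.configuration.Configuration")
            .forEach((name, found) ->
                System.out.println((found ? "found   " : "MISSING ") + name));
    }
}
```

Running it with the same classpath as the topology shows immediately whether commons-configuration is really visible.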


I have already added all the Hadoop/lib jars to the classpath. You can check the classpath given above. – 2013-03-21 08:23:40


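`:$HADOOP_HOME/lib/:` does not add the jars in that directory to the classpath; a plain directory entry only covers .class files. It should be `:$HADOOP_HOME/lib/*:`, see [Understanding class path wildcards](http://docs.oracle.com/javase/6/docs/technotes/tools/solaris/classpath.html). – 2013-03-21 12:49:00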
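To make the wildcard rule concrete: per the Oracle documentation, an entry like `$HADOOP_HOME/lib/*` stands for every file directly in that directory whose name ends in `.jar` or `.JAR`; subdirectories are not searched. The sketch below is a hypothetical helper (`WildcardExpander` is illustrative, not the JVM's own code) that mimics that expansion, for example to print the jars a wildcard entry would pick up.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper mimicking how a "dir/*" class path entry is expanded:
// every regular file directly inside dir whose name ends in .jar or .JAR.
public class WildcardExpander {

    public static List<String> expand(Path dir) throws IOException {
        List<String> jars = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path p : entries) {
                String name = p.getFileName().toString();
                // Subdirectories are not searched; only .jar/.JAR files match.
                if (Files.isRegularFile(p)
                        && (name.endsWith(".jar") || name.endsWith(".JAR"))) {
                    jars.add(p.toString());
                }
            }
        }
        // The JVM does not guarantee an order; sort only for stable output.
        Collections.sort(jars);
        return jars;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        // Join with ':' on Unix or ';' on Windows, like a CLASSPATH value.
        System.out.println(String.join(File.pathSeparator, expand(dir)));
    }
}
```

Pointing it at `$HADOOP_HOME/lib` lists the jars that `$HADOOP_HOME/lib/*` would add, none of which `$HADOOP_HOME/lib/` adds.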