
I want to write data to HDFS using the storm-hdfs connector 0.1.3 (GitHub URL: https://github.com/ptgoetz/storm-hdfs). I have added the following dependency to my Maven project:

<dependency> 
     <groupId>com.github.ptgoetz</groupId> 
     <artifactId>storm-hdfs</artifactId> 
     <version>0.1.3-SNAPSHOT</version> 
     <scope>provided</scope> 
</dependency> 

The storm-hdfs project itself provides an example topology that writes data to HDFS; I only modified it to match my file locations. The HdfsFileTopology is:

package my.company.app; 

import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.topology.base.BaseRichSpout; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 
import org.apache.storm.hdfs.bolt.HdfsBolt; 
import org.apache.storm.hdfs.bolt.AbstractHdfsBolt; 
import org.apache.storm.hdfs.bolt.SequenceFileBolt; 
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; 
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat; 
import org.apache.storm.hdfs.bolt.format.FileNameFormat; 
import org.apache.storm.hdfs.bolt.format.RecordFormat; 
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; 
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; 
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units; 
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; 
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; 
import org.apache.storm.hdfs.bolt.sync.SyncPolicy; 
import org.apache.storm.hdfs.common.rotation.MoveFileAction; 
import org.yaml.snakeyaml.Yaml; 

import java.io.FileInputStream; 
import java.io.InputStream; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.UUID; 
import java.util.concurrent.ConcurrentHashMap; 

public class HdfsFileTopology { 
static final String SENTENCE_SPOUT_ID = "sentence-spout"; 
static final String BOLT_ID = "my-bolt"; 
static final String TOPOLOGY_NAME = "test-topology"; 

public static void main(String[] args) throws Exception { 
    Config config = new Config(); 
    config.setNumWorkers(1); 

    SentenceSpout spout = new SentenceSpout(); 

    // sync the filesystem after every 1k tuples 
    SyncPolicy syncPolicy = new CountSyncPolicy(1000); 

    // rotate files every minute 
    FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES); 

    FileNameFormat fileNameFormat = new DefaultFileNameFormat() 
      .withPath("https://stackoverflow.com/users/storm/") 
      .withExtension(".txt"); 



    // use "|" instead of "," for field delimiter 
    RecordFormat format = new DelimitedRecordFormat() 
      .withFieldDelimiter("|"); 

    Yaml yaml = new Yaml(); 
    InputStream in = new FileInputStream(args[1]); 
    Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in); 
    in.close(); 
    config.put("hdfs.config", yamlConf); 

    HdfsBolt bolt = new HdfsBolt() 
      .withConfigKey("hdfs.config") 
      .withFsUrl(args[0]) 
      .withFileNameFormat(fileNameFormat) 
      .withRecordFormat(format) 
      .withRotationPolicy(rotationPolicy) 
      .withSyncPolicy(syncPolicy) 
      .addRotationAction(new MoveFileAction().toDestination("/dest2/")); 

    TopologyBuilder builder = new TopologyBuilder(); 

    builder.setSpout(SENTENCE_SPOUT_ID, spout, 1); 
    // SentenceSpout --> MyBolt 
    builder.setBolt(BOLT_ID, bolt, 4) 
      .shuffleGrouping(SENTENCE_SPOUT_ID); 

    if (args.length == 2) { 
     LocalCluster cluster = new LocalCluster(); 

     cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); 
     waitForSeconds(120); 
     cluster.killTopology(TOPOLOGY_NAME); 
     cluster.shutdown(); 
     System.exit(0); 
    } else if (args.length == 3) { 
     StormSubmitter.submitTopology(args[2], config, builder.createTopology()); 
    } else{ 
     System.out.println("Usage: HdfsFileTopology [topology name] <yaml config file>"); 
    } 
} 

public static void waitForSeconds(int seconds) { 
    try { 
     Thread.sleep(seconds * 1000); 
    } catch (InterruptedException e) { 
    } 
} 

public static class SentenceSpout extends BaseRichSpout { 
    private ConcurrentHashMap<UUID, Values> pending; 
    private SpoutOutputCollector collector; 
    private String[] sentences = { 
      "my dog has fleas", 
      "i like cold beverages", 
      "the dog ate my homework", 
      "don't have a cow man", 
      "i don't think i like fleas" 
    }; 
    private int index = 0; 
    private int count = 0; 
    private long total = 0L; 

    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("sentence", "timestamp")); 
    } 

    public void open(Map config, TopologyContext context, 
        SpoutOutputCollector collector) { 
     this.collector = collector; 
     this.pending = new ConcurrentHashMap<UUID, Values>(); 
    } 

    public void nextTuple() { 
     Values values = new Values(sentences[index], System.currentTimeMillis()); 
     UUID msgId = UUID.randomUUID(); 
     this.pending.put(msgId, values); 
     this.collector.emit(values, msgId); 
     index++; 
     if (index >= sentences.length) { 
      index = 0; 
     } 
     count++; 
     total++; 
     if(count > 20000){ 
      count = 0; 
      System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total); 
     } 
     Thread.yield(); 
    } 

    public void ack(Object msgId) { 
     this.pending.remove(msgId); 
    } 

    public void fail(Object msgId) { 
     System.out.println("**** RESENDING FAILED TUPLE"); 
     this.collector.emit(this.pending.get(msgId), msgId); 
    } 
} 

public static class MyBolt extends BaseRichBolt { 

    private HashMap<String, Long> counts = null; 
    private OutputCollector collector; 

    public void prepare(Map config, TopologyContext context, OutputCollector collector) { 
     this.counts = new HashMap<String, Long>(); 
     this.collector = collector; 
    } 

    public void execute(Tuple tuple) { 
     collector.ack(tuple); 
    } 

    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // this bolt does not emit anything 
    } 

    @Override 
    public void cleanup() { 
    } 
} 
} 

I compiled the project with Maven (group id: my.company.app) and the build succeeded, but when I submit the jar to Storm it throws an error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/storm/hdfs/bolt/format/FileNameFormat 
Caused by: java.lang.ClassNotFoundException: org.apache.storm.hdfs.bolt.format.FileNameFormat 

Why does it throw a class-not-found error even though I have included the class? Any help on how to write data to HDFS using Storm would be appreciated.

As requested, the pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>com.mycompany.app</groupId> 
    <artifactId>hdfs_example</artifactId> 
    <version>1.0-SNAPSHOT</version> 
    <packaging>jar</packaging> 

    <name>hdfs_example</name> 
    <url>http://maven.apache.org</url> 

    <properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    </properties> 

    <repositories> 
     <repository> 
      <id>Codehaus</id> 
      <url>http://repository.codehaus.org</url> 
     </repository> 

     <repository> 
      <id>Codehaus.Snapshots</id> 
      <url>http://snapshots.repository.codehaus.org</url> 
      <snapshots><enabled>true</enabled></snapshots> 
     </repository> 

     <repository> 
      <id>github-releases</id> 
      <url>http://oss.sonatype.org/content/repositories/github-releases/</url> 
     </repository> 

     <repository> 
      <id>clojars.org</id> 
      <url>http://clojars.org/repo</url> 
     </repository> 
    </repositories> 

    <dependencies> 
    <dependency> 
     <groupId>org.testng</groupId> 
     <artifactId>testng</artifactId> 
     <version>6.8.5</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.mockito</groupId> 
     <artifactId>mockito-all</artifactId> 
     <version>1.9.0</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.easytesting</groupId> 
     <artifactId>fest-assert-core</artifactId> 
     <version>2.0M8</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.jmock</groupId> 
     <artifactId>jmock</artifactId> 
     <version>2.6.0</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-core</artifactId> 
    <version>0.9.2-incubating</version> 
     <scope>provided</scope> 
</dependency> 

    <dependency> 
     <groupId>commons-collections</groupId> 
     <artifactId>commons-collections</artifactId> 
     <version>3.2.1</version> 
    </dependency> 
    <dependency> 
     <groupId>com.google.guava</groupId> 
     <artifactId>guava</artifactId> 
     <version>15.0</version> 
    </dependency> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>3.8.1</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-client</artifactId> 
    <version>2.2.0</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
    </exclusions> 
    </dependency> 
    <dependency> 
     <groupId>com.github.ptgoetz</groupId> 
     <artifactId>storm-hdfs</artifactId> 
     <version>0.1.3-SNAPSHOT</version> 
      <scope>provided</scope> 
     </dependency> 

    </dependencies> 

    <build> 
    <sourceDirectory>src/main/java</sourceDirectory> 
    <testSourceDirectory>test/main/java</testSourceDirectory> 
    <resources> 
     <resource> 
     <directory>${basedir}/multilang</directory> 
     </resource> 
    </resources> 
    <plugins> 
    <plugin> 
    <groupId>org.apache.maven.plugins</groupId> 
    <artifactId>maven-shade-plugin</artifactId> 
    <version>1.4</version> 
    <configuration> 
     <createDependencyReducedPom>true</createDependencyReducedPom> 
     <descriptorRefs> 
      <descriptorRef>jar-with-dependencies</descriptorRef> 
     </descriptorRefs> 
    </configuration> 
    <executions> 
     <execution> 
      <phase>package</phase> 
      <goals> 
       <goal>shade</goal> 
      </goals> 
      <configuration> 
       <transformers> 
        <transformer 
          implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> 
        <transformer 
          implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> 
         <mainClass></mainClass> 
        </transformer> 
       </transformers> 
      </configuration> 
     </execution> 
    </executions> 
    </plugin> 
    <!-- 
    <plugin> 
     <groupId>com.theoryinpractise</groupId> 
     <artifactId>clojure-maven-plugin</artifactId> 
     <version>1.3.12</version> 
     <extensions>true</extensions> 
     <configuration> 
      <sourceDirectories> 
      <sourceDirectory>src/clj</sourceDirectory> 
      </sourceDirectories> 
     </configuration> 
     <executions> 
      <execution> 
      <id>compile</id> 
      <phase>compile</phase> 
      <goals> 
       <goal>compile</goal> 
      </goals> 
      </execution> 
      <execution> 
      <id>test</id> 
      <phase>test</phase> 
      <goals> 
       <goal>test</goal> 
      </goals> 
      </execution> 
     </executions> 
     </plugin> 
    --> 
     <plugin> 
     <groupId>org.codehaus.mojo</groupId> 
     <artifactId>exec-maven-plugin</artifactId> 
     <version>1.2.1</version> 
     <executions> 
      <execution> 
      <goals> 
       <goal>exec</goal> 
      </goals> 
      </execution> 
     </executions> 
     <configuration> 
      <executable>java</executable> 
      <includeProjectDependencies>true</includeProjectDependencies> 
      <includePluginDependencies>false</includePluginDependencies> 
      <classpathScope>compile</classpathScope> 
      <mainClass>${storm.topology}</mainClass> 
     </configuration> 
     </plugin> 

     <plugin> 
     <groupId>org.apache.maven.plugins</groupId> 
     <artifactId>maven-compiler-plugin</artifactId> 
     <configuration> 
      <source>1.6</source> 
      <target>1.6</target> 
     </configuration> 
     </plugin> 

    </plugins> 
    </build> 
</project> 

It looks to me like a packaging problem. Did you use the maven-shade plugin as Taylor suggested? At first glance your topology looks fine to me; this really looks like a pom.xml issue. – 2014-11-06 03:15:22


Agreed. Could you post your pom.xml? – 2014-11-06 03:29:40


@Kit Yes, I did use the maven-shade plugin; it is the first plugin in the pom.xml file. – tinus91 2014-11-06 08:08:16

Answer


I had the same problem. After examining the pom.xml in detail, I realized that the storm-hdfs dependency (<version>0.1.3-SNAPSHOT</version>) has its scope set to provided. Declaring it that way means we would have to add the jar to the Storm classpath ourselves, because Maven does not include provided dependencies in the jar during packaging.

I changed the version to one that is available in the Maven repository and removed the scope; that forces Maven to download the jar and include it in the topology jar during the build.
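
In other words, the storm-hdfs entry drops the provided scope and points at a released version. A minimal sketch of the changed dependency (the full entry, including its hadoop exclusions, appears in the pom below):

<dependency> 
     <groupId>com.github.ptgoetz</groupId> 
     <artifactId>storm-hdfs</artifactId> 
     <!-- released version from the Maven repository; no "provided" scope, 
          so the shade plugin bundles it into the topology jar --> 
     <version>0.1.2</version> 
</dependency> 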

Here is the pom.xml I used for reference (with some basic details removed):

The resources section points at src/main/resources/ with filtering set to false and includes core-site.xml and hdfs-site.xml, as sketched below.
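
A rough sketch of that resources block (the exact layout in my pom may differ slightly):

<resources> 
    <resource> 
     <directory>src/main/resources/</directory> 
     <!-- copy the Hadoop client configs into the jar as-is, without property filtering --> 
     <filtering>false</filtering> 
     <includes> 
      <include>core-site.xml</include> 
      <include>hdfs-site.xml</include> 
     </includes> 
    </resource> 
</resources> 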

<plugins> 
    <plugin> 
    <artifactId>maven-compiler-plugin</artifactId> 
    <version>3.2</version> 
    <configuration> 
     <source>1.7</source> 
     <target>1.7</target> 
    </configuration> 
    </plugin> 

    <plugin> 
    <groupId>org.apache.maven.plugins</groupId> 
    <artifactId>maven-shade-plugin</artifactId> 
    <version>1.4</version> 
    <configuration> 
     <createDependencyReducedPom>true</createDependencyReducedPom> 
    </configuration> 
    <executions> 
     <execution> 
     <phase>package</phase> 
     <goals> 
      <goal>shade</goal> 
     </goals> 
     <configuration> 
      <transformers> 
      <transformer 
        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> 
      <transformer 
        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> 
       <mainClass>com.company.main</mainClass> 
      </transformer> 
      </transformers> 
     </configuration> 
     </execution> 
    </executions> 
    </plugin> 

</plugins> 

<dependency> 
    <groupId>junit</groupId> 
    <artifactId>junit</artifactId> 
    <version>3.8.1</version> 
    <scope>test</scope> 
</dependency> 

<dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-core</artifactId> 
    <version>0.9.2-incubating</version> 
    <!-- keep storm out of the jar-with-dependencies --> 
    <scope>provided</scope> 
</dependency> 

<dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-kafka</artifactId> 
    <version>0.9.2-incubating</version> 
</dependency> 

<dependency> 
    <groupId>log4j</groupId> 
    <artifactId>log4j</artifactId> 
    <version>1.2.17</version> 
</dependency> 

<!-- Utilities --> 
<dependency> 
    <groupId>commons-collections</groupId> 
    <artifactId>commons-collections</artifactId> 
    <version>3.2.1</version> 
</dependency> 

<dependency> 
    <groupId>com.google.guava</groupId> 
    <artifactId>guava</artifactId> 
    <version>15.0</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.8.1.1</version> 
    <exclusions> 
    <exclusion> 
     <groupId>javax.jms</groupId> 
     <artifactId>jms</artifactId> 
    </exclusion> 
    <exclusion> 
     <groupId>com.sun.jdmk</groupId> 
     <artifactId>jmxtools</artifactId> 
    </exclusion> 
    <exclusion> 
     <groupId>com.sun.jmx</groupId> 
     <artifactId>jmxri</artifactId> 
    </exclusion> 
    <exclusion> 
     <groupId>org.slf4j</groupId> 
     <artifactId>slf4j-simple</artifactId> 
    </exclusion> 
    <exclusion> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
    </exclusion> 
    <exclusion> 
     <groupId>org.apache.zookeeper</groupId> 
     <artifactId>zookeeper</artifactId> 
    </exclusion> 
    <exclusion> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
    </exclusion> 
    </exclusions> 
</dependency> 

<!-- our cluster hadoop version --> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-client</artifactId> 
    <version>2.4.0</version> 
    <exclusions> 
    <exclusion> 
     <groupId>org.slf4j</groupId> 
     <artifactId>slf4j-log4j12</artifactId> 
    </exclusion> 
    </exclusions> 
</dependency> 

<!-- our cluster hadoop version --> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-hdfs</artifactId> 
    <version>2.4.0</version> 
    <exclusions> 
    <exclusion> 
     <groupId>org.slf4j</groupId> 
     <artifactId>slf4j-log4j12</artifactId> 
    </exclusion> 
    </exclusions> 
</dependency> 


<!-- apache hdfs-bolt related dependencies --> 
<dependency> 
    <groupId>com.github.ptgoetz</groupId> 
    <artifactId>storm-hdfs</artifactId> 
    <version>0.1.2</version> 
    <exclusions> 
    <exclusion> 
     <groupId>org.apache.hadoop</groupId> 
     <artifactId>hadoop-client</artifactId> 
    </exclusion> 
    <exclusion> 
     <groupId>org.apache.hadoop</groupId> 
     <artifactId>hadoop-hdfs</artifactId> 
    </exclusion> 
    </exclusions> 
</dependency>