2016-08-16 44 views
0

运行暴雨卡夫卡凸出我想运行一个简单的项目如下图所示:无法在本地模式

public static void main(String [] args){ 
    TopologyBuilder builder = new TopologyBuilder(); 

    String kafkaTopic = "videoMonitor"; //only this topic encounter this problem 
    String spoutID = "VQMSpoutID"; 
    String zkRoot = "/VQMZkRoot"; 
    BrokerHosts brokerHosts = new ZkHosts("node1:2181,node2:2181,node3:2181"); 
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts,kafkaTopic,zkRoot,spoutID); 
    spoutConfig.scheme = new SchemeAsMultiScheme(new MyScheme()); 
    MyKafkaSpout myKafkaSpout = new MyKafkaSpout(spoutConfig); 
    builder.setSpout("my",myKafkaSpout); 
    LocalCluster localCluster = new LocalCluster(); 
    localCluster.submitTopology("topoName",new Config(),builder.createTopology()); 

} 

public class MyScheme implements Scheme { 

    @Override 
    public List<Object> deserialize(byte[] ser) { 
     System.out.println("------------------------------------------------------>" + ser); 
     ArrayList arrayList = new ArrayList(); 
     arrayList.add("nice"); 
     return arrayList; 
    } 

    @Override 
    public Fields getOutputFields() { 
     return new Fields("all"); 
    } 
} 

public class MyKafkaSpout extends KafkaSpout{ 

    public MyKafkaSpout(SpoutConfig spoutConf) { 
     super(spoutConf); 
    } 
} 

我试图读取使用kafkaSpout卡夫卡主题的消息。然而,该凸出得到了一些错误:

2016-08-16 15:00:12.565 [Thread-9-my] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka3:9092, 1=kafka4:9092, 2=kafka1:9092, 3=kafka2:9092, 4=kafka3:9092, 5=kafka4:9092, 6=kafka1:9092, 7=kafka2:9092}} 
2016-08-16 15:00:12.566 [Thread-9-my] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=kafka3:9092, partition=0}, Partition{host=kafka4:9092, partition=1}, Partition{host=kafka1:9092, partition=2}, Partition{host=kafka2:9092, partition=3}, Partition{host=kafka3:9092, partition=4}, Partition{host=kafka4:9092, partition=5}, Partition{host=kafka1:9092, partition=6}, Partition{host=kafka2:9092, partition=7}] 
2016-08-16 15:00:12.567 [Thread-9-my] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: [] 
2016-08-16 15:00:12.567 [Thread-9-my] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=kafka3:9092, partition=4}, Partition{host=kafka1:9092, partition=6}, Partition{host=kafka4:9092, partition=5}, Partition{host=kafka2:9092, partition=7}, Partition{host=kafka3:9092, partition=0}, Partition{host=kafka1:9092, partition=2}, Partition{host=kafka4:9092, partition=1}, Partition{host=kafka2:9092, partition=3}] 
2016-08-16 15:00:12.641 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000c type:exists cxid:0x1 zxid:0xfffffffffffffffe txntype:unknown reqpath:/VQMZkRoot/VQMSpoutID/partition_4 
2016-08-16 15:00:12.641 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000c type:exists cxid:0x1 zxid:0xfffffffffffffffe txntype:unknown reqpath:/VQMZkRoot/VQMSpoutID/partition_4 
2016-08-16 15:00:12.641 [Thread-9-my-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000c, packet:: clientPath:null serverPath:null finished:false header:: 1,3 replyHeader:: 1,33,-101 request:: '/VQMZkRoot/VQMSpoutID/partition_4,F response:: 
2016-08-16 15:00:12.642 [Thread-9-my] INFO storm.kafka.PartitionManager - Read partition information from: /VQMZkRoot/VQMSpoutID/partition_4 --> null 
2016-08-16 15:00:12.669 [Thread-9-my] DEBUG kafka.consumer.SimpleConsumer - Disconnecting from kafka3:9092 
2016-08-16 15:00:12.720 [Thread-9-my] DEBUG kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 1048576 (requested 1048576), SO_SNDBUF = 65536 (requested -1), connectTimeoutMs = 10000. 
2016-08-16 15:00:12.768 [Thread-9-my] INFO kafka.consumer.SimpleConsumer - Reconnect due to error: 
java.io.IOException: 远程主机强迫关闭了一个现有的连接。(In english means: Remote host closed an existing connection) 
    at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[na:1.8.0_91] 
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[na:1.8.0_91] 
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_91] 
    at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[na:1.8.0_91] 
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_91] 
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:206) ~[na:1.8.0_91] 
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[na:1.8.0_91] 
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[na:1.8.0_91] 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[kafka-clients-0.10.0.0.jar:na] 
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[kafka_2.11-0.9.0.1.jar:na] 
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[kafka_2.11-0.9.0.1.jar:na] 
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86) [kafka_2.11-0.9.0.1.jar:na] 
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [kafka_2.11-0.9.0.1.jar:na] 
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [kafka_2.11-0.9.0.1.jar:na] 
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [kafka_2.11-0.9.0.1.jar:na] 
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) [storm-kafka-0.9.3.jar:0.9.3] 
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) [storm-kafka-0.9.3.jar:0.9.3] 
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) [storm-kafka-0.9.3.jar:0.9.3] 
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [storm-kafka-0.9.3.jar:0.9.3] 
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) [storm-kafka-0.9.3.jar:0.9.3] 
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-0.9.3.jar:0.9.3] 
    at backtype.storm.daemon.executor$fn__3371$fn__3386$fn__3415.invoke(executor.clj:565) [storm-core-0.9.5.jar:0.9.5] 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) [storm-core-0.9.5.jar:0.9.5] 
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91] 
2016-08-16 15:00:12.768 [Thread-9-my] DEBUG kafka.consumer.SimpleConsumer - Disconnecting from kafka3:9092 
2016-08-16 15:00:12.770 [Thread-9-my] DEBUG kafka.consumer.SimpleConsumer - Disconnecting from kafka3:9092 
2016-08-16 15:00:12.809 [Thread-9-my] DEBUG kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 1048576 (requested 1048576), SO_SNDBUF = 65536 (requested -1), connectTimeoutMs = 10000. 
2016-08-16 15:00:12.845 [Thread-9-my] DEBUG kafka.consumer.SimpleConsumer - Disconnecting from kafka3:9092 
2016-08-16 15:00:12.847 [Thread-9-my] ERROR backtype.storm.util - Async loop died! 
java.lang.RuntimeException: java.io.IOException: 远程主机强迫关闭了一个现有的连接。(In english means: Remote host closed an existing connection) 
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.3.jar:0.9.3] 
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.3.jar:0.9.3] 
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.3.jar:0.9.3] 
    at backtype.storm.daemon.executor$fn__3371$fn__3386$fn__3415.invoke(executor.clj:565) ~[storm-core-0.9.5.jar:0.9.5] 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.5.jar:0.9.5] 
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91] 

omited some... 

2016-08-16 15:00:12.849 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:exists cxid:0x22 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807/my 
2016-08-16 15:00:12.849 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:exists cxid:0x22 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807/my 
2016-08-16 15:00:12.849 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 34,3 replyHeader:: 34,33,-101 request:: '/storm/errors/topoName-1-1471330807/my,F response:: 
2016-08-16 15:00:12.850 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:exists cxid:0x23 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807 
2016-08-16 15:00:12.850 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:exists cxid:0x23 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807 
2016-08-16 15:00:12.850 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 35,3 replyHeader:: 35,33,-101 request:: '/storm/errors/topoName-1-1471330807,F response:: 
2016-08-16 15:00:12.850 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:exists cxid:0x24 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors 
2016-08-16 15:00:12.851 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:exists cxid:0x24 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors 
2016-08-16 15:00:12.851 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 36,3 replyHeader:: 36,33,0 request:: '/storm/errors,F response:: s{9,9,1471330807639,1471330807639,0,0,0,0,1,0,9} 
2016-08-16 15:00:12.854 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:create cxid:0x25 zxid:0x22 txntype:1 reqpath:n/a 
2016-08-16 15:00:12.854 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:create cxid:0x25 zxid:0x22 txntype:1 reqpath:n/a 
2016-08-16 15:00:12.855 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 37,1 replyHeader:: 37,34,0 request:: '/storm/errors/topoName-1-1471330807,#7,v{s{31,s{'world,'anyone}}},0 response:: '/storm/errors/topoName-1-1471330807 
2016-08-16 15:00:12.857 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:create cxid:0x26 zxid:0x23 txntype:1 reqpath:n/a 
2016-08-16 15:00:12.857 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:create cxid:0x26 zxid:0x23 txntype:1 reqpath:n/a 
2016-08-16 15:00:12.857 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 38,1 replyHeader:: 38,35,0 request:: '/storm/errors/topoName-1-1471330807/my,#7,v{s{31,s{'world,'anyone}}},0 response:: '/storm/errors/topoName-1-1471330807/my 
2016-08-16 15:00:12.860 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:create cxid:0x27 zxid:0x24 txntype:1 reqpath:n/a 
2016-08-16 15:00:12.860 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:create cxid:0x27 zxid:0x24 txntype:1 reqpath:n/a 
2016-08-16 15:00:12.861 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 39,1 replyHeader:: 39,36,0 request:: '/storm/errors/topoName-1-1471330807/my/e,#ffffffacffffffed05737201f636c6f6a7572652e6c616e672e50657273697374656e7441727261794d6170ffffffd02836ffffff8f21ffffffe4ffffffa0f2024c055f6d6574617401d4c636c6f6a7572652f6c616e672f4950657273697374656e744d61703b5b056172726179740135b4c6a6176612f6c616e672f4f626a6563743b787201b636c6f6...omit some,v{s{31,s{'world,'anyone}}},2 response:: '/storm/errors/topoName-1-1471330807/my/e0000000000 
2016-08-16 15:00:12.865 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:getChildren2 cxid:0x28 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807/my 
2016-08-16 15:00:12.865 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:getChildren2 cxid:0x28 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807/my 
2016-08-16 15:00:12.866 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 40,12 replyHeader:: 40,36,0 request:: '/storm/errors/topoName-1-1471330807/my,F response:: v{'e0000000000},s{35,35,1471330812855,1471330812855,0,1,0,0,1,1,36} 
2016-08-16 15:00:12.867 [Thread-9-my] ERROR backtype.storm.util - Halting process: ("Worker died") 
java.lang.RuntimeException: ("Worker died") 
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.5.jar:0.9.5] 
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na] 
    at backtype.storm.daemon.worker$fn__4694$fn__4695.invoke(worker.clj:493) [storm-core-0.9.5.jar:0.9.5] 
    at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.5.jar:0.9.5] 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.5.jar:0.9.5] 
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91] 

Process finished with exit code 1 

我相信我已正确设置主机和更奇怪的是,我可以运行这个凸出如果我把它编译成JAR文件,并将其部署到Linux服务器上。

我也怀疑,如果它的主题有一些问题,因为当我换一个新话题所有的代码运行正常。

有没有人可以帮忙?谢谢!

+0

我已通过将主题更改为新主题来解决此问题。目前,这个问题已经成为为什么老话题不能被阅读以及如果需要如何解决的问题。 –

回答

0

我会检查的卡夫卡或饲养员>> ./zkCli.sh >> LS /经纪人/主题创建与否的主题。 我有连接错误(类似于套接字连接错误),对于我的情况,主机是不正确的。通过cd/etc/hosts检查主机列表,并确保IP地址和主机名(localhost)配置正确,而不是其他主机名。您可以验证在zookeeper cli >> get/brokers/ids /(broker id no)&检查plaitext。它应该匹配。

正如你所说,其他主题的工作。这意味着你的spoutConfig参数必定有错误。我没有看到其他东西。不过,如果有其他事情,我很乐意去学习。