2016-11-06

Kafka with Spark in Zeppelin running on Docker

I have a local Docker container running spotify/kafka (on Windows 10). Inside this container, data is published to a topic named "data".

I can consume the data with a Java application that I run in Eclipse, like this:

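import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Consumer built on the old ZooKeeper-based high-level API (kafka.consumer.*).
public class KafkaConsumer {

    private String topicName;
    private ConsumerConfig config;

    public KafkaConsumer() throws Exception {
        Properties props = new Properties();
        // This API finds brokers through ZooKeeper; bootstrap.servers and the
        // *.deserializer entries belong to the newer client and are not used here.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test");
        props.put("client.id", this.getClass().getSimpleName());
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("partition.assignment.strategy", "range");

        this.config = new ConsumerConfig(props);
        this.topicName = "data";
    }

    public void run() {
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
        // Request one stream for the topic.
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                connector.createMessageStreams(ImmutableMap.of(topicName, 1));
        List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());

        // One thread per stream; each blocks on its iterator and prints every message.
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
                    while (iterator.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
                        // Decode as UTF-8 (assuming the producer writes UTF-8 text)
                        // rather than with the platform default charset.
                        String m = new String(messageAndMetadata.message(), StandardCharsets.UTF_8);

                        System.out.println(m);
                    }
                }
            });
        }
    }
}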

What I want now is to consume the same topic from a dylanmei/zeppelin Docker container. I want to run the following in a Zeppelin notebook:

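import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// createDirectStream takes a StreamingContext, not the SparkContext `sc`
// (the 5-second batch interval is an arbitrary choice).
val ssc = new StreamingContext(sc, Seconds(5))

// Settings for the new (0.10) consumer: it does not use zookeeper.connect, and
// partition.assignment.strategy, if set, expects a class name rather than "range".
// No trailing comma after the last entry (not allowed before Scala 2.12.2).
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "group.id" -> "example",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer]
)

val topics = Array("data")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// map is lazy: an output operation and ssc.start() are needed before anything runs.
stream.map(record => (record.key, record.value)).print()
ssc.start()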

This is taken from the Spark Kafka integration guide.

However, all I ever get is a "Connection refused" (which also happens when I enter a wrong port, so it tells me nothing):

java.net.ConnectException: Connection refused 
at java.net.PlainSocketImpl.socketConnect(Native Method) 
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) 
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at org.apache.thrift.transport.TSocket.open(TSocket.java:182) 
at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:51) 
at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:37) 
at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60) 
at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861) 
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435) 
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363) 
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.getClient(RemoteInterpreterProcess.java:96) 
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.init(RemoteInterpreter.java:216) 
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:385) 
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getFormType(LazyOpenInterpreter.java:105) 
at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:306) 
at org.apache.zeppelin.scheduler.Job.run(Job.java:176) 
at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:329) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

I hope someone can help me get this running; I have no clue.

Thanks and regards!


The first thing I would do is log in to the Docker container and telnet to ports 2181 and 9092 to find out whether the processes are listening. – user2230605
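Where telnet is not installed in the container, the same check can be sketched in Java with java.net.Socket. This is a minimal sketch, assuming you run it from inside the Zeppelin container against whatever host it uses for Kafka; the class name PortCheck is hypothetical. It only shows whether something accepts TCP connections on 2181 and 9092, not whether that something is ZooKeeper or Kafka.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMs.
    // A closed port typically fails fast with "Connection refused"; a filtered or
    // unreachable host typically fails with a timeout instead. Both return false.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Host to probe; defaults to localhost, as in the question's connection strings.
        String host = args.length > 0 ? args[0] : "localhost";
        // 2181 = ZooKeeper, 9092 = Kafka broker (the ports used in the question).
        int[] ports = {2181, 9092};
        for (int port : ports) {
            System.out.println(host + ":" + port + " -> "
                    + (isListening(host, port, 2000) ? "listening" : "not reachable"));
        }
    }
}
```

If both ports report "not reachable" from inside the Zeppelin container while the Java consumer in Eclipse works, "localhost" is probably the wrong address from that container's point of view.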


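Have you tried using '--net="host"'? I think the default is bridge networking, in which case you need to use the bridge adapter's address in the connection string. – Cooper6581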
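With Docker's default bridge network, each container has its own loopback interface, so "localhost:9092" inside the Zeppelin container points at the Zeppelin container itself, not at the Kafka container. A minimal sketch of the idea, assuming the Kafka container's address is passed in through a hypothetical KAFKA_HOST environment variable (the class and variable names are illustrative only): build both connection strings from one host value instead of hard-coding localhost. The address itself can be looked up with docker inspect -f '{{.NetworkSettings.IPAddress}}' <kafka-container>.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaAddress {

    // Hypothetical environment variable holding the Kafka container's address
    // as seen from the Zeppelin container (e.g. its bridge-network IP).
    static final String HOST_ENV = "KAFKA_HOST";

    // Builds the connection settings from a single host instead of "localhost",
    // which inside a bridge-networked container refers to that container itself.
    static Map<String, String> connectionSettings(String host) {
        Map<String, String> settings = new HashMap<>();
        settings.put("bootstrap.servers", host + ":9092");
        settings.put("zookeeper.connect", host + ":2181");
        return settings;
    }

    public static void main(String[] args) {
        // Fall back to localhost when the variable is not set.
        String host = System.getenv().getOrDefault(HOST_ENV, "localhost");
        System.out.println(connectionSettings(host));
    }
}
```

Kafka clients also connect to whatever address the broker advertises in its metadata, so that advertised address has to be reachable from the Zeppelin container as well.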

Answers


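I think this is a Spark interpreter problem. The stack trace comes from Zeppelin's own client (org.apache.zeppelin.interpreter.remote.ClientFactory, via Thrift's TSocket.open) failing to connect to the interpreter process, so the paragraph never reaches Spark, let alone Kafka. I had the same problem, and after I restarted my interpreter everything worked. You can also check your interpreter's configuration; a misconfigured interpreter can produce this error too.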