1
我有一个基于 spotify/kafka 镜像运行的本地 Docker 容器(宿主机为 Windows 10)。该容器中有数据被发布到名为 “data” 的主题(topic)中。目标是在 Docker 上运行的 Zeppelin 中通过 Spark 使用 Kafka。
我可以通过在 Eclipse 中运行如下 Java 应用程序来消费这些数据:
// Name of the Kafka topic to consume; assigned in the constructor.
private String topicName;
// Consumer configuration built from the connection properties in the constructor.
private ConsumerConfig config;
public KafkaConsumer() throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
props.put("client.id", this.getClass().getSimpleName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("partition.assignment.strategy", "range");
this.config = new ConsumerConfig(props);
this.topicName = "data";
}
/**
 * Starts consuming the configured topic and prints every message to standard output.
 *
 * Creates a consumer connector from {@code config}, requests one stream for
 * {@code topicName}, and submits one printing task per returned stream to a
 * fixed-size thread pool. The method returns once the tasks are submitted; the
 * tasks keep running for as long as their stream iterators report more messages.
 */
public void run() {
    ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
    Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
            connector.createMessageStreams(ImmutableMap.of(topicName, 1));
    List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);
    ExecutorService executor = Executors.newFixedThreadPool(streams.size());
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new Runnable() {
            public void run() {
                ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
                while (iterator.hasNext()) {
                    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
                    // Decode with an explicit charset instead of the platform default,
                    // which differs between hosts (e.g. Windows vs. Linux containers).
                    // NOTE(review): assumes the producer wrote UTF-8 text - confirm.
                    String m = new String(messageAndMetadata.message(),
                            java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(m);
                }
            }
        });
    }
    // No further tasks are submitted. shutdown() lets the already-submitted tasks
    // keep running but allows the worker threads to exit once their streams end,
    // instead of idling forever and keeping the JVM alive.
    executor.shutdown();
}
我现在想在 dylanmei/zeppelin Docker 容器中消费同一个主题。我尝试在 Zeppelin 笔记本中运行以下代码:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Consumer settings for the Kafka 0.10 direct stream. Only new-consumer keys are
// passed: "zookeeper.connect" belongs to the old ZooKeeper-based consumer and is
// not a setting of the new consumer, so it is omitted.
val kafkaParams = Map[String, Object](
  // NOTE(review): if this notebook runs inside its own Docker container,
  // "localhost" refers to that container, not to the Kafka container or the
  // host - confirm the address that is reachable from here.
  "bootstrap.servers" -> "localhost:9092",
  "group.id" -> "example",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  // The new consumer expects assignor class names here; the short name "range"
  // is only understood by the old high-level consumer.
  // No trailing comma after the last entry: Scala 2.11 rejects it.
  "partition.assignment.strategy" -> "org.apache.kafka.clients.consumer.RangeAssignor"
)
val topics = Array("data")

// createDirectStream requires a StreamingContext, not the SparkContext `sc`.
// The 5-second batch interval is an arbitrary starting value - adjust as needed.
val ssc = new StreamingContext(sc, Seconds(5))

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Pairs each record's key with its value. This only declares the transformation:
// nothing is consumed until an output operation (e.g. print) is registered on the
// resulting DStream and ssc.start() is called.
stream.map(record => (record.key, record.value))
这段代码取自 Spark Kafka Integration Guide。
但是,我得到的始终只是“连接被拒绝”(Connection refused)错误(即使填入错误的端口也会出现同样的错误,所以它并不能说明什么):
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:51)
at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:37)
at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60)
at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.getClient(RemoteInterpreterProcess.java:96)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.init(RemoteInterpreter.java:216)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:385)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getFormType(LazyOpenInterpreter.java:105)
at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:306)
at org.apache.zeppelin.scheduler.Job.run(Job.java:176)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:329)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
希望有人能帮我让它运行起来,我毫无头绪。
谢谢,祝好!
我首先会做的是登录到 Docker 容器中,用 telnet 连接端口 2181 和 9092,以确认进程是否正在监听 – user2230605
您是否尝试过使用 `--net="host"`?我认为默认是桥接(bridge)网络,此时您需要在连接字符串中使用网桥适配器的地址。 – Cooper6581