2016-04-27 139 views
1
HashMap<String, String> kafkaParams = new HashMap<>(); 
kafkaParams.put("metadata.broker.list", "localhost:9092"); 

String topics = "test4"; 
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" "))); 


JavaDStream<String> stream1 = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, 
    StringDecoder.class, kafkaParams, topicsSet) 
    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { 
     @Override 
     public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) { 
     rdd.saveAsTextFile("output"); 
     return rdd; 
     } 
    }).map(new Function<Tuple2<String, String>, String>() { 
     @Override 
     public String call(Tuple2<String, String> kv) { 
     return kv._2(); 
     } 
    }); 
stream1.print(); 
jssc.start(); 
jssc.awaitTermination(); 

交叉检查主题“test4”中是否存在有效数据。使用Spark Streaming后无输出

enter image description here

我期待的是从卡夫卡集群流串,在控制台的console.No例外印刷,也没有输出。 我在这里失踪的任何东西?

+2

你是否在StreamingContext上调用了'.start()'和'.awaitTermination()'? https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java#L109 – ccheneson

+0

Yes.Had实际上错过了它。现在调用它。但仍然没有输出数据。 –

回答

4

您是否试图在之后生成主题中的数据?

默认情况下,直接流使用配置auto.offset.reset =最大,这意味着,当没有初始偏移量时,它会自动重置为最大偏移量,所以基本上,您将只能读取输入的新消息流应用程序启动后的主题。

1

由于ccheneson说,这可能是因为你缺少.start().awaitTermination()

或者它可能是因为transformations in Spark are lazy,这意味着你需要添加一个动作,以获得满意的结果。例如

stream1.print(); 

或者,它可能是因为map正在对执行人进行,所以输出将在执行程序的日志,而不是驾驶者的日志。

+0

是的。你是对的。使用打印更新代码并开始。但是我无法在文件中看到任何输出。交叉验证数据在主题中可用。 –

+0

我可以更清楚地说明你最后的陈述吗?我在本地运行它,这样的工作不会在驱动程序本身运行吗? –