2017-06-16 73 views
2

我有一个与Spark JavaStreamingContext一起使用的程序。我已经了解到,使用DStreams时只有几个输出操作,如print()。 这是一段代码在JavaSparkStreamingContext中执行查询

private static void analyzeHashtags() throws InterruptedException { 
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics); 
    JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
    lines.print(); 
    jssc.start(); 
    jssc.awaitTermination(); 

} 

现在我想查询操作添加到这个代码,如下图所示:

private static void analyzeHashtags() throws InterruptedException, SQLException { 
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics); 
    JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
    lines.print(); 
    String hashtag = "#dummy"; int frequencies = 59; 
    String cql = " CREATE (n:Hashtag {name:'"+hashtag+"', freq:"+frequencies+"})"; 
    st.executeUpdate(cql); 
    jssc.start(); 
    jssc.awaitTermination(); 
} 

但这代码只是执行查询一次。我希望它在每次循环时执行它。 怎么可能做到这一点?提前致谢。

回答

2

要对DStream执行任意操作,我们使用foreachRDD。它在每个批处理间隔提供对数据的访问,由基础rdd表示。

的Java/Scala的伪(混合)代码:

JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new 
Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
lines.foreachRDD{ rdd => 
    .. do something with the RDD here... 
} 

通常,do something操作上的RDD的数据。 我们可以通过使用RDD函数(例如foreachPartition)以分布式方式对该数据进行操作。

但是,考虑到您在本地使用本地neo4j连接,并且如果每个流式传输间隔的数据不是很大,我们可以将数据收集到驱动程序并在本地执行操作。看来,这将是在这种情况下一个合适的,因为数据已经通过已经分布式简化阶段(reduceBykey

因此,foreachRDD部分将变成:

lines.foreachRDD{ rdd => 
    val localDataCollection = rdd.collect 
    localDataCollection.foreach{ keywordFreqPair => 
     val cql = "CREATE (n:Hashtag {name:'"+keywordFreqPair._1+"', freq:"+keywordFreqPair._2+"})" 
     st.executeUpdate(cql) 
} 
+0

感谢完整和有用的答案。 我只是不知道如何在Java(idk Scala)中实现'foreachRDD'部分。使用lambda表达式,我应该写'lines.foreachRDD(rdd - >(...'用函数代替点吗? – sirdan

+1

我个人推荐使用Scala和Spark Streaming。对于'foreachRDD' lambda的Java转换,我想你可以在Spark Streaming示例包中找到一个例子,例如:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/ JavaSqlNetworkWordCount.java – maasg

+0

非常感谢,这有助于很多 – sirdan