2016-03-15 99 views
0

我每5分钟从ElasticSearch读取数据到Spark。所以每5分钟会有一个RDD。如何从持续的RDD构建DStream?

我希望能够基于这些RDD构建一个DStream,以便我可以在最近1天,最近1小时,最近5分钟等内获取数据报告。

为了构建DStream,我一直在考虑创建自己的接收器,但spark的官方文档只能使用scala或java来提供信息。我使用python。

那么你知道有什么办法吗?我知道我们可以。毕竟DStream是一系列RDD,当然我们应该从持续的RDD创建DStream。我只是不知道如何。请给出一些建议

回答

1

写你自己的接收器将是你提到的一种方式,但看起来像很多开销。你可以做的是使用QueueReceiver,它创建QueueInputDStreamthis example一样。这是斯卡拉,但你也应该能够做类似的事情在Python:

val rddQueue = new Queue[RDD[Map[String, Any]]]() 
val inputStream = ssc.queueStream(rddQueue) 

之后,您只需要查询您的ES实例的每个X sec/min/h/day/whatever,你把结果放到该队列。

使用Python我想这将是这样的:

rddQueue = [] 
rddQueue += es_rdd() // method that returns an RDD from ES 
inputStream = ssc.queueStream(rddQueue) 

// some kind of loop that adds to rddQueue new RDDS 

显然,你需要有东西在排队,你在pyspark使用它里面queueStream(或至少我得到异常之前,如果它是空)。

+0

谢谢@Mateusz Dymczyk。 queueStream将不起作用,因为在创建DStream之后。任何添加到队列中的新rdd都不会计入 –

+0

@KramerLi啊你是对的,我想知道是否有某种方法可以改变这种行为... –