2014-10-20 70 views
2

我是新来的火花,我正在使用Spark与Kafka流..Spark流缓存和转换

我的流媒体持续时间是1秒。

假设我得到100条记录中的第1批和120个记录在第二批80条记录第三批

--> {sec 1 1,2,...100} --> {sec 2 1,2..120} --> {sec 3 1,2,..80} 

我申请我的逻辑在第一批和有一个结果=> RESULT1

我想要在处理第二批时使用result1,并将第二批的result1和120记录的组合结果设为=> result2

我试图缓存结果,但无法在2s中获取缓存result1 有可能吗?或者在这里展示如何实现我的目标?

JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, String.class,String.class, StringDecoder.class,StringDecoder.class, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2()); 

我处理消息并找到1秒结果的单词。

if(resultCp!=null){ 
       resultCp.print(); 
       result = resultCp.union(words.mapValues(new Sum())); 

      }else{ 
       result = words.mapValues(new Sum()); 
      } 

resultCp = result.cache(); 

当第二批的resultCp不应该是零,但这样在任何给定的时间,我有特定秒数据本身我想找到累积的结果则返回空值。做任何一个知道如何做到这一点..

我了解到,一旦火花流传输开始jssc.start()控制不再在我们的结尾它与火花。那么是否可以将第一批的结果发送到第二批来查找累计值?

任何帮助非常感谢。提前致谢。

回答

1

我认为你正在寻找updateStateByKey它创建一个新的DStream通过应用一个cummulative功能提供的DStream和一些状态。 从星火例子包这个例子涵盖了问题的情况下:

首先,你需要一个更新功能,是以新的价值观和先前已知值:

val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
    val currentCount = values.sum 

    val previousCount = state.getOrElse(0) 

    Some(currentCount + previousCount) 
} 

该函数用于创建一个Dstream,可以从一个源码流中累积数据。就像这样:

// Create a NetworkInputDStream on target ip:port and count the 
// words in input stream of \n delimited test (eg. generated by 'nc') 
val lines = ssc.socketTextStream(args(0), args(1).toInt) 
val words = lines.flatMap(_.split(" ")) 
val wordDstream = words.map(x => (x, 1)) 

// Update the cumulative count using updateStateByKey 
// This will give a Dstream made of state (which is the cumulative count of the words) 
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) 

来源:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

+0

谢谢,我已经整理出来我自己,感谢您的时间:) – mithra 2014-10-20 11:34:22