2015-06-28 69 views
5

我是Spark和Spark Streaming的新手。我正在研究Twitter流式传输数据。我的任务涉及独立处理每个Tweet,如计算每条推文中的单词数。从我读到的,每个输入批处理在Spark Streaming的RDD上形成。因此,如果我给出2秒的批处理间隔,那么新的RDD包含所有推文两秒钟,并且所应用的任何转换将应用于整整两秒的数据,并且在两秒钟内无法处理单个推文。我的理解是否正确?或者每条推文形成一个新的RDD?我有点困惑...Spark Streaming中的批量大小

回答

1

在一个批处理中,您有一个RDD包含所有状态的间隔为2秒。然后你可以单独处理这些状态。这里是简单的例子:

JavaDStream<Status> inputDStream = TwitterUtils.createStream(ctx, new OAuthAuthorization(builder.build()), filters); 

     inputDStream.foreach(new Function2<JavaRDD<Status>,Time,Void>(){ 
      @Override 
      public Void call(JavaRDD<Status> status, Time time) throws Exception { 
       List<Status> statuses=status.collect(); 
       for(Status st:statuses){ 
        System.out.println("STATUS:"+st.getText()+" user:"+st.getUser().getId());      
       //Process and store status somewhere 
       } 
       return null; 
      }});   
    ctx.start(); 
     ctx.awaitTermination();  
} 

我希望我没有误解你的问题。

卓然

+0

谢谢。如果我将状态单独存储在列表中,是否可以应用列表中的所有RDD转换或像reduceByKey(),countByValue这样的操作?虽然我是Scala的新手,但我需要在Scala中完成。 – Naren

+0

我刚刚给你列出了一个例子,告诉你可以访问各个状态,但是如果你想使用spark来进一步处理它,你不应该收集状态列表。例如,您可以实现inputDStream.mapToPair函数,该函数将通过某些键返回状态,例如用户ID或任何你需要的。那么你可以减少BIOS密钥。不幸的是,我只有Scala的基本知识,不能给你举例,但是你可以在Java中做的所有事情,你也可以在Scala中做。 –

+0

我认为可能是我可以将特定批次的状态存储在列表中,并使用parallelize()将该列表转换为RDD,以便我可以应用Spark转换和操作。 – Naren