在一个批处理中,您有一个RDD包含所有状态的间隔为2秒。然后你可以单独处理这些状态。这里是简单的例子:
JavaDStream<Status> inputDStream = TwitterUtils.createStream(ctx, new OAuthAuthorization(builder.build()), filters);
inputDStream.foreach(new Function2<JavaRDD<Status>,Time,Void>(){
@Override
public Void call(JavaRDD<Status> status, Time time) throws Exception {
List<Status> statuses=status.collect();
for(Status st:statuses){
System.out.println("STATUS:"+st.getText()+" user:"+st.getUser().getId());
//Process and store status somewhere
}
return null;
}});
ctx.start();
ctx.awaitTermination();
}
我希望我没有误解你的问题。
卓然
谢谢。如果我将状态单独存储在列表中,是否可以应用列表中的所有RDD转换或像reduceByKey(),countByValue这样的操作?虽然我是Scala的新手,但我需要在Scala中完成。 – Naren
我刚刚给你列出了一个例子,告诉你可以访问各个状态,但是如果你想使用spark来进一步处理它,你不应该收集状态列表。例如,您可以实现inputDStream.mapToPair函数,该函数将通过某些键返回状态,例如用户ID或任何你需要的。那么你可以减少BIOS密钥。不幸的是,我只有Scala的基本知识,不能给你举例,但是你可以在Java中做的所有事情,你也可以在Scala中做。 –
我认为可能是我可以将特定批次的状态存储在列表中,并使用parallelize()将该列表转换为RDD,以便我可以应用Spark转换和操作。 – Naren