2017-02-24 88 views
0

我正在使用Kafka和Spark结构化流式处理。我收到以下格式的卡巴消息。在Spark结构化流中处理二进制数据

{"deviceId":"001","sNo":1,"data":"aaaaa"} 
{"deviceId":"002","sNo":1,"data":"bbbbb"} 
{"deviceId":"001","sNo":2,"data":"ccccc"} 
{"deviceId":"002","sNo":2,"data":"ddddd"} 

我读它像下面。

Dataset<String> data = spark 
     .readStream() 
     .format("kafka") 
     .option("kafka.bootstrap.servers", bootstrapServers) 
     .option(subscribeType, topics) 
     .load() 
     .selectExpr("CAST(value AS STRING)") 
     .as(Encoders.STRING()); 
Dataset<DeviceData> ds = data.as(ExpressionEncoder.javaBean(DeviceData.class)).orderBy("deviceId","sNo"); 
ds.foreach(event -> 
     processData(event.getDeviceId(),event.getSNo(),event.getData().getBytes()) 
);} 

private void processData(String deviceId,int SNo, byte[] data) 
{ 
    //How to check previous processed Dataset??? 
} 

以我JSON消息 “数据” 是字节[]的字符串形式。我有一个要求,我需要按照“sNo”的顺序处理给定“deviceId”的二进制“数据”。因此,对于“deviceId”=“001”,我必须处理“sNo”= 1,然后“sNo”= 2等二进制数据。如何在结构化流式传输中检查之前处理过的数据集的状态?任何样本或链接都会有很大的帮助。我是Spark的新手,请和我一起裸照。谢谢。

+0

你到目前为止尝试了什么? – Jan

+0

我已更新我的代码。请检查。我在做orderBy然后forEach来处理数据。我被卡在processData方法中,如何处理来自流式传输接收的数据集的当前和以前的数据。 – user7615505

回答