2016-03-15 42 views
0

我收到有几行,并在Java中使用该命令在星火流属性一个JSON输入:如何保持一个JSON结构,当我把它给星火流

JavaReceiverInputDStream <String> 
lines = ssc.socketTextStream 
(localhost, port) 

我现在想过滤线条流,以便它在每一行中只有两个特定的属性,并转储其余的。

这里的问题是,我注意到,行不留JSON结构,即,我不能只是做

JavaDstream<String> line=lines[1]; 
line.print() ; 

我的问题是,如何才能让我的JavaDStream保持结构的JSON对象,然后打印我想要的行?

我希望我很清楚,谢谢。

回答

1

曼努埃尔,

所以基本上你问怎么做才能确保整个有效载荷JSON是单个记录在RDD或者是什么,当你对套接字发送消息记录边界。

所以基本上你使用socketTextStream读取socket上的消息,如果它发现一个新的行字符,它将使用它作为记录边界。收听插座,阅读消息,并把它传递给星火实际的代码)方法SocketReceiver.bytesToLines的一部分(如果你看一下评论,这是它说什么

/** 
* This methods translates the data from an inputstream (say, from a socket) 
* to '\n' delimited strings and returns an iterator to access the strings. 
*/ 

所以一定要拿出\从您的发送火花单记录

苏尼尔

+0

亲爱的苏尼尔的JSON消息n字符, 谢谢您的回答。但它不完全是我想要的。我想要的是这样的: 我有一个JSON文件,我发送到Spark Streaming使用socketTextStream,称为'线'。如果我想打印整个'行'dstream,我只是做lines.print,它的工作原理。 但是,如果我想打印一个特定的属性值(例如第一行中的第一个属性的值,[1,1]行,我不能。 我该怎么做? –

+0

Basicaly我想要一个line对应于流中的一个RDD –

+0

如果您对将JSON文件转换为Stream感兴趣,您可能需要使用SparkStreamingContext.textFileStream(directoryToMonitor)的sparkStream概念,它的工作方式是可以监视特定HDFS目录中的文件当出现一个新的JSON文件时,Spark会将它转换为RDD,你可以在http://wpcertification.blogspot.com/2016/01/monitoring-hdfs-directory-for-new-files.html –