2016-12-01 63 views
0

我已经写下面的代码来连接火花流的kinesis,但没有收到数据。无法读取火花流连接中的数据Kinesis

VAL kinesisStream = KinesisUtils.createStream(SSC,APPNAME,streamName中,endpointUrl,regionName,InitialPositionInStream.LATEST,batchInterval,StorageLevel.MEMORY_AND_DISK_2)

kinesisStream.print() // nothing getting printed here 

val data = kinesisStream.flatMap(byteArray => new String(byteArray)) 

data.foreachRDD { rdd =>   
     println("data==" + rdd.collect().length) // no data here too 
     rdd.collect()//.saveAsTextFile("file:///home/myHome/Code/sample/somedata.txt");   
    } 

我试图写入到S3和文件系统,它按文件夹写文件名,并在我看到的只有_SUCCESS文件是零字节。

顺便说一下,我也能写,以相同的室壁运动流,并从Java

什么是这里的问题读取数据。

+0

您是否找到了解决方案? – ArunDhaJ

回答

0

我得到了这个问题的解决方案。

代码可以从kinesis中提取数据。与数据一起,它还生成大量的零字节部分文件。因为其流式应用程序数据部分文件正在为给定时间间隔生成,因此如果数据在该时间间隔内不可用,则生成零字节文件。

添加检查以删除DF代码中的空白部分文件,以便DF只能写入包含数据的零件文件。

我们在这个变化之后开始获取数据。