2016-03-22 98 views
0

鉴于SparkFlumeEvents流(或者说,任何DSTREAM)如何做一个映射到适当的模式,使流,可以保存到卡桑德拉与星火流架构

stream.saveToCassandra(keyspace,table) 

一个天真的尝试抱怨缺少的列。

是stream.map()给定对象(这看起来很麻烦)的最佳方法?

或者......

另一种方法似乎是使用stream.foreachRDD并以某种方式映射到数据帧。考虑到流方法支持直接存储到cassandra,这似乎也很麻烦。

那么正确的方法是什么?

回答

0

通过指定要插入的键空间,表名和列,使用spark cassandra连接器将流保存到Cassandra中。另一种方法是将数据映射到UDT并将其插入到数据库中。如果您只需要插入数据,我宁愿将列指定为最快的方式。从文档 例不完全一样的,但是你可以使用它的任何变种:

val wc = stream.flatMap(_.split("\\s+")) 
    .map(x => (x, 1)) 
    .reduceByKey(_ + _) 
    .saveToCassandra("streaming_test", "words", SomeColumns("word", "count")) 
+0

做不过你的代码假设,即传入流式传输有正确的列 - 我已经看到了文档这种方式,但我的主要问题更多地围绕转换和映射模式的想法 - 例如,我可能需要以自定义方式反序列化流的字节或执行列映射。我想知道指定这些映射/解码器的最简洁的方法是什么? – ismisesisko