0

我有一个Spark Streaming应用程序,它正在读取来自Kafka中单一主题的数据,并根据元素的内容处理它,并将它插入到Cassandra中的两个不同的键空间中。一些数据可能会去KEYSPACE A,其他一些以KEYSPACE B.如果其他在Spark Streaming

我做目前使用的过滤器操作:

Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName") 
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName") 

所以过滤器在每个RDD应用,那些有租户领域的元素去密钥空间A和拥有租户字段B的密钥空间B转到密钥空间B.

有没有更有效的方法来做到这一点,而不是使用2次过滤操作(特别是因为以后可能会有2个以上的密钥空间)?在过滤器操作之前缓存rdd是否会提高性能?

我再说一遍,我有来自Kafka的DStream,我处理它,然后在“foreachRDD”操作中,我从上面的代码片段向Cassandra插入数据。

谢谢

回答

0

你做

Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, "tableName") 
Functions.insertToCassandra(rdd.filter(element=> element.tenant=="B"), keyspace = B, "tableName") 

之前,请务必做rdd.cache()

当你在做类似上面,你的火花正试图两次读取数据RDD。 除非您缓存或广播它,否则Spark永远不会保留内存中的任何rdd。

如果数据集不是很大,另一种方法是一次读取所有数据并缓存它。然后使用groupByKey,在这种情况下,key将是您的keyspace(element)。

+0

感谢您的回答。 我应该在过滤器转换之后添加“rdd.unpersist(true)”,将其从内存中释放出来吗? –

+0

你可以,但是如果你在方法里面有这段代码的话。然后,一旦您超出该方法,它会自动将其从内存中移除。 –

+0

如果我没有记错的话,它也是非持久性的,它会将结果保存在驱动程序的内存中,并将其从工作人员内存中删除。另一方面摧毁,从各处去除它。 –