我有一个Spark Streaming应用程序,它正在读取来自Kafka中单一主题的数据,并根据元素的内容处理它,并将它插入到Cassandra中的两个不同的键空间中。一些数据可能会去KEYSPACE A,其他一些以KEYSPACE B.如果其他在Spark Streaming
我做目前使用的过滤器操作:
Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName")
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName")
所以过滤器在每个RDD应用,那些有租户领域的元素去密钥空间A和拥有租户字段B的密钥空间B转到密钥空间B.
有没有更有效的方法来做到这一点,而不是使用2次过滤操作(特别是因为以后可能会有2个以上的密钥空间)?在过滤器操作之前缓存rdd是否会提高性能?
我再说一遍,我有来自Kafka的DStream,我处理它,然后在“foreachRDD”操作中,我从上面的代码片段向Cassandra插入数据。
谢谢
感谢您的回答。 我应该在过滤器转换之后添加“rdd.unpersist(true)”,将其从内存中释放出来吗? –
你可以,但是如果你在方法里面有这段代码的话。然后,一旦您超出该方法,它会自动将其从内存中移除。 –
如果我没有记错的话,它也是非持久性的,它会将结果保存在驱动程序的内存中,并将其从工作人员内存中删除。另一方面摧毁,从各处去除它。 –