在这种情况下,我认为Spark驱动程序会做的事情会影响结果,而不是编译器。 Spark是否可以优化执行流水线以避免产生冗余重复s
。我不确定,但我认为Spark会在内存中创建rdd1pairs
。
而是映射(String, String)
的,你可以使用(String, Unit)
:
rdd1.map(s => (s,()))
你在做什么基本上是基于rdd1
的rdd2
过滤器。如果rdd1明显小于rdd2,另一种方法是将rdd1
的数据表示为广播变量而不是RDD,并且只需过滤rdd2
即可。这样可以避免任何混洗或缩短阶段,因此可能会更快,但只有在rdd1
的数据足够小才能适合每个节点时才能工作。
编辑:
考虑如何使用单位,而不是字符串节省空间,可以考虑下面的例子:
object size extends App {
(1 to 1000000).map(i => ("foo"+i,()))
val input = readLine("prompt> ")
}
和
object size extends App {
(1 to 1000000).map(i => ("foo"+i, "foo"+i))
val input = readLine("prompt> ")
}
在这个问题How to check heap usage of a running JVM from the command line?描述使用jstat
命令第一个版本比后者使用的堆少得多。
编辑2:
Unit
实际上是没有内容的单独的对象,所以从逻辑上讲,它不应该要求任何序列化。类型定义包含Unit
的事实告诉你所有你需要能够反序列化一个具有Unit类型字段的结构。
Spark默认使用Java序列化。考虑以下几点:
object Main extends App {
import java.io.{ObjectOutputStream, FileOutputStream}
case class Foo (a: String, b:String)
case class Bar (a: String, b:String, c: Unit)
val str = "abcdef"
val foo = Foo("abcdef", "xyz")
val bar = Bar("abcdef", "xyz",())
val fos = new FileOutputStream("foo.obj")
val fo = new ObjectOutputStream(fos)
val bos = new FileOutputStream("bar.obj")
val bo = new ObjectOutputStream(bos)
fo writeObject foo
bo writeObject bar
}
这两个文件是相同的大小:
�� sr Main$Foo3�,�z \ L at Ljava/lang/String;L bq ~ xpt abcdeft xyz
和
�� sr Main$Bar+a!N��b L at Ljava/lang/String;L bq ~ xpt abcdeft xyz
听起来有道理,谢谢。但是,我仍然不确定与原始字符串变体相比,如何存储对Unit的引用可以节省大量的内存。可以? – Carsten
扩展了我的答案以覆盖该主题 – mattinbits
但在原始问题中没有创建新的字符串。对String的引用与对'()'的引用的大小相同。 –