2017-06-05

RDD subtract is not working the way I expect. I tried this simple example with a user-defined type:

scala> rdd2.collect 
    res45: Array[Person] = Array(Person(Mary,28,New York), Person(Bill,17,Philadelphia), Person(Craig,34,Philadelphia), Person(Leah,26,Rochester)) 

    scala> rdd3.collect 
    res44: Array[Person] = Array(Person(Mary,28,New York), Person(Bill,17,Philadelphia), Person(Craig,35,Philadelphia), Person(Leah,26,Rochester)) 

    scala> rdd2.subtract(rdd3).collect 
    res46: Array[Person] = Array(Person(Mary,28,New York), Person(Leah,26,Rochester), Person(Bill,17,Philadelphia), Person(Craig,34,Philadelphia)) 

I expected rdd2.subtract(rdd3).collect to return only Person(Craig,34,Philadelphia), but instead I get all of rdd2 back as my output. Can anyone explain this?
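For context, subtract removes the elements of the second RDD from the first by value equality. A plain-Scala collections sketch (no Spark required, using the same sample data) shows the result I expected:

```scala
// Plain-collections analogue of RDD.subtract: diff removes elements
// of the second sequence from the first, comparing by equality.
case class Person(name: String, age: Int, city: String)

val s2 = Seq(Person("Mary", 28, "New York"), Person("Bill", 17, "Philadelphia"),
             Person("Craig", 34, "Philadelphia"), Person("Leah", 26, "Rochester"))
val s3 = Seq(Person("Mary", 28, "New York"), Person("Bill", 17, "Philadelphia"),
             Person("Craig", 35, "Philadelphia"), Person("Leah", 26, "Rochester"))

println(s2.diff(s3))  // List(Person(Craig,34,Philadelphia))
```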

Answers


This is one of the known issues with the Scala REPL: equality does not work correctly for case classes defined directly in the REPL. Try the following to work around it. The problem only occurs in the REPL and goes away when you run the application via spark-submit. It is described in detail in this ticket.

scala> :paste -raw // make sure you are using Scala 2.11 for the raw option to work. 
// Entering paste mode (ctrl-D to finish) 

package mytest; 
case class Person(name: String, age: Int, city: String); 

// Exiting paste mode, now interpreting. 

scala> import mytest.Person 

scala> val rdd2 = sc.parallelize(Seq(Person("Mary",28,"New York"), Person("Bill",17,"Philadelphia"), Person("Craig",34,"Philadelphia"), Person("Leah",26,"Rochester"))) 
rdd2: org.apache.spark.rdd.RDD[mytest.Person] = ParallelCollectionRDD[6] at parallelize at <console>:25 


scala> val rdd3 = sc.parallelize(Seq(Person("Mary",28,"New York"), Person("Bill",17,"Philadelphia"), Person("Craig",35,"Philadelphia"), Person("Leah",26,"Rochester"))) 
rdd3: org.apache.spark.rdd.RDD[mytest.Person] = ParallelCollectionRDD[7] at parallelize at <console>:25 

scala> rdd2.subtract(rdd3).collect 
res1: Array[mytest.Person] = Array(Person(Craig,34,Philadelphia)) 

Thank you for the answer, it works well in my case. But how does using paste mode fix the problem? –


The fix requires putting the case class in a package (mytest in this example). To do that we need paste mode with the -raw option. –
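For context on why packaging matters: subtract relies on the case class's compiler-generated equals and hashCode being consistent across the driver and executors, which holds once the class is compiled into a stable package. A minimal sketch (plain Scala, no Spark) of that structural equality:

```scala
case class Person(name: String, age: Int, city: String)

val a = Person("Craig", 34, "Philadelphia")
val b = Person("Craig", 34, "Philadelphia")

// Case classes get structural equals/hashCode, so equal field values
// compare equal and hash identically -- which is what subtract depends on.
println(a == b)                    // true
println(a.hashCode == b.hashCode)  // true
```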


Here is an example that I tested in Spark 2.0.2 and that works as expected; I think it should work in your case too. Try running this example locally rather than in the REPL.

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int, address: String)

    val spark =
      SparkSession.builder().master("local").appName("test").getOrCreate()

    // These are RDDs, not DataFrames, so name them accordingly.
    val rdd1 = spark.sparkContext.parallelize(Array(
      Person("Mary", 28, "New York"),
      Person("Bill", 17, "Philadelphia"),
      Person("Craig", 34, "Philadelphia"),
      Person("Leah", 26, "Rochester")))

    val rdd2 = spark.sparkContext.parallelize(Array(
      Person("Mary", 28, "New York"),
      Person("Bill", 17, "Philadelphia"),
      Person("Craig", 35, "Philadelphia"),
      Person("Leah", 26, "Rochester")
    ))

    rdd1.subtract(rdd2).collect.foreach(println)

Output:

Person(Craig,34,Philadelphia) 

Thank you for your answer, I got the point. –