2017-04-24 92 views
2

我正在使用IntelliJ社区版与Scala插件和火花库。我仍然在学习Spark,并且正在使用Scala Worksheet。Spark Scala:任务不可序列化错误

我已经写了下面的代码删除标点符号的字符串:

def removePunctuation(text: String): String = { 
    val punctPattern = "[^a-zA-Z0-9\\s]".r 
    punctPattern.replaceAllIn(text, "").toLowerCase 
} 

然后我读的文本文件,并尝试删除标点:

val myfile = sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation) 

这给了如下错误,任何帮助将不胜感激:

org.apache.spark.SparkException:任务不可序列化 at org.apache.spark.util.ClosureCleaner $ .ensureSerializable(/home/ubuntu/src/main/scala/Test.sc:294) at org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(/home/ubuntu/src/main/scala/Test.sc:284) at org.apache.spark.util.ClosureCleaner $ .clean(/ home/ubuntu/src/main/scala /Test.sc:104) at org.apache.spark.SparkContext.clean(/home/ubuntu/src/main/scala/Test.sc:2090) at org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply(/home/ubuntu/src/main/scala/Test.sc:366) at org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply(/ home/ubuntu/src /主页/ scala/Test.sc:365) at org.apache.spark.rdd.RDDOperationScope $ .withScope(/home/ubuntu/src/main/scala/Test.sc:147) at #worksheet#。#工作表#(/ home/ubuntu/src/main/scala/Test.sc:108) 导致:java.io.NotSerializableException:A $ A21 $ A $ A21 序列化堆栈: - 对象不可序列化(类:A $ A21 $ A $ A21,值:A $ A21 $ A $ A21 @ 62db3891) - 类(类:A $ A21 $ A $ A21 $$ anonfun $ words $ 1,名称:$ outer,类型:class A $ A21 $ A $ A21) - 对象(class A $ A21 $ A $ A21 $$ anonfun 40) 在org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala::46) 在组织在org.apache.spark.serializer.SerializationDebugger $ .improveException(SerializationDebugger.scala $字$ 1,) 。 apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:295) at org.apache.spark.util.ClosureClean在$ org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:108) at org.apache.spark .SparkContext.clean(SparkContext.scala:2094) at org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply(RDD.scala:370) at org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply(RDD.scala:369) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope .scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.map(RDD.scala:369) at A $ A21 $ A $ A21.words $ lzycompute(Test.sc:27) at A $ A21 $ A $ A21.words(Te st.sc:27) at A $ A21 $ A $ A21.get $$ instance $$ words(Test.sc:27) at A $ A21 $ .main(Test.sc:73) at A $ A21 。主要(Test.sc) 在sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法) 在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java: 43) at java.lang.reflect.Method.invoke(Method.java:498) at org.jetbrains.plugins.scala.worksheet.MyWorksheetRunner.main(MyWorksheetRunner。Java的:22)

回答

2

由于@TGaweda表明,星火的SerializationDebugger是识别非常有帮助“的序列化路径从给定的对象有问题的对象处于领先地位。”堆栈跟踪中的“序列化堆栈”之前的所有美元符号表明您的方法的容器对象是问题所在。

虽然这是最简单的,只是拍Serializable你的容器类,我更愿意利用这样的事实Scala是一种函数式语言,并使用你的函数作为头等公民:

sc.textFile("/home/ubuntu/data.txt",4).map { text => 
    val punctPattern = "[^a-zA-Z0-9\\s]".r 
    punctPattern.replaceAllIn(text, "").toLowerCase 
} 

或者,如果你真的希望保持独立:

val removePunctuation: String => String = (text: String) => { 
    val punctPattern = "[^a-zA-Z0-9\\s]".r 
    punctPattern.replaceAllIn(text, "").toLowerCase 
} 
sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation) 

当然这些选项的工作,因为Regexis serializable,你应该确认。

在次要的但非常重要的笔记中,构建Regex是昂贵的,所以为了性能而将其排除在转换之外 - 可能使用broadcast

1

阅读堆栈跟踪,有:

$外,类型:A类$ A21 $ A $ A21

这是一个很好的提示。你的lambda是可序列化的,但你的类不是可序列化的。

当您创建lambda表达式时,则此表达式具有对外部类的引用。在你的情况下,外部类是不可序列化的,即没有实现Serializable或其中一个字段不是可串行化的实例

+0

顺便说一句。这可能是复制副本的副本..;)但是,我没有时间搜索最佳答案,将问题标记为重复。如果你找到了一些很好的解释,请给我打电话,并将问题标为重复 –

+0

@Tawawa:谢谢你的回答,但我正处于学习阶段,根本不理解。我在发布之前搜索了这个问题。但是,他们都没有深入解释所有这些意思以及如何解决它。如果你可以建议一个可能的解决方案,那么任何人在未来谁得到这个错误只会感谢你 – SumB

+0

@sumitb这就是为什么我发布这个答案:) –

1

正如T. Gaweda已经指出的,你很可能在一个不可序列化的类中定义函数。因为它是一个纯函数,即它不依赖于封闭类的任何上下文,所以我建议你将它放入一个伴随对象,该对象应该扩展为Serializable。这将是Scala的相当于Java静态方法:

object Helper extends Serializable { 
    def removePunctuation(text: String): String = { 
    val punctPattern = "[^a-zA-Z0-9\\s]".r 
    punctPattern.replaceAllIn(text, "").toLowerCase 
    } 
} 
相关问题