2016-06-08 54 views
1

我想构造一个接收解析器作为参数的类,并在每一行使用这个解析器。以下是您可以粘贴到spark-shell的最简单示例。通用解析器类`任务不可序列化'

import scala.util.{Success,Failure,Try} 
import scala.reflect.ClassTag 

class Reader[T : ClassTag](makeParser:() => (String => Try[T])) { 

    def read(): Seq[T] = { 

    val rdd = sc.parallelize(Seq("1","2","oops","4")) mapPartitions { lines => 

     // Since making a parser can be expensive, we want to make only one per partition. 
     val parser: String => Try[T] = makeParser() 

     lines flatMap { line => 
     parser(line) match { 
      case Success(record) => Some(record) 
      case Failure(_) => None 
     } 
     } 
    } 

    rdd.collect() 
    } 
} 

class IntParser extends (String => Try[Int]) with Serializable { 
    // There could be an expensive setup operation here... 
    def apply(s: String): Try[Int] = Try { s.toInt } 
} 

然而,当我尝试运行像new Reader(() => new IntParser).read()(哪种类型的检查就好了),我得到了可怕的错误org.apache.spark.SparkException: Task not serializable有关关闭。

为什么会出现错误,并且有没有办法重新设计上述以避免这种情况(同时保留Reader通用)?

+0

奇怪。该函数只关闭'makeParser',但是'()=>新的IntParser'应该是可序列化的。如果用'parser'作为参数替换传递的'makeParser'会发生什么? –

+0

@AlexeyRomanov如果我只是把'parser'作为'Reader [T]'的参数,我仍然会得到相同的消息(稍微不同的跟踪,但仍然与闭包有关) – Alec

+0

@Alec - 快速修复移动行解析器:String =>在val rdd =之前尝试[T] = makeParser().. – Knight71

回答

2

问题是,makeParser是可变的class Reader,因为你在rdd转换中使用它,spark会尝试序列化整个不可序列化的类Reader。所以你会得到任务而不是可序列化的异常。

将Serializable添加到类Reader中将与您的代码一起工作。但是这不是一个好的做法,因为它会序列化可能不需要的整个类变量。

一般而言,您可以使用函数而不是方法来避免序列化问题。因为在scala中函数实际上是对象,它将被序列化。

参考这样的回答: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

+1

备用修正: 1.使'makeParser'成为'read'而不是'Reader'的参数。 2.更改'read'以将'makeParser'函数存储在局部变量中:'val makeParser0 = makeParser; ... val parser:String =>尝试[T] = makeParser0()...'。 –

+0

啊!第一段解释它 - 现在一切都合情合理。 @AlexeyRomanov我可能会最终做到这一点! – Alec

相关问题