Spark UDF: how to convert a Map into a Column

2016-11-14 65 views
0

I am using an Apache Zeppelin notebook, so Spark essentially runs in interactive mode. I cannot use a closure variable here, because Zeppelin throws org.apache.spark.SparkException: Task not serializable when it tries to serialize the whole paragraph (a much larger closure).

So without the closure approach, my only option is to pass the map to the UDF as a column.

I collected the following map from a paired RDD:

final val idxMap = idxMapRdd.collectAsMap 

Here is the Spark transformation where it is used:

import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.functions.{col, lit, udf}

def labelStr(predictions: WrappedArray[Double], idxMap: Map[Double, String]): Array[String] = {
    predictions.array.map(idxMap.getOrElse(_, "Other"))
}
@transient val predictionStrUDF = udf { (predictions: WrappedArray[Double], idxMap: Map[Double, String]) => labelStr(predictions, idxMap) }

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), lit(idxMap))) 

But with the lit(idxMap) call I got the following error:

java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.HashMap$HashTrieMap 

So I tried creating the column with the following:

val colmap = map(idxMapArr.map(lit _): _*)

But I got the following error:

<console>:139: error: type mismatch; 
found : Iterable[org.apache.spark.sql.Column] 
required: Seq[org.apache.spark.sql.Column] 
     val colmap = map(idxMapArr.map(lit _): _*) 
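For reference, one way past this mismatch is to flatten the Scala Map into alternating key/value literal Columns and convert it to a Seq before the varargs expansion. A minimal sketch, using a small stand-in map (the names here are illustrative, not from the notebook above):

import org.apache.spark.sql.functions.{lit, map}

// Stand-in for the collected idxMap
val idxMap: Map[Double, String] = Map(0.0 -> "cat", 1.0 -> "dog")

// functions.map expects a Seq of alternating key/value Columns,
// i.e. Seq(lit(k1), lit(v1), lit(k2), lit(v2), ...)
val mapCol = map(idxMap.toSeq.flatMap { case (k, v) => Seq(lit(k), lit(v)) }: _*)

The resulting Column could then replace lit(idxMap) as the second argument to predictionStrUDF. On Spark 2.2 and later, typedLit(idxMap) can build the same map literal directly.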

The closure approach (for completeness):

def predictionStrUDF2(idxMapArr: scala.collection.Map[Double,String]) = { 
    udf((predictions: WrappedArray[Double]) => labelStr(predictions, idxMapArr)) 
} 
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF2(idxMapArr)(col("predictions"))) 

It compiles, but then when I call cvmlPredictionsStr.show I get the error below. I think this is due to Zeppelin: the closure ends up pulling in the enclosing cell object, whose crossValidator field holds a non-serializable F2jBLAS instance (see the serialization stack at the bottom).

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) 
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:1924) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2139) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:239) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:526) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:486) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:495) 
    ... 62 elided 
Caused by: java.io.NotSerializableException: com.github.fommil.netlib.F2jBLAS 
Serialization stack: 
    - object not serializable (class: com.github.fommil.netlib.F2jBLAS, value: [email protected]) 
    - field (class: org.apache.spark.ml.tuning.CrossValidator, name: f2jBLAS, type: class com.github.fommil.netlib.F2jBLAS) 
    - object (class org.apache.spark.ml.tuning.CrossValidator, cv_891fd6b7d95f) 
    - field (class: $iw, name: crossValidator, type: class org.apache.spark.ml.tuning.CrossValidator) 
    - object (class $iw, [email protected]) 
    - field (class: $iw, name: $iw, type: class $iw) 

Answer

1

The question title is about Spark UDFs in interactive mode, but the real question here seems to be how to avoid the closure serialization problems that some interactive environments exhibit.

From your problem description, it sounds like the following does not work when executed directly in one of your notebook cells:

val x = 5 
sc.parallelize(1 to 10).filter(_ > x).collect() 

That is probably because x is a member of the cell object; when the lambda captures x, it tries to serialize the entire cell object. The cell object is not serializable, and the result is the messy exception. Using a wrapper object avoids the problem. Note that there may be a slicker way to declare the wrapper (perhaps just nesting inside curly braces would be enough).

object Wrapper {
  def f(): Unit = {
    val x = 5
    sc.parallelize(1 to 10).filter(_ > x).collect()
  }
}
Wrapper.f()
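
Applied to the UDF from the question, the same pattern might look like the sketch below (UdfWrapper is an illustrative name; labelStr, idxMap and cvmlPrediction are taken from the question):

import org.apache.spark.sql.functions.{col, udf}
import scala.collection.mutable.WrappedArray

// Illustrative wrapper: the UDF's closure captures only the idxMap parameter,
// not the notebook cell that holds the non-serializable CrossValidator.
// Extending Serializable keeps serialization safe even if the lambda's call
// to labelStr also captures the wrapper object itself.
object UdfWrapper extends Serializable {
  def labelStr(predictions: WrappedArray[Double],
               idxMap: scala.collection.Map[Double, String]): Array[String] =
    predictions.array.map(idxMap.getOrElse(_, "Other"))

  def predictionStrUDF(idxMap: scala.collection.Map[Double, String]) =
    udf((predictions: WrappedArray[Double]) => labelStr(predictions, idxMap))
}

val cvmlPredictionsStr = cvmlPrediction.withColumn(
  "predictionsStr", UdfWrapper.predictionStrUDF(idxMap)(col("predictions")))

Whether the wrapper is needed at all may depend on the environment; the comment below this answer reports that the plain version already works in some Zeppelin setups.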

Once this issue is resolved you may still have questions, but as it stands the question touches on too many different sub-topics. Another explanation of the closure serialization problem is available here.

+0

Your first example works in my Zeppelin notebook without any problem. I don't need to use the Wrapper there. – nir