2017-06-02 40 views
2

我移植了一些代码从星火1.6至2.1星火与下面的问题搏斗行数据帧:问题,以创建包含Option [T]

这星火1.6

import org.apache.spark.sql.types.{LongType, StructField, StructType} 

val schema = StructType(Seq(StructField("i", LongType,nullable=true)))  
val rows = sparkContext.parallelize(Seq(Row(Some(1L)))) 
sqlContext.createDataFrame(rows,schema).show 

完美地工作在星火2.1.1相同的代码:

import org.apache.spark.sql.types.{FloatType, LongType, StructField, StructType} 

val schema = StructType(Seq(StructField("i", LongType,nullable=true))) 
val rows = ss.sparkContext.parallelize(Seq(Row(Some(1L)))) 
ss.createDataFrame(rows,schema).show 

提供了以下运行时异常:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 72, i89203.sbb.ch, executor 9): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.Some is not a valid external type for schema of bigint 

那么,如果我想要可空Long而不是使用Option[Long],我应该如何将这些代码转换为Spark 2.x?

回答

1

其实是有一个JIRA SPARK-19056关于这个问题,它实际上不是一个。

所以这种行为是故意的。

允许 Option永远不会记录并带来了很多的烦恼,当我们应用编码器的结构到所有类型的操作。自Spark 2.0以来,请使用Dataset进行类型化操作/自定义对象。例如

val ds = Seq(1 -> None, 2 -> Some("str")).toDS 
ds.toDF // schema: <_1: int, _2: string> 
+0

感谢这个信息。我知道打算输入数据集它是最干净的解决方案,但也需要一些时间来重构代码 –

+0

@RaphaelRoth唯一的问题是,周围没有任何词汇。这是标准。 – eliasah

+0

我不太确定强类型数据集是否真的会成为新标准......我有些怀疑 –

1

的错误消息是清楚它说,当需要

scala.Some is not a valid external type for schema of bigint 

bigintSome用于所以你需要使用OptiongetOrElse结合,使我们可以定义nullOption回报nullpointer。下面的代码应该为你工作

val sc = ss.sparkContext 
val sqlContext = ss.sqlContext 
val schema = StructType(Seq(StructField("i", LongType,nullable=true))) 
val rows = sc.parallelize(Seq(Row(Option(1L) getOrElse(null)))) 
sqlContext.createDataFrame(rows,schema).show 

我希望这个答案是有帮助的

+0

感谢,这确实是我的情况 –

+0

伟大最简单的“解决办法”听到:) –