3

对于自定义Estimator的transformSchema方法,我需要能够将输入数据框架构与案例类中定义的架构进行比较。通常这可以按如下所述执行,如Generate a Spark StructType/Schema from a case class。然而,错误的为空时:具有正确空性的案例类的Spark模式

的DF由spark.read.csv().as[MyClass]推断出真正的模式如:

root 
|-- CUSTOMER_ID: integer (nullable = false) 

和案件类:

case class MySchema(CUSTOMER_ID: Int) 

比较我使用:

val rawSchema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType] 
    if (!rawSchema.equals(rawDf.schema)) 

不幸的是,这总是会产生false ,从case类手动推断出新的模式被设置为空的,以true(因为JA java.Integer实际上可能为空)

root 
|-- CUSTOMER_ID: integer (nullable = true) 

我怎样才能创建模式时指定nullable = false

回答

3

可以说,你在混合那些不属于同一空间的东西。 ML管道本质上是动态的,引入静态类型的对象并不会真正改变这种情况。一类

此外模式定义为:

case class MySchema(CUSTOMER_ID: Int) 

将具有不可为空CUSTOMER_IDscala.Int是不一样的java.lang.Integer

scala> import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor 
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor 

scala> case class MySchema(CUSTOMER_ID: Int) 
defined class MySchema 

scala> schemaFor[MySchema].dataType 
res0: org.apache.spark.sql.types.DataType = StructType(StructField(CUSTOMER_ID,IntegerType,false)) 

话虽这么说,如果你想nullable领域Option[Int]

case class MySchema(CUSTOMER_ID: Option[Int]) 

,如果你想不能为空使用Int如上。

您在这里遇到的另一个问题是,对于csv,每个字段根据定义可以为空,并且此状态由编码的Dataset“继承”。因此,在实践中:

spark.read.csv(...) 

总是会导致:

root 
|-- CUSTOMER_ID: integer (nullable = true) 

,这就是为什么你的架构不匹配。遗憾的是,不可能为字段覆盖不强制使用可空性限制的来源,如csvjson

如果具有不为空的模式是一个硬性要求,你可以尝试:

spark.createDataFrame(
    spark.read.csv(...).rdd, 
    schemaFor[MySchema].dataType.asInstanceOf[StructType] 
).as[MySchema] 

这种方法才有效,如果你知道数据实际上是null免费。任何null值wiil导致运行时异常。