2016-11-15 26 views
4

我知道使用.withColumn()UDF向Spark DataSet添加新列的方法,它返回一个DataFrame。我也知道,我们可以将生成的DataFrame转换为DataSet。如何将数据列添加到数据集而不从DataFrame转换并访问它?

我的问题是:

  1. 如何DataSet的类型安全进场这里,如果我们依然遵循着传统的DF方法(即通过列名作为UDF的输入字符串)
  2. 是否有“像面向对象的方式”访问列(不需要像字符串那样传递列名),就像我们以前用RDD做的那样,用于追加一个新列。
  3. 如何访问像地图,过滤器等正常操作中的新列?

例如:

scala> case class Temp(a : Int, b : String) //creating case class 
    scala> val df = Seq((1,"1str"),(2,"2str),(3,"3str")).toDS // creating DS 
    scala> val appendUDF = udf((b : String) => b + "ing")  // sample UDF 

    scala> df.withColumn("c",df("b")) // adding a new column 
    res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field] 

    scala> res5.as[Temp] // converting to DS 
    res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 more field] 

    scala> res6.map(x =>x. 
    // list of autosuggestion : 
    a canEqual equals  productArity  productIterator toString 
    b copy  hashCode productElement productPrefix 

新列c,即我已经使用.withColumn()是无法访问的添加,由于柱c不在的情况下类Temp(它仅包含a & b)在使用res5.as[Temp]将其转换为DS时。

如何访问列c

回答

6

在类型安全的世界Dataset s中,您可以将结构映射到另一个结构中。

也就是说,对于每次转换,我们都需要数据的模式表示(因为RDD需要它)。要访问上面的'c',我们需要创建一个新的模式来提供对它的访问。

case class A(a:String) 
case class BC(b:String, c:String) 
val f:A => BC = a=> BC(a.a,"c") // Transforms an A into a BC 

val data = (1 to 10).map(i => A(i.toString)) 
val dsa = spark.createDataset(data) 
// dsa: org.apache.spark.sql.Dataset[A] = [a: string] 

val dsb = dsa.map(f) 
//dsb: org.apache.spark.sql.Dataset[BC] = [b: string, c: string] 
+0

有没有其他的方式来添加一列而不传递字符串? – vdep

+0

@vdep''字符串'只是一个遵循问题脉络的例子。 – maasg

+0

不,我的意思是,我们可以做到这一点,而不需要在这里传递列名'b'作为字符串:'df.withColumn(“c”,df(“b”))' – vdep

3

只需添加到@ maasg的出色答卷......

如何DataSet的类型安全进场这里,如果我们依然遵循着传统的DF方法(即通过列名作为字符串对于UDF的输入)

让我用另一个问题回答这个问题:“我们谁在'我们还在...'”?如果你认为我,我不同意并且只是在我懒得创建一个案例类来描述要使用的数据集时才使用DataFrames。

我对UDF的回答是远离UDF,除非它们非常简单,并且Spark Optimizer无法优化。是的,我确实相信UDF很容易定义和使用,我自己被太多的时间用来(超过)使用它们。 Spark SQL 2.0中提供了大约239个函数,您可以认真思考一个没有UDF但标准函数的解决方案。

scala> spark.version 
res0: String = 2.1.0-SNAPSHOT 

scala> spark.catalog.listFunctions.count 
res1: Long = 240 

(240以上是因为我注册了一个UDF)。

您应该始终使用标准功能,因为它们可以进行优化。 Spark可以控制你在做什么,从而优化你的查询。

您还应该使用数据集(而不是Dataset[Row]DataFrame),因为它们使您可以访问字段的类型安全访问。因为数据集编程都是关于Scala自定义代码,Spark无法像基于DataFrame的代码那样优化,所以还是无法优化一些数据集“好东西”。

是否存在像面向RDD那样访问列的“面向对象的方式”(不像列名称那样传递字符串),用于追加新列。

是的。当然。用例类定义数据集的模式并使用该字段。无论是访问还是添加(这就是@maasg很好的回应,所以我不会在这里重复他的话)。

如何访问正常操作中的新列如地图,过滤器等?

简单...再次。使用描述数据集(的模式)的案例类。你如何添加一个新的“东西”到现有的对象?你不能不知道已经接受了一个新的专栏,不是吗?

在访问列或附加新列的“”面向对象的方式“。如果您的列是案例类的属性,则不能说“这是描述数据的类,同时说这是一个可能具有新属性的类”。这在OOP/FP中是不可能的,是吗?

这就是为什么添加一个新列可归结为使用另一个案例类或使用withColumn。那有什么问题?我认为......简单地......没有任何问题。

相关问题