15

从spark 2.0.1开始我有一些问题。我看了很多文件,但至今未能找到足够的答案:Spark 2.0数据集vs DataFrame

  • 是什么
    • df.select("foo")
    • df.select($"foo")
  • 之间的区别做我理解正确的是
    • myDataSet.map(foo.someVal)是typesafe和wi将不会转换为RDD,但保留在DataSet表示中/无额外的开销(2.0.0的性能)
  • 所有其他命令选择,..只是语法糖。它们不是类型安全的,可以使用地图代替。没有map语句,我怎么能df.select("foo")类型安全?
    • 为什么我应该使用UDF/UADF而不是地图(假设地图停留在数据集表示中)?
+0

有一个项目,旨在为星火更多类型的安全而有效的执行路径上停留:[typelevel /无框](https://github.com/typelevel/frameless) –

回答

11
  1. df.select("foo")df.select($"foo")之间的区别是签名。前者至少需要一个String,后者一个零或多个Columns。除此之外没有实际的区别。
  2. myDataSet.map(foo.someVal)类型检查,但作为任何Dataset操作使用RDD的对象,并与DataFrame操作相比,有一个很大的开销。让我们来看看一个简单的例子:

    case class FooBar(foo: Int, bar: String) 
    val ds = Seq(FooBar(1, "x")).toDS 
    ds.map(_.foo).explain 
    
    == Physical Plan == 
    *SerializeFromObject [input[0, int, true] AS value#123] 
    +- *MapElements <function1>, obj#122: int 
        +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar 
         +- LocalTableScan [foo#117, bar#118] 
    

    正如你可以看到这个执行计划需要访问到所有领域,并有DeserializeToObject

  3. 否。一般而言,其他方法不是句法糖并生成显着不同的执行计划。例如:

    ds.select($"foo").explain 
    
    == Physical Plan == 
    LocalTableScan [foo#117] 
    

    相比,才可以直接访问列中显示的计划。它不是API的限制,而是操作语义差异的结果。

  4. 如何在没有map语句的情况下df.select(“foo”)类型安全?

    没有这样的选择。而类型化的列允许您以静态Dataset转换为另一种静态类型Dataset

    ds.select($"bar".as[Int]) 
    

    有没有类型安全的。还有一些尝试包括类型安全优化操作,like typed aggregations,但是这个实验性的API。

  5. 我为什么要使用UDF/UADF不是地图

    这完全取决于你的。 Spark中的每个分布式数据结构都有自己的优点和缺点(例如参见Spark UDAF with ArrayType as bufferSchema performance issues)。

就个人而言,我觉得静态类型Dataset是最无用的:

  • 不要提供相同的一系列优化为Dataset[Row](虽然它们共享存储格式和一些执行计划优化其没有充分受益于代码生成或堆外存储),也无法访问DataFrame的所有分析功能。

  • 类型转换是黑盒子,有效地为优化器创建分析障碍。例如选择(过滤器)不能被推过输入变换:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain 
    
    == Physical Plan == 
    *Filter (foo#133 = 1) 
    +- *Filter <function1>.apply 
        +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) 
         +- Exchange hashpartitioning(foo#133, 200) 
         +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) 
          +- LocalTableScan [foo#133, bar#134] 
    

    相比:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain 
    
    == Physical Plan == 
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) 
    +- Exchange hashpartitioning(foo#133, 200) 
        +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) 
         +- *Filter (foo#133 = 1) 
         +- LocalTableScan [foo#133, bar#134] 
    

    这会影响设有像谓词下推或投影下推。

  • 不像RDDs那样灵活,只有一小部分本地支持的类型。

  • 当使用as方法转换Dataset时,使用Encoders的“类型安全性”存在争议。由于数据形状不是使用签名编码的,因此编译器只能验证是否存在Encoder

相关问题: