2017-06-22 50 views
1

我们对具有13种不同ETL操作的系统使用Scala的Spark 2.x。其中7个相对简单,每个都由一个域类驱动,主要区别在于这个类和处理负载的一些细微差别。Spark/Scala,数据集和案例类的多态性

负载类的简化版本被如下,对于本例的目的说,有被加载7点比萨饼的浇头,这里的辣:

object LoadPepperoni { 
    def apply(inputFile: Dataset[Row], 
      historicalData: Dataset[Pepperoni], 
      mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = { 
    val sparkSession = SparkSession.builder().getOrCreate() 
    import sparkSession.implicits._ 

    val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map{ case row : Row => 
     PepperoniRaw(
      weight = row.getAs[String]("weight"), 
      cost = row.getAs[String]("cost") 
     ) 
    }.toDS() 

    val validatedData: Dataset[PepperoniRaw] = ??? // validate the data 

    val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data 

    val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map{ case datum : PepperoniRaw => 
     Pepperoni(value = ???, key1 = ???, key2 = ???) 
    }.toDS() 

    val joinedData = dedupedData.joinWith(historicalData, 
     historicalData.col("key1") === dedupedData.col("key1") && 
     historicalData.col("key2") === dedupedData.col("key2"), 
     "right_outer" 
    ) 

    joinedData.map { case (hist, delta) => 
     if(/* some condition */) { 
     hist.copy(value = /* some transformation */) 
     } 
    }.flatMap(list => list).toDS() 
    } 
} 

换句话说类执行一系列对数据进行操作时,操作大部分是相同的,并且总是按照相同的顺序进行,但每个顶点可能略有不同,从“原始”到“域”和合并函数的映射也会如此。

要做到这一点为7浇头(即蘑菇,奶酪等),我宁愿不简单地复制/粘贴类和更改所有的名称,因为结构和逻辑是所有负载共同。相反,我宁愿定义一个通用的“装载”类的泛型类型,像这样:

object Load { 
    def apply[R,D](inputFile: Dataset[Row], 
      historicalData: Dataset[D], 
      mergeFun: (D, R) => D): Dataset[D] = { 
    val sparkSession = SparkSession.builder().getOrCreate() 
    import sparkSession.implicits._ 

    val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row => 
... 

对于每一个特定类的操作,例如:从“生”到“域”映射,或合并,有特征或抽象类来实现细节。这将是一个典型的依赖注入/多态模式。

但我遇到了一些问题。从Spark 2.x开始,编码器仅提供给本地类型和案例类别,并且没有办法一般地将类别标识为案例类别。因此,当使用泛型类型时,推断的toDS()和其他隐式功能不可用。

同样如this related question of mine中所述,使用泛型时,case类copy方法也不可用。我已经研究过Scala和Haskell常见的其他设计模式,例如类型类或ad-hoc多态,但障碍是Spark数据集基本上只处理不能抽象定义的案例类。

看来这将是Spark系统中的一个常见问题,但我无法找到解决方案。任何帮助赞赏。

回答

2

,使.toDS的隐式转换:

implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit arg0: Encoder[T]): DatasetHolder[T] 

(从https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicits

你是完全正确的,有计划的涵盖范围为Encoder[T]没有隐含的价值现在你已经取得了你的申请方法通用的,所以这种转换不会发生。但是你可以简单地接受一个作为隐式参数!

object Load { 
    def apply[R,D](inputFile: Dataset[Row], 
      historicalData: Dataset[D], 
      mergeFun: (D, R) => D)(implicit enc: Encoder[D]): Dataset[D] = { 
... 

然后当时你呼叫负载,具有特定的类型,它应该是能够找到这种类型的编码器。请注意,您在调用上下文中也必须使用import sparkSession.implicits._

编辑:类似的方法是通过限制类型(apply[R, D <: Product])并接受隐含的JavaUniverse.TypeTag[D]作为参数来启用隐式的newProductEncoder[T <: Product](implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[T]): Encoder[T]

+0

谢谢你,是一种心灵鼓风机让我试试看。我遇到了与aggregateByKey()和copy()类似的错误(请参阅我的文章中的相关问题链接),是否有任何类似的魔法可以将所需的实现纳入范围? –

+0

更一般地说,这种方法是追踪缺少的特定隐含,并将其作为完全指定类型的调用站点的一个参数。在aggregateByKey的情况下,它看起来像你需要'ClassTag [D]'(https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions )。如果我有任何好的想法,我会分开看看你的关于“copy”的问题,并在那里回复 –