我们对具有13种不同ETL操作的系统使用Scala的Spark 2.x。其中7个相对简单,每个都由一个域类驱动,主要区别在于这个类和处理负载的一些细微差别。Spark/Scala,数据集和案例类的多态性
负载类的简化版本被如下,对于本例的目的说,有被加载7点比萨饼的浇头,这里的辣:
object LoadPepperoni {
def apply(inputFile: Dataset[Row],
historicalData: Dataset[Pepperoni],
mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = {
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._
val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map{ case row : Row =>
PepperoniRaw(
weight = row.getAs[String]("weight"),
cost = row.getAs[String]("cost")
)
}.toDS()
val validatedData: Dataset[PepperoniRaw] = ??? // validate the data
val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data
val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map{ case datum : PepperoniRaw =>
Pepperoni(value = ???, key1 = ???, key2 = ???)
}.toDS()
val joinedData = dedupedData.joinWith(historicalData,
historicalData.col("key1") === dedupedData.col("key1") &&
historicalData.col("key2") === dedupedData.col("key2"),
"right_outer"
)
joinedData.map { case (hist, delta) =>
if(/* some condition */) {
hist.copy(value = /* some transformation */)
}
}.flatMap(list => list).toDS()
}
}
换句话说类执行一系列对数据进行操作时,操作大部分是相同的,并且总是按照相同的顺序进行,但每个顶点可能略有不同,从“原始”到“域”和合并函数的映射也会如此。
要做到这一点为7浇头(即蘑菇,奶酪等),我宁愿不简单地复制/粘贴类和更改所有的名称,因为结构和逻辑是所有负载共同。相反,我宁愿定义一个通用的“装载”类的泛型类型,像这样:
object Load {
def apply[R,D](inputFile: Dataset[Row],
historicalData: Dataset[D],
mergeFun: (D, R) => D): Dataset[D] = {
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._
val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row =>
...
对于每一个特定类的操作,例如:从“生”到“域”映射,或合并,有特征或抽象类来实现细节。这将是一个典型的依赖注入/多态模式。
但我遇到了一些问题。从Spark 2.x开始,编码器仅提供给本地类型和案例类别,并且没有办法一般地将类别标识为案例类别。因此,当使用泛型类型时,推断的toDS()和其他隐式功能不可用。
同样如this related question of mine中所述,使用泛型时,case类copy
方法也不可用。我已经研究过Scala和Haskell常见的其他设计模式,例如类型类或ad-hoc多态,但障碍是Spark数据集基本上只处理不能抽象定义的案例类。
看来这将是Spark系统中的一个常见问题,但我无法找到解决方案。任何帮助赞赏。
谢谢你,是一种心灵鼓风机让我试试看。我遇到了与aggregateByKey()和copy()类似的错误(请参阅我的文章中的相关问题链接),是否有任何类似的魔法可以将所需的实现纳入范围? –
更一般地说,这种方法是追踪缺少的特定隐含,并将其作为完全指定类型的调用站点的一个参数。在aggregateByKey的情况下,它看起来像你需要'ClassTag [D]'(https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions )。如果我有任何好的想法,我会分开看看你的关于“copy”的问题,并在那里回复 –