我在这里阅读这篇文章:https://spark.apache.org/docs/latest/programming-guide.html(请参阅将函数传递给Spark),但我的用例是使用类型化数据集与我的案例类。我试图使用单身对象来保存映射方法。我想知道如何打包我需要的功能来优化我的舞台的性能(将数据集从一种类型转换为另一种类型,然后写入实木复合地板)。当使用数据集,大型Java类和单例时,Spark传递函数
目前,阶段性步骤花费了大约300万行(〜1.5小时)的难以置信的长时间,大约880 MB数据输出到s3实木复合地板。
我在集群模式下运行,使用最少执行程序= 3,最大执行程序= 10,每个执行程序有4个内核,驱动程序内存8GB。
-
高层次的编码部分:
我映射一个案例类C1到另一个案例类C2。 C1和C2有大约16个字段,各种类型,如java.sql.Timestamp,Option [String] Option [Int],String,Int,BigInt。
case class C1(field1 : _, field2 : _, field3 : _, ...)
case class C2(field1 : _, field2 : _, field3 : _, ...)
为了从C1至C2映射,我需要一个非常大的java类Ĵ我是从https://github.com/drtimcooper/LatLongToTimezone复制的功能(静态方法)。
public class J {
public static String getValue((float) v) = ...
}
我已经在一个util类里面写了映射函数Util,它具有许多其他有用的函数,它们被映射函数调用。
=========
基本上我的码流是这样的:
case class C1(field1 : _, field2 : _, field3 : _, ...)
case class C2(field1 : _, field2 : _, field3 : _, ...)
// very large java class J that only contains static methods
public class J {
public static String getValue((float) v) = ...
...
}
object Util {
def m1(i: Int): Int = ...
def m2(l: Option[BigDecimal], l2: Option[BigDecimal]): Int = {
J.getValue(l.get, l2.get)
}
...
def convert_C1_to_C2(c1: C1): C2 = {
C2(
field1 = m1(c1.field1),
field2 = m2(c1.field2, c1.field3),
...
}
}
dataframe.as[C1].map(Util.convert_C1_to_C2)
.mode(SaveMode.Overwrite)
.parquet("s3a://s3Path")
有没有写这个更优化的方式吗?或者任何人都可以指出我做过这些事情的任何明显的错误?看着我的代码,我不知道为什么它要花很长时间才能完成任务。
我已经试过合并说16个分区来减少s3中的文件数量,但这似乎使作业运行速度慢得多。通常会有64个分区没有任何合并。