我目前正致力于将主要在SQL存储过程中编写的古老系统转移到Scala以在Spark上运行。存储过程是批量作业,每天/每周/每年一次,在“请求”对象上运行,可能需要几小时才能运行。SQL存储过程到Scala/Spark流
由于几个原因,我们正在将系统更改为流模型(Spark Streaming)。
在旧的系统中,很多逻辑是用连接语句执行的,其中大量的请求连接了许多表。
一种解决方案是基本上采用相同的SQl代码并将其移植到Spark SQL语句中,然后该语句将在请求的“微批”上运行。但是,这意味着我们仍然在执行大量的联接语句,我听说在Spark SQL中效率低下。
我有第二个想法是把业务逻辑,并编写代码,如果我们只需要过程单一请求(也就是说,如果你有10个应用程序,而不是处理所有这些带有连接,你会编程,就好像您在处理单个请求一样)。然后,我将采取微批量的请求并通过逻辑处理(即Requests.map(r => RequestLogic.execute(r)))映射它们。
类似下面的示例代码:
case class Request(id: Int, typeId: Int, value: Long)
def CreateStreamingContext(sparkConf: SparkConf, streamDuration: Duration,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): StreamingContext = {
sparkConf.set(SparkArgumentKeys.MaxCores, (partitionCount * 2).toString)
val ssc = new StreamingContext(sparkConf, streamDuration)
ssc.checkpoint(checkpointDir)
val stream = EventHubsUtils.createUnionStream(ssc, hubParams, storageLevel)
stream.checkpoint(streamDuration)
stream.map(x => Request(x(1), x(2), x(3)))
.map(r => RequestLogic.execute(r))
ssc
}
我想弄清楚:
1)哪一个会变得更好。
2)各有什么优点/缺点。
我是新来斯卡拉/ Spark和试图找出最好的方式。我不确定这是否足够的信息,如果需要,我会尝试并提供更多细节。