2016-08-30 119 views
1

我目前正致力于将主要在SQL存储过程中编写的古老系统转移到Scala以在Spark上运行。存储过程是批量作业,每天/每周/每年一次,在“请求”对象上运行,可能需要几小时才能运行。SQL存储过程到Scala/Spark流

由于几个原因,我们正在将系统更改为流模型(Spark Streaming)。

在旧的系统中,很多逻辑是用连接语句执行的,其中大量的请求连接了许多表。

一种解决方案是基本上采用相同的SQl代码并将其移植到Spark SQL语句中,然后该语句将在请求的“微批”上运行。但是,这意味着我们仍然在执行大量的联接语句,我听说在Spark SQL中效率低下。

我有第二个想法是把业务逻辑,并编写代码,如果我们只需要过程单一请求(也就是说,如果你有10个应用程序,而不是处理所有这些带有连接,你会编程,就好像您在处理单个请求一样)。然后,我将采取微批量的请求并通过逻辑处理(即Requests.map(r => RequestLogic.execute(r)))映射它们。

类似下面的示例代码:

case class Request(id: Int, typeId: Int, value: Long) 

def CreateStreamingContext(sparkConf: SparkConf, streamDuration: Duration, 
          storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): StreamingContext = { 

    sparkConf.set(SparkArgumentKeys.MaxCores, (partitionCount * 2).toString) 
    val ssc = new StreamingContext(sparkConf, streamDuration) 
    ssc.checkpoint(checkpointDir) 

    val stream = EventHubsUtils.createUnionStream(ssc, hubParams, storageLevel) 
    stream.checkpoint(streamDuration) 

    stream.map(x => Request(x(1), x(2), x(3))) 
     .map(r => RequestLogic.execute(r)) 

    ssc 
} 

我想弄清楚:

1)哪一个会变得更好。
2)各有什么优点/缺点。

我是新来斯卡拉/ Spark和试图找出最好的方式。我不确定这是否足够的信息,如果需要,我会尝试并提供更多细节。

回答

0

有趣的问题,答案取决于你的数据的形状。我想假设两种情况:

  • 首先,你有很多Request的数据,你想加入他们的行列,以相对小的主数据量。

  • 二,Request数据和数据加入的数据量同样大,超过您的群集的RAM。

在第一种情况下,你可以指点火花(原则上也应该能够自动判断)使用一种叫做BroadcastHashJoin。策略是将小表广播到每个Spark工作人员,并将其与更大的RDD中的每个元素进行连接。将会有两个以上的非空分区,所以Spark可以在两个以上的工作节点上运行得更快。在合同ShuffleHashJoin将采取所有行,并随密钥洗牌。整个表只有2个非空分区,并且向作业中添加更多工作节点将无济于事。所以能够做到BroadcastHashJoin以最少的努力确保可扩展性。有关DataBricks链接的笔记本的更多详细信息。

在第二种情况下,您的策略并不是一个坏主意。我们有很好的预处理数据的经验,可以加入像RocksDBAWS DynamoDB这样的外部KV-Store,然后进行查找并以流媒体的方式加入。但是,即使在一个小群集上也可以将这个过程扩展到真正的海量数据集,但性能和工作量要比纯粹的Spark内存方法高得多。