2016-09-27 81 views
4

我是相对较新的火花,我试图通过多个键同时分组数据。如何在Scala Spark作业中的多个键上使用ReduceByKey

我有一些数据,我映射所以它最终看起来像这样:

((K1,K2,K3),(V1,V2))

我的目标是小组由(K1 ,K2,K3),并分别总结V1和V2与落得:

((K1,K2,K3),(SUM(V1),SUM(V2))

下面是代码我有到目前为止:

val filepath = "file.avro" 
val sc = new SparkContext(sparkConf) 
val sqlContext = new SQLContext(sc)    
val data = sqlContext.read.avro(filepath) 
val dataRDD = data.rdd 

val mappedDataRDD = dataRDD.map{ 
    case (v, w, x, y, z) => ((v,w,x), (y, z)) 
}.reduceByKey((x,y)=> ???) 

所以我正在寻找如何reduceByKey,所以我可以按(v,w,x)键和y和z求和。

+0

'???'='(x._1 + y._1,x._2 + y._2)' – Alec

回答

4

我认为你在找什么和应该使用的是aggregateByKey

该方法需要两个参数组。第一个参数组取累加器的起始值。第二个参数组需要两个功能,

  1. 一个将事物累加到累加器中的函数。
  2. 结合了两个累加器的函数。

现在,您可以按如下方式使用它,

val (accZeroY, accZeroZ): (Long, Long) = (0, 0) 

val mappedDataRDD = dataRDD 
    .map({ 
    case (v, w, x, y, z) => ((v,w,x), (y, z)) 
    }) 
    .aggregateByKey((accZeroY, accZeroZ))(
    { case ((accY, accZ), (y, z)) => (accY + y, accZ + z) } 
    { case ((accY1, accZ1), (accY2, accZ2)) => (accY1 + accY2, accZ1 + accZ2) } 
) 

正如你应该已经观察到,第二个参数组中的功能实际上是在这种情况下相同。只有在type of the needed accumulationkey-value-RDDPairRDD中的值类型相同的情况下才有可能。

在这种情况下,你也可以使用reduceByKey,你能想到的作为aggregateByKey与既是函数参数传递相同的功能,

val mappedDataRDD = dataRDD 
    .map({ 
    case (v, w, x, y, z) => ((v,w,x), (y, z)) 
    }) 
    .reduceByKey(
    { case ((accY, accZ), (y, z)) => (accY + y, accZ + z) } 
) 

但在我看来,你should NOT使用reduceBykey。我建议使用aggregateByKey的原因是因为大数据集上的值的积累有时会产生超出您的类型范围的结果。

例如对于您的情况,我怀疑您的(x, y)实际上是(Int, Int),并且您想要使用(v, w, x)作为关键字来累计它。但是,当你大量添加Int ...请记住,结果可能会比Int可以处理的大。

所以...你会想要你的积累类型是更大范围的东西(Int, Int)(Long, Long)reduceByKey不允许你这样做。所以...我会说,也许你正在寻找和应该使用aggregateByKey

1

你也可以使用reduceByKey,你只需要小心你想要什么。我简化了这个例子,但它揭示了你想要的东西。

val rdd = sc.parallelize(List(
    (1, 2, 1, 1, 1), 
    (1, 2, 1, 2, 2), 
    (1, 3, 2, 4, 4))) 

rdd.map { 
    case (k1, k2, k3, v1, v2) => ((k1, k2, k3), (v1, v2)) 
}.reduceByKey { 
    // You receive two values which are actually tuples, so we treat them like that. 
    case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) 
}.collect() 
//res0: Array[((Int, Int), (Int, Int))] = Array(((1,2,1),(3,3)), ((1,3,2),(4,4))) 
+0

虽然这肯定工程按我的意见,你可能想键右边数( OP有'(k1,k2,k3)'。:) – Alec

+0

@Alec我认为这么多键是不必要的。然而,把它们放回来并不难。 –

相关问题