2017-04-06

Spark Scala reduceByKey - how to reference keys specified in a config file?

I have a dataset with the following schema:

dataset.printSchema() 
    |-- id: string (nullable = true) 
    |-- feature1: double (nullable = true) 
    |-- feature2: double (nullable = true) 
    |-- feature3: double (nullable = true) 
    |-- feature4: double (nullable = true) 

In my application.conf I have defined the subset of keys that should be transformed with reduceByKey:

keyInfo { 
    keysToBeTransformed = "feature1,feature2" 
} 

I can load these keys into my main object:

val config : Config = ConfigFactory.load() 
val keys : Array[String] = config.getString("keyInfo.keysToBeTransformed").split(",") 
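The comma-separated value comes back from `config.getString` as one raw string. A minimal plain-Scala sketch (no Typesafe Config involved; the `raw` value is a stand-in for the config lookup) shows why trimming each entry is worth adding:

```scala
// Stand-in for the string returned by config.getString("keyInfo.keysToBeTransformed");
// note the stray space that a bare split(",") would keep.
val raw = "feature1, feature2"

// Trimming each piece makes the parsing robust to whitespace in application.conf.
val parsedKeys: Array[String] = raw.split(",").map(_.trim)
// parsedKeys: Array("feature1", "feature2")
```

Alternatively, Typesafe Config supports real list values (`keysToBeTransformed = ["feature1", "feature2"]` read via `config.getStringList`), which avoids the string-splitting entirely.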

For these keys, I need to compute the mean per id in the dataset and collect the results into an array. Currently I use the following approach:

val meanFeature1 : Array[Double] = dataset.map(x => (x.id, x.feature1)).rdd 
    .mapValues(z => (z, 1)) 
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) 
    .map { case (id, (total, count)) => (id, total / count) } 
    .collect().sortBy(_._1).map(_._2) 

val meanFeature2 : Array[Double] = dataset.map(x => (x.id, x.feature2)).rdd 
    .mapValues(z => (z, 1)) 
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) 
    .map { case (id, (total, count)) => (id, total / count) } 
    .collect().sortBy(_._1).map(_._2) 
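The sum-and-count trick behind the `mapValues`/`reduceByKey` pair can be illustrated with plain Scala collections (toy data, no Spark required; `groupBy` plays the role of the shuffle):

```scala
// Toy (id, value) pairs standing in for dataset.map(x => (x.id, x.feature1)):
val pairs = Seq(("a", 1.0), ("a", 3.0), ("b", 4.0))

val means: Array[Double] = pairs
  .map { case (id, v) => (id, (v, 1)) }          // pair each value with a count of 1
  .groupBy(_._1)                                 // analogous to the shuffle in reduceByKey
  .map { case (id, grouped) =>
    // Sum values and counts exactly as the reduceByKey function does:
    val (total, count) = grouped.map(_._2).reduce((x, y) => (x._1 + y._1, x._2 + y._2))
    (id, total / count)
  }
  .toArray.sortBy(_._1).map(_._2)
// means: Array(2.0, 4.0)  -> mean of "a" is (1.0 + 3.0) / 2, mean of "b" is 4.0 / 1
```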

The problem with the above approach is that it does not reference the keys specified in my application.conf (the computation does not change dynamically when the keys specified in application.conf change).

How can I achieve this?

Answers


I think the DataFrame API is a better fit here, since it has better support for accessing columns dynamically by name. And converting a Dataset to a DataFrame is trivial:

val averagesPerId: Array[Array[Double]] = dataset 
    .groupBy("id") // this also converts to DataFrame 
    .avg(keys: _*) // create average for each key - creates a "avg(featureX)" column for each featureX key 
    .sort("id") 
    .map(r => keys.map(col => r.getAs[Double](s"avg($col)"))) // map Rows into Array[Double], one for each ID 
    .collect() 

// transposing the result to create an array where each row relates to a single key, 
// and mapping each row to its key: 
val averagesPerKey: Map[String, Array[Double]] = keys.zip(averagesPerId.transpose(identity)).toMap 

// for example, if `feature1` was in `keys`: 
val meanFeature1 = averagesPerKey("feature1") 
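The transpose step can be checked with toy stand-ins (plain Scala, no Spark; the numbers are made up): each inner array of `averagesPerId` holds one value per key, and transposing regroups them into one array per key:

```scala
// Two ids, two keys: row i holds id i's averages for (feature1, feature2).
val toyKeys = Array("feature1", "feature2")
val toyAveragesPerId = Array(Array(1.0, 10.0), Array(3.0, 30.0))

// Transposing turns rows-per-id into rows-per-key, then zip attaches the key names:
val toyAveragesPerKey: Map[String, Array[Double]] = toyKeys.zip(toyAveragesPerId.transpose).toMap
// toyAveragesPerKey("feature1") = Array(1.0, 3.0)
```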

Great, thanks! You only need to collect() once, and your solution is also more performant. – kanimbla


In the meantime I have come up with another, similar solution:

import org.apache.spark.sql.functions.avg 

val meanFeatures : Array[Array[Double]] = keys.map(col => dataset 
    .groupBy("id") 
    .agg(avg(col)) 
    .as[(String, Double)] 
    .sort("id") 
    .collect().map(_._2) 
) 
val meanFeature1 : Array[Double] = meanFeatures(0) 
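Instead of positional indexing like `meanFeatures(0)`, the zip-to-Map trick from the accepted answer applies here too (toy data, plain Scala):

```scala
// Toy stand-ins: one row of means per key, in the same order as the key array.
val cfgKeys = Array("feature1", "feature2")
val toyMeanFeatures = Array(Array(2.0, 4.0), Array(20.0, 40.0))

// Zipping key names with their rows gives name-based lookup instead of indices:
val meanFeatureByKey: Map[String, Array[Double]] = cfgKeys.zip(toyMeanFeatures).toMap
// meanFeatureByKey("feature1") = Array(2.0, 4.0)
```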