Spark Scala reduceByKey - how to reference keys specified in a configuration file?

I have a dataset with the following schema:
dataset.printSchema()
|-- id: string (nullable = true)
|-- feature1: double (nullable = true)
|-- feature2: double (nullable = true)
|-- feature3: double (nullable = true)
|-- feature4: double (nullable = true)
In my application.conf I have defined the subset of keys that should be transformed with reduceByKey:
keyInfo {
keysToBeTransformed = "feature1,feature2"
}
I can load these keys in my main object:
val config: Config = ConfigFactory.load()
val keys: Array[String] = config.getString("keyInfo.keysToBeTransformed").split(",")
For each of these keys, I need to compute the per-id mean in the dataset and collect the results into an array. Currently, I use the following approach:
val meanFeature1: Array[Double] = dataset.map(x => (x.id, x.feature1)).rdd
  .mapValues(z => (z, 1))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
  .map { case (id, (total, count)) => (id, total / count) }
  .collect().sortBy(_._1).map(_._2)
val meanFeature2: Array[Double] = dataset.map(x => (x.id, x.feature2)).rdd
  .mapValues(z => (z, 1))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
  .map { case (id, (total, count)) => (id, total / count) }
  .collect().sortBy(_._1).map(_._2)
The problem with the above approach is that it does not reference the keys specified in application.conf (the computation does not change dynamically when the keys specified in application.conf are changed).

How can I achieve this?
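One way to drive the aggregation from the configured keys is a sketch along these lines: build one `avg` column per key from the `keys` array and aggregate them all in a single pass, so only one `collect()` is needed. This assumes the `dataset`, the `keys` array, and a running SparkSession from the question; it uses the Dataset `groupBy`/`agg` API rather than `reduceByKey`, which is an equivalent but not identical technique.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.functions.{avg, col}

val config: Config = ConfigFactory.load()
val keys: Array[String] = config.getString("keyInfo.keysToBeTransformed").split(",")

// One avg(...) column per configured key, named after the key.
val aggColumns = keys.map(k => avg(col(k)).as(k))

// Single aggregation and single collect() for all configured keys.
val rows = dataset
  .groupBy(col("id"))
  .agg(aggColumns.head, aggColumns.tail: _*)
  .orderBy(col("id"))
  .collect()

// Map each configured key to its array of per-id means
// (column 0 of each row is the id, so key i is at column i + 1).
val means: Map[String, Array[Double]] =
  keys.zipWithIndex.map { case (k, i) =>
    k -> rows.map(_.getDouble(i + 1))
  }.toMap
```

Changing `keysToBeTransformed` in application.conf then changes which columns are averaged without touching the code; `means("feature1")` would replace the hard-coded `meanFeature1`.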
Great, thanks! You only need to collect() once, and your solution is also more performant. – kanimbla