3

我有一个(相当大,觉得10E7行)数据帧从我基于某些属性过滤元件分区位置

val res = data.filter(data(FieldNames.myValue) === 2).select(pk.name, FieldName.myValue) 

我的数据帧有N个分区data.rdd.getNumPartitions

现在我想知道我的行来自哪个分区。我知道我可以通过所有分区重复像这样的东西

val temp = res.first() //or foreach, this is just an example 
data.foreachPartition(f => { 
    f.exists(row => row.get(0)==temp.get(0)) 
    //my code here 
}) //compare PKs 

data.rdd.mapPartitionsWithIndex((idx, f) => ...)

然而,这似乎过多,也不是很我的结果高性能和我的数据框变大。

在执行filter()操作后有Spark方法吗?

或者,有没有一种方法来重写/一个替代filter() - 语句,以便它返回行的原点?

我也可以保存分区位置,我的数据帧和更新上重新分区,但我宁愿做一个火花方式

(唯一类似的问题,我发现了here,既不问题我也发现this这可能是相似的,但不一样)

在此先感谢您的任何帮助/指针,我很抱歉,如果我错过了类似于我的问题已被回答。

+0

mapPartitionsWithIndex是一个简单的地图操作。它不涉及洗牌,只是分布式映射。可能有另一种方式,但我不确定它可能比这更真实。 – Marie

回答

0

分区号/计数不稳定,因为Spark将执行分区减少的自动扩展&。这意味着输入分区计数可能与输入文件计数不同。

一般模式在这些情况下是创建基于在每个输入文件中的数据的一些型复合关键的。如果密钥很大,可以对其进行散列以减小大小。如果您不太在乎碰撞,请使用Murmur3。如果您担心碰撞,请使用MD5,这仍然很快。

如果只有独特的功能,你必须是输入文件的路径,你就必须添加的文件路径作为区分列。这里是一个办法做到这一点:

val paths = Seq(...) 
val df = paths 
    .map { path => 
    sqlContext.read.parquet(path) 
     .withColumn("path", lit(path)) 
    } 
    .reduceLeft(_ unionAll _) 

的想法很简单:读取输入文件一次一个,加上与之相关的唯一列,然后使用UNION ALL它们结合在一起。