2

我有一个2列火花数据帧 - col1col2SparkFrameFrame为什么会创建错误的分区数?

scala> val df = List((1, "a")).toDF("col1", "col2") 
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string] 

当我parquet格式写在磁盘上df,谱写等于唯一值的数量的文件数中的数据全部col1我做了repartition使用col1,像这样:

scala> df.repartition(col("col1")).write.partitionBy("col1").parquet("file") 

上面的代码在文件系统中只产生一个文件。但是,洗牌操作的数量成为200

enter image description here

我无法理解这里的一件事,如果col1仅包含一个值,即1那么为什么它在repartition创造200个分区?

回答

4

repartition(columnName)每默认控制创建200个分区(更具体的,spark.sql.shuffle.partitions分区),不管有多少唯一值有col1。如果只有1个唯一值col1,那么分区中的199个是空的。另一方面,如果您有超过200个唯一值col1,则每个分区将有多个值col1

如果你只想要1分区,那么你可以做repartition(1,col("col1"))或只是coalesce(1)。但不是说​​3210不相同的行为在某种意义上说,​​3210我将进一步上升在你的代码suscht,你可能会失去并行(见How to prevent Spark optimization

如果要检查你的分区的内容,我已经为此做了2种方法:

// calculates record count per partition 
def inspectPartitions(df: DataFrame) = { 
    import df.sqlContext.implicits._ 
    df.rdd.mapPartitions(partIt => { 
     Iterator(partIt.toSeq.size) 
    } 
    ).toDF("record_count") 
} 

// inspects how a given key is distributed accross the partition of a dataframe 
def inspectPartitions(df: DataFrame, key: String) = { 
    import df.sqlContext.implicits._ 
    df.rdd.mapPartitions(partIt => { 
     val part = partIt.toSet 
     val partSize = part.size 
     val partKeys = part.map(r => r.getAs[Any](key).toString.trim) 
     val partKeyStr = partKeys.mkString(", ") 
     val partKeyCount = partKeys.size 
     Iterator((partKeys.toArray,partSize)) 
    } 
    ).toDF("partitions","record_count") 
} 

现在你可以例如检查你的数据帧是这样的:

inspectPartitions(df.repartition(col("col1"),"col1") 
.where($"record_count">0) 
.show 
0

在Spark SQL洗牌世界,洗牌分区的默认数量为200,这是由spark.sql.shuffle.partitions

相关问题