2016-07-30 50 views
2

按字段将数据分区为预定义分区计数的最佳方式是什么?什么是按列分区但保持固定分区数的有效方式?

我目前通过指定partionCount = 600来分区数据。发现计数600可为我的数据集/群集设置提供最佳查询性能。

val rawJson = sqlContext.read.json(filename).coalesce(600) 
rawJson.write.parquet(filenameParquet) 

现在我想通过列“eventName的”分区此数据,但仍然保持计数600的数据,目前约有2000独特eventNames,加上各eventName的行数不统一。大约10个eventNames有超过50%的数据导致数据倾斜。因此,如果我像下面那样进行分区,那么它不是很高效。写入时间比没有写入多5倍。

val rawJson = sqlContext.read.json(filename) 
rawJson.write.partitionBy("eventName").parquet(filenameParquet) 

什么是这些方案的数据分区的好方法?有没有办法通过eventName进行分区,但将其分散到600个分区中?

我的模式是这样的:

{ 
    "eventName": "name1", 
    "time": "2016-06-20T11:57:19.4941368-04:00", 
    "data": { 
    "type": "EventData", 
    "dataDetails": { 
     "name": "detailed1", 
     "id": "1234", 
... 
... 
    } 
    } 
} 

谢谢!

回答

0

您是否尝试过应用列表存储概念。你有几个分区让你倾斜列,就像你提到的10个事件名称一样。其余的,你可以只有一个分区/目录来保存所有其他密钥。你可以看看here。其主要针对80-20规则。

0

这是倾斜数据的常见问题,您可以采取几种方法。

如果偏差在一段时间内保持稳定,列表分段可能会发挥作用,这可能会也可能不会发生,特别是如果引入了新的分区变量值。我还没有研究过随着时间的推移调整列表存储的容易程度,正如您的评论所述,您无法使用它,因为它是Spark 2.0的一项功能。

如果您使用的是1.6.x,关键的观察是您可以创建自己的函数,将每个事件名称映射为600个唯一值之一。您可以将其作为UDF或案例表达式来执行。然后,您只需使用该功能创建一个列,然后使用该列进行分区,使用repartition(600, 'myPartitionCol)而不是coalesce(600)

由于我们在Swoop处理了非常歪斜的数据,我发现以下主要的数据结构对于构建与分区相关的工具非常有用。

/** Given a key, returns a random number in the range [x, y) where 
    * x and y are the numbers in the tuple associated with a key. 
    */ 
class RandomRangeMap[A](private val m: Map[A, (Int, Int)]) extends Serializable { 
    private val r = new java.util.Random() // Scala Random is not serializable in 2.10 

    def apply(key: A): Int = { 
    val (start, end) = m(key) 
    start + r.nextInt(end - start) 
    } 

    override def toString = s"RandomRangeMap($r, $m)" 
} 

例如,这里是我们如何建立一个分区的情况会稍有不同:一个在数据偏斜和密钥的数量少,所以我们必须增加分区的数量为歪斜键,在1坚持为每个键分区的最小数目:

/** Partitions data such that each unique key ends in P(key) partitions. 
    * Must be instantiated with a sequence of unique keys and their Ps. 
    * Partition sizes can be highly-skewed by the data, which is where the 
    * multiples come in. 
    * 
    * @param keyMap maps key values to their partition multiples 
    */ 
class ByKeyPartitionerWithMultiples(val keyMap: Map[Any, Int]) extends Partitioner { 
    private val rrm = new RandomRangeMap(
    keyMap.keys 
     .zip(
     keyMap.values 
      .scanLeft(0)(_+_) 
      .zip(keyMap.values) 
      .map { 
      case (start, count) => (start, start + count) 
      } 
    ) 
     .toMap 
) 

    override val numPartitions = 
    keyMap.values.sum 

    override def getPartition(key: Any): Int = 
    rrm(key) 
} 

object ByKeyPartitionerWithMultiples { 

    /** Builds a UDF with a ByKeyPartitionerWithMultiples in a closure. 
    * 
    * @param keyMap maps key values to their partition multiples 
    */ 
    def udf(keyMap: Map[String, Int]) = { 
    val partitioner = new ByKeyPartitionerWithMultiples(keyMap.asInstanceOf[Map[Any, Int]]) 
    (key:String) => partitioner.getPartition(key) 
    } 

} 

你的情况,你有几个事件名称合并为一个分区,这需要改变,但我希望上面的代码给你一个想法如何来解决这个问题。

最后一个观察结果是,如果随着时间的推移,事件名称的分布在您的数据中有很大的价值,您可以对数据的某些部分执行统计数据收集传递以计算映射表。你不必一直这样做,只是在需要的时候。要确定这一点,您可以查看每个分区中的输出文件的行数和/或大小。换句话说,整个过程可以作为Spark作业的一部分自动执行。

+0

感谢Sim的细节。 – vijay

+1

如果重新分区是通过计算列(eventName的映射)完成的,那么通过eventName(即WHERE eventName ==“foo”)筛选的查询仍然只能读取相关分区而不执行全表扫描,因为它现在不再是eventName分区了? – vijay

+0

只有在完全过滤分区列时,才会发生最有效的加载。如果您的偏差在一段时间内保持稳定,则使用静态映射(无论它可能是什么;不一定是列表桶),并在查询过程中应用相同的功能。如果您的偏差随时间推移不稳定,则需要随时间分别维护事件到分区映射的数据结构,在您正在查询的时间段内进行联合,并通过分区列对两者进行过滤(以有效减少分区)和事件名称(专注于分区)。 – Sim