2016-11-30 76 views
0

我已经在线查看了一些问题,但他们似乎并没有做我想做的事情。Spark:Transpose DataFrame Without Aggregating

我在Scala上使用Apache Spark 2.0.2。

我有一个数据帧:

+----------+-----+----+----+----+----+----+ 
|segment_id| val1|val2|val3|val4|val5|val6| 
+----------+-----+----+----+----+----+----+ 
|   1| 100| 0| 0| 0| 0| 0| 
|   2| 0| 50| 0| 0| 20| 0| 
|   3| 0| 0| 0| 0| 0| 0| 
|   4| 0| 0| 0| 0| 0| 0| 
+----------+-----+----+----+----+----+----+ 

,我要转置到

+----+-----+----+----+----+ 
|vals| 1| 2| 3| 4| 
+----+-----+----+----+----+ 
|val1| 100| 0| 0| 0| 
|val2| 0| 50| 0| 0| 
|val3| 0| 0| 0| 0| 
|val4| 0| 0| 0| 0| 
|val5| 0| 20| 0| 0| 
|val6| 0| 0| 0| 0| 
+----+-----+----+----+----+ 

我一直在使用pivot()尝试,但我无法得到正确的答案。我最终遍历了我的val{x}列,并按照以下方式对每个列进行了旋转,但事实证明这很慢。

val d = df.select('segment_id, 'val1) 

+----------+-----+ 
|segment_id| val1| 
+----------+-----+ 
|   1| 100| 
|   2| 0| 
|   3| 0| 
|   4| 0| 
+----------+-----+ 

d.groupBy('val1).sum().withColumnRenamed('val1', 'vals') 

+----+-----+----+----+----+ 
|vals| 1| 2| 3| 4| 
+----+-----+----+----+----+ 
|val1| 100| 0| 0| 0| 
+----+-----+----+----+----+ 

然后对val{x}每次迭代给我的第一数据框中使用union()

+----+-----+----+----+----+ 
|vals| 1| 2| 3| 4| 
+----+-----+----+----+----+ 
|val2| 0| 50| 0| 0| 
+----+-----+----+----+----+ 

是否有转在这里我不想汇总数据,更有效的方法是什么?

谢谢:)

+0

我怎样才能做到这一点与数据框? –

+0

您是否期望不同的答案,或者您对现有答案满意? – 2016-12-11 02:34:59

回答

1

不幸的是,在任何情况下:

  • 星火DataFrame是有道理的考虑数据量。
  • 数据转置是可行的。

您必须记住,在Spark中实现的DataFrame是行的分布式集合,每行都在单个节点上存储和处理。

你可以表达对DataFramepivot置换:

val kv = explode(array(df.columns.tail.map { 
    c => struct(lit(c).alias("k"), col(c).alias("v")) 
}: _*)) 

df 
    .withColumn("kv", kv) 
    .select($"segment_id", $"kv.k", $"kv.v") 
    .groupBy($"k") 
    .pivot("segment_id") 
    .agg(first($"v")) 
    .orderBy($"k") 
    .withColumnRenamed("k", "vals") 

,但它仅仅是没有实际应用的玩具代码。在实践中,它比收集数据不是更好:既你会给你想要的结果

val df = Seq(
    (1, 100, 0, 0, 0, 0, 0), 
    (2, 0, 50, 0, 0, 20, 0), 
    (3, 0, 0, 0, 0, 0, 0), 
    (4, 0, 0, 0, 0, 0, 0) 
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6") 

+----+---+---+---+---+ 
|vals| 1| 2| 3| 4| 
+----+---+---+---+---+ 
|val1|100| 0| 0| 0| 
|val2| 0| 50| 0| 0| 
|val3| 0| 0| 0| 0| 
|val4| 0| 0| 0| 0| 
|val5| 0| 20| 0| 0| 
|val6| 0| 0| 0| 0| 
+----+---+---+---+---+ 

话虽这么说,如果你

val (header, data) = df.collect.map(_.toSeq.toArray).transpose match { 
    case Array(h, t @ _*) => { 
    (h.map(_.toString), t.map(_.collect { case x: Int => x })) 
    } 
} 

val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) } 
val schema = StructType(
    StructField("vals", StringType) +: header.map(StructField(_, IntegerType)) 
) 

spark.createDataFrame(sc.parallelize(rows), schema) 

对于DataFrame定义为需要对分布式数据结构进行有效的转换,您必须在其他地方查找。有许多结构,包括核心CoordinateMatrixBlockMatrix,它们可以跨两个维度分布数据并且可以转置。