2016-04-03 144 views
3

我有List[Double],如何将其转换为org.apache.spark.sql.Column。我试图将其作为列使用.withColumn()插入到现有的DataFrame中。如何将列表[Double]转换为列?

+0

'List [Double]'中的Double元素是什么? –

+0

@JacekLaskowski,它只是一个数字列表(双数据类型),我想添加为现有数据框中的列。 – vdep

+0

@vdep什么是标题编辑?我不明白。 – eliasah

回答

8

它不能直接完成。 Column不是数据结构,而是特定SQL表达式的表示。它不受特定数据的约束。你必须先转换你的数据。接近这一点的一种方式是parallelizejoin通过索引:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructField, DoubleType} 

val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y") 
val aList = List(1.0, -1.0, 0.0) 

val rows = df.rdd.zipWithIndex.map(_.swap) 
    .join(sc.parallelize(aList).zipWithIndex.map(_.swap)) 
    .values 
    .map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) } 

sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false)) 

另一个类似的方法是指标和使用,UDF来处理余下的:

import scala.util.Try 

val indexedDf = sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map { 
    case (row: Row, i: Long) => Row.fromSeq(row.toSeq :+ i) 
    }, 
    df.schema.add("idx_", "long") 
) 

def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption) 

indexedDf.withColumn("z", addValue(aList.toVector)($"idx_")) 

不幸的是这两种解决方案会从问题的影响。首先通过驱动程序传递本地数据会在您的程序中引入严重的瓶颈。通常数据应该直接从执行者那里访问。另一个问题是如果你想迭代执行这个操作,就会增加RDD谱系。

虽然第二个问题可以通过检查点来解决,但第一个问题通常会使这个想法毫无用处。我强烈建议你首先构建完整的结构,然后在Spark上读取它,或者以可以利用Spark体系结构的方式重新构建管道。例如,如果数据来自外部源,则使用map/mapPartitions直接对每个数据块执行读取操作。