2016-09-25 167 views
0

我对Python很熟悉,我正在学习Spark-Scala。在Spark-Scala中,如何将数组列表复制到DataFrame中?

我想建立具有由这种语法desribed结构的数据帧:

// Prepare training data from a list of (label, features) tuples. 
val training = spark.createDataFrame(Seq(
    (1.1, Vectors.dense(1.1, 0.1)), 
    (0.2, Vectors.dense(1.0, -1.0)), 
    (3.0, Vectors.dense(1.3, 1.0)), 
    (1.0, Vectors.dense(1.2, -0.5)) 
)).toDF("label", "features") 

我从这个网址上面的语法: http://spark.apache.org/docs/latest/ml-pipeline.html

目前我的数据是数组,我已经退出出了DF的:

val my_a = gspc17_df.collect().map{row => Seq(row(2),Vectors.dense(row(3).asInstanceOf[Double],row(4).asInstanceOf[Double]))} 

我的阵列的结构非常类似于上述DF:

my_a: Array[Seq[Any]] = 
Array(
    List(-1.4830674013266898, [-0.004192832940431825,-0.003170667657263393]), 
    List(-0.05876766500768526, [-0.008462913654529357,-0.006880595828929472]), 
    List(1.0109273250546658, [-3.1816797620416693E-4,-0.006502619326182358])) 

如何将数据从我的数组复制到具有上述结构的DataFrame?

我想这句法:

val my_df = spark.createDataFrame(my_a).toDF("label","features") 

星火我吼道:

<console>:105: error: inferred type arguments [Seq[Any]] do not conform to method createDataFrame's type parameter bounds [A <: Product] 
     val my_df = spark.createDataFrame(my_a).toDF("label","features") 
         ^
<console>:105: error: type mismatch; 
found : scala.collection.mutable.WrappedArray[Seq[Any]] 
required: Seq[A] 
     val my_df = spark.createDataFrame(my_a).toDF("label","features") 
             ^
scala> 

回答

4

这里的第一个问题是,你使用List存储行数据。列表是同类数据结构,并且由于Anyrow(2))和DenseVector的唯一常见类型是AnyObject),所以最终的结果为Seq[Any]

下一个问题是你根本用row(2)。由于实际上是Any的一个集合,因此此操作不会返回任何有用的类型,并且不会将结果存储在DataFrame中,而不会提供明确的Encoder

从更加火花的角度来看,它也不是好方法。 collect - 只是为了转换数据,不应该要求任何评论和。映射到Rows只是为了创建Vectors也没有多大意义。

假设没有类型不匹配,你可以使用VectorAssembler

import org.apache.spark.ml.feature.VectorAssembler 

val assembler = new VectorAssembler() 
    .setInputCols(Array(df.columns(3), df.columns(4))) 
    .setOutputCol("features") 

assembler.transform(df).select(df.columns(2), "features") 

,或者如果你真的想手动的UDF处理这个问题。

val toVec = udf((x: Double, y: Double) => Vectors.dense(x, y)) 

df.select(col(df.columns(2)), toVec(col(df.columns(3)), col(df.columns(4)))) 

一般来说,我强烈建议在开始使用Spark之前熟悉Scala。

相关问题