2016-10-10 84 views
2

Scala 2.10在这里使用Spark 1.6.2。我有一个类似的(但不是相同的)问题,但是,接受的答案不是SSCCE,并假设了一定数量的关于Spark的“前期知识”;因此我无法重现或理解它。 更重要的是,该问题也仅限于向现有数据框添加新列,而我需要为数据框中的所有现有行添加列以及值。将StringType列添加到现有的Spark DataFrame,然后应用默认值


所以我想一列添加到现有的星火据帧,然后为新列的所有行赋予初始(“默认”)值。

val json : String = """{ "x": true, "y": "not true" }""" 
val rdd = sparkContext.parallelize(Seq(json)) 
val jsonDF = sqlContext.read.json(rdd) 

jsonDF.show() 

当我跑,我得到这个下面的输出(通过.show()):

+----+--------+ 
| x|  y| 
+----+--------+ 
|true|not true| 
+----+--------+ 

现在我想一个新的字段添加到jsonDF,它的创建,并在不修改json串后,使得所得的DF应该是这样的:

+----+--------+----+ 
| x|  y| z| 
+----+--------+----+ 
|true|not true| red| 
+----+--------+----+ 

含义,我想添加一个新的“z” colu mn到DF,类型StringType,然后默认所有行包含z - 值为"red"

从其他的问题,我已拼凑以下伪代码放在一起:

val json : String = """{ "x": true, "y": "not true" }""" 
val rdd = sparkContext.parallelize(Seq(json)) 
val jsonDF = sqlContext.read.json(rdd) 

//jsonDF.show() 

val newDF = jsonDF.withColumn("z", jsonDF("col") + 1) 

newDF.show() 

但是当我运行它,我得到的是.withColumn(...)方法的编译器错误:

org.apache.spark.sql.AnalysisException: Cannot resolve column name "col" among (x, y); 
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152) 
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151) 
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664) 
    at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652) 

我也没有看到任何API方法可以让我设置"red"作为默认值。任何想法,我要去哪里错误?

回答

8

您可以使用​​3210函数。首先,你必须将其导入

import org.apache.spark.sql.functions.lit 

,并用它作为显示在下面的列

jsonDF.withColumn("z", lit("red")) 

类型将被自动推断。