Scala 2.10在这里使用Spark 1.6.2。我有一个类似的(但不是相同的)问题,但是,接受的答案不是SSCCE,并假设了一定数量的关于Spark的“前期知识”;因此我无法重现或理解它。 更重要的是,该问题也仅限于向现有数据框添加新列,而我需要为数据框中的所有现有行添加列以及值。将StringType列添加到现有的Spark DataFrame,然后应用默认值
所以我想一列添加到现有的星火据帧,然后为新列的所有行赋予初始(“默认”)值。
val json : String = """{ "x": true, "y": "not true" }"""
val rdd = sparkContext.parallelize(Seq(json))
val jsonDF = sqlContext.read.json(rdd)
jsonDF.show()
当我跑,我得到这个下面的输出(通过.show()
):
+----+--------+
| x| y|
+----+--------+
|true|not true|
+----+--------+
现在我想一个新的字段添加到jsonDF
,它的创建,并在不修改json
串后,使得所得的DF应该是这样的:
+----+--------+----+
| x| y| z|
+----+--------+----+
|true|not true| red|
+----+--------+----+
含义,我想添加一个新的“z
” colu mn到DF,类型StringType
,然后默认所有行包含z
- 值为"red"
。
从其他的问题,我已拼凑以下伪代码放在一起:
val json : String = """{ "x": true, "y": "not true" }"""
val rdd = sparkContext.parallelize(Seq(json))
val jsonDF = sqlContext.read.json(rdd)
//jsonDF.show()
val newDF = jsonDF.withColumn("z", jsonDF("col") + 1)
newDF.show()
但是当我运行它,我得到的是.withColumn(...)
方法的编译器错误:
org.apache.spark.sql.AnalysisException: Cannot resolve column name "col" among (x, y);
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652)
我也没有看到任何API方法可以让我设置"red"
作为默认值。任何想法,我要去哪里错误?