2017-10-09 65 views
1

嗨我想添加新列使用DataFrame的每一行中的现有列,我试图在Spark Scala这样... df是包含可变数量的列,这些只能在运​​行时决定。使用Spark Scala添加新列使用现有的列

// Added new column "docid" 
val df_new = appContext.sparkSession.sqlContext.createDataFrame(df.rdd, df.schema.add("docid", DataTypes.StringType)) 

df_new.map(x => { 
     import appContext.sparkSession.implicits._ 
     val allVals = (0 to x.size).map(x.get(_)).toSeq 
     val values = allVals ++ allVals.mkString("_") 
     Row.fromSeq(values) 
    }) 

但这给错误是Eclipse本身

  • 无法找到存储在数据集型编码器。通过导入spark.implicits._支持原始类型(Int,String等)和Product类型(case类)。将来的发行版中将添加对序列化其他类型的支持。 (隐式证据$ 7:org.apache.spark.sql.Encoder [org.apache.spark.sql.Row])org.apache.spark.sql.Dataset [org.apache。 spark.sql.Row。未指定的值参数证明$ 7。

请帮忙。

+0

的'import'应'map'之外做来完成。 – Shaido

+1

你能举出输入数据和期望输出的例子吗?这应该有可能以更有效的方式解决。 – Shaido

回答

0

它可以更多更好地使用UDF和withColumn阿比

相关问题