1
嗨我想添加新列使用DataFrame的每一行中的现有列,我试图在Spark Scala这样... df是包含可变数量的列,这些只能在运行时决定。使用Spark Scala添加新列使用现有的列
// Added new column "docid"
val df_new = appContext.sparkSession.sqlContext.createDataFrame(df.rdd, df.schema.add("docid", DataTypes.StringType))
df_new.map(x => {
import appContext.sparkSession.implicits._
val allVals = (0 to x.size).map(x.get(_)).toSeq
val values = allVals ++ allVals.mkString("_")
Row.fromSeq(values)
})
但这给错误是Eclipse本身
- 无法找到存储在数据集型编码器。通过导入spark.implicits._支持原始类型(Int,String等)和Product类型(case类)。将来的发行版中将添加对序列化其他类型的支持。 (隐式证据$ 7:org.apache.spark.sql.Encoder [org.apache.spark.sql.Row])org.apache.spark.sql.Dataset [org.apache。 spark.sql.Row。未指定的值参数证明$ 7。
请帮忙。
的'import'应'map'之外做来完成。 – Shaido
你能举出输入数据和期望输出的例子吗?这应该有可能以更有效的方式解决。 – Shaido