I am creating a DataFrame with Spark's Data Source API, using the schema below. In Spark 2.1.1, the DataFrame returns the wrong column's data when I use the select() method:
StructType(Seq(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("livesIn", StringType, true),
  StructField("bornIn", StringType, true)))
My data is hardcoded for now and is served from PrunedFilteredScan's buildScan() method, as shown below:
val schemaFields = schema.fields
// Hardcoded for now. Need to read from Accumulo and plug it in here.
val rec = List("KBN 1000000 Universe Parangipettai", "Sreedhar 38 Mysore Adoni",
  "Siva 8 Hyderabad Hyderabad", "Rishi 23 Blr Hyd", "Ram 45 Chn Hyd", "Abey 12 Del Hyd")
// Reading from Accumulo done. Constructing the RDD now for the DF.
val rdd = sqlContext.sparkContext.parallelize(rec)
rdd.count
val rows = rdd.map(rec => {
  val fields = rec.split(" ")
  val typeCastedValues = fields.zipWithIndex.map {
    case (value, index) =>
      val dataType = schemaFields(index).dataType
      typeCast(value, dataType)
  }
  Row.fromSeq(typeCastedValues)
})
rows

private def typeCast(value: String, toType: DataType) = toType match {
  case _: StringType  => value
  case _: IntegerType => value.toInt
}
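For context on what a fix might look like: Spark passes the pruned column list to buildScan(requiredColumns, filters) and expects the returned rows to contain only those columns, in that order. A scan that always emits all four fields in full-schema order will get its first field labeled with the first requested column, which would produce exactly the symptom described. Below is a minimal sketch of the projection step as plain Scala with no Spark dependency; pruneFields is a hypothetical helper, and the full-schema column order is assumed from the StructType above:

```scala
// Full schema order, as declared in the StructType above.
val allColumns = Seq("name", "age", "livesIn", "bornIn")

// Project one split record down to only the columns Spark asked for,
// in the order Spark asked for them (hypothetical helper; inside a real
// buildScan, requiredColumns would come from the method's first argument).
def pruneFields(fields: Array[String], requiredColumns: Array[String]): Array[String] =
  requiredColumns.map(col => fields(allColumns.indexOf(col)))

val record = "KBN 1000000 Universe Parangipettai"
val pruned = pruneFields(record.split(" "), Array("livesIn"))
// pruned is Array("Universe"), not Array("KBN")
```

Type casting would similarly need to look up schemaFields by each required column's index in the full schema, rather than by its position in the pruned row, so that the IntegerType case still lines up with the age field.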
When I create the DataFrame as shown below:
val dfPruned = sqlContext.read.format(dsPackage).load().select("livesIn")
dfPruned.show
dfPruned.printSchema
it gives me the data of the name column under the header livesIn. Please help if I am missing something, or is this a bug in Spark 2.1.1? Output:
+--------+
| livesIn|
+--------+
| KBN|
|Sreedhar|
| Siva|
| Rishi|
| Ram|
| Abey|
+--------+
root
|-- livesIn: string (nullable = true)
Thanks Ramesh. However, I need to implement this by extending Spark's Data Source API, not by using the createDataFrame() method. –
I have updated my answer. :) I hope it is correct now. –