
I am creating a DataFrame with Spark's Data Source API using the schema below. In Spark 2.1.1, the DataFrame returns the wrong column when the select() method is used.

StructType(Seq(StructField("name", StringType, true), 
         StructField("age", IntegerType, true), 
         StructField("livesIn", StringType, true), 
         StructField("bornIn", StringType, true))) 

I have hardcoded the data inside the buildScan() method of my PrunedFilteredScan, as shown below:

val schemaFields = schema.fields
// hardcoded for now. Need to read from Accumulo and plug it in here
val rec = List("KBN 1000000 Universe Parangipettai", "Sreedhar 38 Mysore Adoni",
  "Siva 8 Hyderabad Hyderabad", "Rishi 23 Blr Hyd", "Ram 45 Chn Hyd", "Abey 12 Del Hyd")

// Reading from Accumulo done. Constructing the RDD now for the DataFrame.
val rdd = sqlContext.sparkContext.parallelize(rec)
rdd.count

val rows = rdd.map(rec => {
  val fields = rec.split(" ")
  // cast every field to the type declared at its position in the schema
  val typeCastedValues = fields.zipWithIndex.map {
    case (value, index) =>
      val dataType = schemaFields(index).dataType
      typeCast(value, dataType)
  }
  Row.fromSeq(typeCastedValues)
})
rows
} // end of buildScan()
private def typeCast(value: String, toType: DataType) = toType match {
  case _: StringType  => value
  case _: IntegerType => value.toInt
}

When I create the DataFrame as shown below:

val dfPruned = sqlContext.read.format(dsPackage).load().select("livesIn") 
dfPruned.show 
dfPruned.printSchema 

it gives me the data of the name column under the header livesIn. Please help me if I am missing something, or tell me whether this is a bug in Spark 2.1.1. Output:

+--------+
| livesIn|
+--------+
|     KBN|
|Sreedhar|
|    Siva|
|   Rishi|
|     Ram|
|    Abey|
+--------+

root 
|-- livesIn: string (nullable = true) 

Answers


You should create the DataFrame once you have the schema and have already converted your RDD into Rows, as

sqlContext.createDataFrame(rows, schema) 

Then, when you do

val dfPruned = sqlContext.createDataFrame(rows, schema).select("livesIn") 
dfPruned.show 
dfPruned.printSchema 

you should be getting the following output:

+---------+
|  livesIn|
+---------+
| Universe|
|   Mysore|
|Hyderabad|
|      Blr|
|      Chn|
|      Del|
+---------+

root 
|-- livesIn: string (nullable = true) 

Edit

If you want to use the Data Source API, then it is even simpler:

sqlContext.read.format("csv").option("delimiter", " ").schema(schema).load("path to your file ").select("livesIn") 

should do the trick.

Note: the input file I used is as follows

KBN 1000000 Universe Parangipettai 
Sreedhar 38 Mysore Adoni 
Siva 8 Hyderabad Hyderabad 
Rishi 23 Blr Hyd 
Ram 45 Chn Hyd 
Abey 12 Del Hyd 

Thanks Ramesh. But I need to implement this by extending Spark's Data Source API, not by using the createDataFrame() method.


I have updated my answer. :) I hope I got it right.


If you want to apply the schema to your RDD, you can use the createDataFrame function as shown below.

// create a Row from each line by splitting on " "
val rows = rdd.map(value => {
  val data = value.split(" ")
  // you could use Row.fromSeq(data), but the second field needs to be an Int,
  // so it is converted explicitly
  Row(data(0), data(1).toInt, data(2), data(3))
})

// creating a DataFrame from the rows and the schema
val df = sqlContext.createDataFrame(rows, schema)

// selecting only the column livesIn
df.select("livesIn").show

Output:

+---------+
|  livesIn|
+---------+
| Universe|
|   Mysore|
|Hyderabad|
|      Blr|
|      Chn|
|      Del|
+---------+

Hope this helps!


Thanks Shankar. But I need to implement this by extending Spark's Data Source API, not by using the createDataFrame() method.
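
Regarding the original question: with the Data Source API, PrunedFilteredScan.buildScan(requiredColumns, filters) is handed the pruned column list, and the rows it returns are expected to contain only those columns, in that order. Because the hardcoded buildScan above always returns all four fields, Spark reads the first field (name) under the header livesIn, which matches the output shown in the question. Below is a minimal sketch of a relation that honours the pruned columns; the class names AccumuloRelation and DefaultSource are hypothetical, while the schema, records and typeCast logic are taken from the question.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan, RelationProvider}
import org.apache.spark.sql.types._

// Hypothetical relation; schema and records copied from the question.
class AccumuloRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override val schema: StructType = StructType(Seq(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true),
    StructField("livesIn", StringType, true),
    StructField("bornIn", StringType, true)))

  // Spark passes in only the columns that survived pruning; the rows we
  // return must contain exactly those columns, in this order.
  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    val requiredIndexes = requiredColumns.map(schema.fieldIndex)

    // hardcoded for now, as in the question
    val rec = List(
      "KBN 1000000 Universe Parangipettai",
      "Sreedhar 38 Mysore Adoni",
      "Siva 8 Hyderabad Hyderabad",
      "Rishi 23 Blr Hyd",
      "Ram 45 Chn Hyd",
      "Abey 12 Del Hyd")

    sqlContext.sparkContext.parallelize(rec).map { line =>
      val fields = line.split(" ")
      // keep only the requested fields, cast to their declared types
      Row.fromSeq(requiredIndexes.map(i => typeCast(fields(i), schema.fields(i).dataType)))
    }
  }

  private def typeCast(value: String, toType: DataType): Any = toType match {
    case StringType  => value
    case IntegerType => value.toInt
    case _           => value
  }
}

// Provider so that sqlContext.read.format(<package of DefaultSource>) can find the relation
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new AccumuloRelation(sqlContext)
}

With this in place, sqlContext.read.format(dsPackage).load().select("livesIn") should return the third field (Universe, Mysore, Hyderabad, ...) instead of the first one.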