
Spark: programmatic schema with dynamic column mapping

I am trying to take the Spark SQL example runProgrammaticSchemaExample one step further and cannot get it to handle a dynamic number of columns. See the code below, where the only change is that the Row's column mapping is built in a for loop.

private def runProgrammaticSchemaExample(spark: SparkSession): Unit = { 
    import spark.implicits._ 
    // $example on:programmatic_schema$ 
    // Create an RDD 
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") 

    // The schema is encoded in a string 
    val schemaString = "name age" 

    // Generate the schema based on the string of schema 
    val fields = schemaString.split(" ") 
     .map(fieldName => StructField(fieldName, StringType, nullable = true)) 
    val schema = StructType(fields) 

    // Convert records of the RDD (people) to Rows 
    val rowRDD = peopleRDD 
     .map(_.split(",")) 
    //  .map(attributes => Row(attributes(0), attributes(1).trim)) 
     .map(attributes => Row(for (i <- 0 to (attributes.length -1)){attributes(i)})) 

    // Apply the schema to the RDD 
    val peopleDF = spark.createDataFrame(rowRDD, schema) 

    // Creates a temporary view using the DataFrame 
    peopleDF.createOrReplaceTempView("people") 
    peopleDF.printSchema() 
    // SQL can be run over a temporary view created using DataFrames 
    val results = spark.sql("SELECT name FROM people") 

    // The results of SQL queries are DataFrames and support all the normal RDD operations 
    // The columns of a row in the result can be accessed by field index or by field name 
    results.map(attributes => "Name: " + attributes(0)).show() 
    // +-------------+ 
    // |  value| 
    // +-------------+ 
    // |Name: Michael| 
    // | Name: Andy| 
    // | Name: Justin| 
    // +-------------+ 
    // $example off:programmatic_schema$ 
    } 
} 

Here is the error I get:

16/11/15 09:31:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.runtime.BoxedUnit is not a valid external type for schema of string 
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#0 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 0 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 1 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
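
In short, the cause is that a Scala for loop without yield evaluates to Unit, so Row(...) receives a single BoxedUnit value instead of the individual column values, which the two-column string schema rejects. A minimal illustration (not from the original post; it only assumes org.apache.spark.sql.Row):

import org.apache.spark.sql.Row

val attrs = Array("Michael", "29")
// A for loop without yield returns Unit, so this builds Row(()) -- one BoxedUnit column
val bad = Row(for (i <- 0 until attrs.length) { attrs(i) })
// Spreading the values into the Row gives one column per attribute
val good = Row.fromSeq(attrs)        // equivalently: Row(attrs: _*)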

Answers


I had exactly the same problem as you and started out trying to solve it the same way, which produced the same error :) I am new to Scala, but I managed to come up with a function to generate the Row object (you just need to pass it the field count).

Get the field count (this data is split on "\u0001"; for the comma-delimited people.txt you would split on "," instead):

val fieldCount = rdd.map(_.split("\u0001")).map(x => x.size).first() 

Function to generate the Row object:

def getARow(x: Array[String], size: Int): Row = {
    val columnArray = new Array[String](size + 1)
    for (i <- 0 to size) {
        columnArray(i) = x(i).toString()
    }
    Row.fromSeq(columnArray)
}

Create your DataFrame using the RDD and the schema:

val myDF = sqlContext.createDataFrame(rdd.map(_.split(delim)).map { x => getARow(x,fieldCount) }, mySchema) 
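
Note that delim and mySchema are not defined in the snippet above. A minimal sketch of what they might look like (an assumption, not part of the original answer), for an all-string schema:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val delim = "\u0001"   // assumed: the same delimiter used when computing fieldCount
val mySchema = StructType(
    (0 until fieldCount).map(i => StructField(s"col$i", StringType, nullable = true)))  // hypothetical column names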

Hope this helps someone create their DataFrame!


Daniel's answer is good, but I ran into a small issue with it, so I modified it and now it works:

val fieldCount = schemaString.split(" ").length 
def getARow(x: Array[String], size: Int): Row = {
    val columnArray = new Array[String](size)
    for (i <- 0 to (size - 1)) {
        columnArray(i) = x(i).toString()
    }
    Row.fromSeq(columnArray)
}

val fields = schemaString.split(" ") 
    .map(fieldName => StructField(fieldName, StringType, nullable = true)) 
val schema = StructType(fields) 
val rowRDD = peopleRDD 
    .map(_.split(",")) 
    .map(attributes => getARow(attributes,fieldCount)) 
val peopleDF = spark.createDataFrame(rowRDD, schema)
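
For what it's worth, the helper can be skipped entirely: Row.fromSeq accepts any Seq, and an Array[String] converts to one implicitly. A shorter variant under the same assumptions (comma-delimited input, all-string schema):

val rowRDD = peopleRDD
    .map(_.split(","))
    .map(attributes => Row.fromSeq(attributes))   // equivalently: Row(attributes: _*)
val peopleDF = spark.createDataFrame(rowRDD, schema)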