
Spark 2.0.1 / Scala: querying a temporary view fails. I'm running Spark 2.0.1 and the problem shows up only at query time. The DataFrame was created by the usual steps:

  1. Create an RDD of Rows from the original RDD
  2. Create the schema represented by a StructType
  3. Apply the schema to the RDD of Rows to create the DataFrame (a sketch of these steps follows the list)
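
In other words, something like this minimal sketch (the SparkSession setup and the two string columns here are assumptions for illustration, not my actual code):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder.master("local").appName("schema-sketch").getOrCreate()

// 1. Create an RDD of Rows from the original RDD
// (assumes every line has at least two pipe-delimited fields)
val rowRDD = spark.sparkContext
    .textFile("/home/raja/scala_code/text2.dat")
    .map(_.split("\\|"))
    .map(attrs => Row(attrs(0), attrs(1)))

// 2. Create the schema represented by a StructType
val schema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", StringType, nullable = true)))

// 3. Apply the schema to the RDD of Rows to create the DataFrame
val df = spark.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("people")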

I tested it again with the following:

case class Person(name: String, age: Long) 

val peopleDF = sparkSession.sparkContext 
    .textFile("/home/raja/scala_code/text2.dat") 
    .map(_.split("|")) 
    .map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF() 

peopleDF.createOrReplaceTempView("people") 

val teenagersDF = sparkSession.sql("SELECT * FROM people") 

teenagersDF.show() 

When I run sparkSession.sql("SELECT name FROM emp"), it throws the error below.

java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1 
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#0 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 0 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 1 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) AS salary#2 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 2 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) AS birthplace#3 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 3 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:200) 
    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:185) 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:192) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276) 
    ... 20 more 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) 
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:1934) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2149) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:239) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:526) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:486) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:495) 
    ... 64 elided 
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1 
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#0 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 0 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 1 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) AS salary#2 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 2 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) AS birthplace#3 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 3 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:200) 
    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:185) 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:192) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276) 
    ... 20 more 

Please post the code that creates 'emp' – Spark evaluates views _lazily_, so the fact that the exception is raised only at query time doesn't mean the error is in the query; it may come from the code that creates the view. –
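
To make the laziness point concrete, here is a minimal self-contained sketch (hypothetical data, not from the question) where the bad mapping only blows up when the view is queried:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").appName("lazy-demo").getOrCreate()
import spark.implicits._

// The out-of-bounds access below does not fail yet: RDD transformations are lazy.
val df = spark.sparkContext
    .parallelize(Seq("a|b", "oops"))      // "oops" has no pipe
    .map(_.split("\\|"))
    .map(a => (a(0), a(1)))               // a(1) is out of bounds for "oops"
    .toDF("c1", "c2")

df.createOrReplaceTempView("t")           // still no failure: the view is lazy too

// Only an action forces evaluation and surfaces the exception:
spark.sql("SELECT c1 FROM t").show()      // java.lang.ArrayIndexOutOfBoundsException: 1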

I've added the two missing parts, one here and one in the next comment: val sparkSession = SparkSession.builder.master("local").appName("example").getOrCreate() val sc = sparkSession.sparkContext import sparkSession.implicits._ (0),p(0),p(0),p(0),p(0),p(1))) val schemaString = "name birthplace" val schema = StructType(fields) – user3485352

val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) val semprdd = emprdd.map(value => Row(value)) val empDF = sparkSession.createDataFrame(semprdd, schema) empDF.createOrReplaceTempView("emp") val results = sparkSession.sql("SELECT name FROM emp") results.show() – user3485352

Answer

First, split("|") does not split on the pipe character the way you expect, because split(s: String) takes a regular expression as input, and the pipe is a special character in regular expressions. See more details and a solution here: https://stackoverflow.com/a/40359414/5344058
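
A quick REPL illustration of the difference (the exact output assumes Java 8 split semantics):

"RAJA|123".split("|")    // Array(R, A, J, A, |, 1, 2, 3) -- "|" is an empty alternation, matching between every character
"RAJA|123".split("\\|")  // Array(RAJA, 123)              -- the escaped pipe matches the literal character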

If the problem persists after fixing that (your question doesn't include sample input data, so I can't be sure), the exception (java.lang.ArrayIndexOutOfBoundsException: 1) is quite telling: your code assumes that split("|") produces an array with at least two items for every record:

.map(_.split("|")) 
.map(attributes => Person(attributes(0), attributes(1).trim.toInt)) 
//                                       ^ 
//                                       | 
// this will throw an exception if the input isn't valid 

If any record doesn't meet that condition, you'll get this exception.

To avoid this, you can take a few routes. If you just want to skip the invalid rows, you can use collect instead of map, with a partial function that is only defined for arrays with at least two items:

.map(_.split("\\|")) 
.collect { case Array(a1, a2, _*) => Person(a1, a2.trim.toInt) } 

This code simply filters out every record for which split produces an array with fewer than two items.
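
For example (a sketch assuming a SparkSession named sparkSession as in the question; the sample lines are hypothetical):

case class Person(name: String, age: Long)

val people = sparkSession.sparkContext
    .parallelize(Seq("RAJA|123", "badline"))   // one valid line, one malformed line
    .map(_.split("\\|"))
    .collect { case Array(a1, a2, _*) => Person(a1, a2.trim.toInt) }

people.collect().foreach(println)              // Person(RAJA,123) -- "badline" is silently dropped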

Well, the code I suggested works for simple pipe-delimited data (I used 'RAJA|123|Pynursla'), so unless you provide some failing sample data (by editing the question again), I can't help any further. –

Yes, I changed the case class definition to the appropriate types and made the mapping fully typed as well, and it works. It seems the type conversions were doing a lot :) – user3485352
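
For reference, a guess at what the fixed version might look like (escaped pipe plus a fully typed mapping; the exact file contents are not shown in the thread):

case class Person(name: String, age: Long)

import sparkSession.implicits._

val peopleDF = sparkSession.sparkContext
    .textFile("/home/raja/scala_code/text2.dat")
    .map(_.split("\\|"))                        // escape the pipe
    .collect { case Array(name, age, _*) => Person(name, age.trim.toLong) }
    .toDF()

peopleDF.createOrReplaceTempView("people")
sparkSession.sql("SELECT name FROM people").show()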