
I have a Spark DataFrame built with the code below, but I cannot write it out or apply a groupBy to it.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
scala> import sqlContext.implicits._ 

scala> case class Wiki(project: String, title: String, count: Int, byte_size: String) 

scala> val data = sc.textFile("s3n://+++/").map(_.split(" ")).map(p => Wiki(p(0), p(1), p(2).trim.toInt, p(3))) 

scala> val df = data.toDF() 

When I try to write it to an output file:

scala> df.write.parquet("df.parquet") 

or aggregate the data with:

scala> df.filter("project = 'en'").select("title","count").groupBy("title").sum().collect() 

it fails with an error similar to the one below:

WARN TaskSetManager: Lost task 855.0 in stage 0.0 (TID 855, ip-172-31-10-195.ap-northeast-1.compute.internal): org.apache.spark.SparkException: Task failed while writing rows. 
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251) 
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:88) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 
at $line24.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:28) 
at $line24.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:28) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:242) 
... 8 more 

The schema of my DataFrame looks like the following:

root 
 |-- project: string (nullable = true) 
 |-- title: string (nullable = true) 
 |-- count: integer (nullable = false) 
 |-- byte_size: string (nullable = true) 

How should I interpret this problem, and how can I fix it?


Is your Hadoop cluster working properly? – Reactormonk


@Reactormonk I think so. I launched a Spark cluster on AWS EMR and everything seems fine; I'm working interactively in spark-shell. –


Could you add your DataFrame schema? You have two different kinds of errors here! – eliasah

Answer


Make sure your split always returns an array of 4 fields. You may have some malformed entries, or you may be splitting on the wrong character.

Try filtering them out:

val data = sc.textFile("s3n://+++/").map(_.split(" ")).filter(_.size == 4).map(p => Wiki(p(0), p(1), p(2).trim.toInt, p(3))) 

and see whether the error persists. An ArrayIndexOutOfBoundsException right after a split usually means some records were parsed incorrectly. In your case, the index 2 probably means p(2) could not be read, i.e. one of the records has only two values: p(0) and p(1).
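
If the error does go away after filtering, a quick way to confirm how many records are malformed and what they look like is a check along these lines (a minimal sketch, reusing the same truncated S3 path from the question):

// Inspect lines whose whitespace split does not yield exactly 4 fields 
val bad = sc.textFile("s3n://+++/").map(_.split(" ")).filter(_.size != 4) 
println(bad.count())                               // number of malformed records 
bad.take(5).foreach(a => println(a.mkString("|"))) // peek at a few of them 

If those records turn out to be split on the wrong character (for example tab-separated input), adjust the split argument rather than dropping the rows.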