
I am invoking PySpark with Spark 2.0 in local mode using the command below, and I am hitting: java.lang.OutOfMemoryError: Unable to acquire 100 bytes of memory, got 0

pyspark --executor-memory 4g --driver-memory 4g 

The input DataFrame is read from a TSV file and has 580 K rows and 28 columns. I perform some operations on it, and when I try to export the result to a TSV file I get the error below.

df.coalesce(1).write.save("sample.tsv", format="csv", header='true', delimiter='\t')

Any pointers on how to get rid of this error? I can display the DataFrame and count its rows without any problem.

The output DataFrame has 3100 rows and 23 columns.

Error:

Job aborted due to stage failure: Task 0 in stage 70.0 failed 1 times, most recent failure: Lost task 0.0 in stage 70.0 (TID 1073, localhost): org.apache.spark.SparkException: Task failed while writing rows 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.OutOfMemoryError: Unable to acquire 100 bytes of memory, got 0 
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:129) 
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374) 
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396) 
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15$$anon$1.fetchNextRow(WindowExec.scala:300) 
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15$$anon$1.<init>(WindowExec.scala:309) 
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15.apply(WindowExec.scala:289) 
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15.apply(WindowExec.scala:288) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96) 
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258) 
    ... 8 more 

Driver stacktrace: 

Have you tried it without 'coalesce()'? Clearly you are running out of memory. What is your configuration? – gsamaras


I tried it without coalesce and it ran fine. My setup is an Intel i7 with 16 GB of RAM running Windows 7 Professional. I have previously exported other files with ~0.5 M rows and 15-20 columns using the same approach, and it worked fine. –

Answers


For me, the problem was indeed coalesce(). What I did was export the file without coalesce(), writing it as Parquet instead with df.write.parquet("testP"). Then I read that file back in and exported it with coalesce(1).

Hope it works for you too.
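A minimal sketch of that two-step workaround, assuming df is the transformed DataFrame from the question and that "testP" and "sample_out" are placeholder paths:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# df is the transformed DataFrame from the question (not shown here).
# Step 1: write the result with its existing partitioning -- no coalesce(),
# so no single task has to buffer the whole dataset.
df.write.parquet("testP")

# Step 2: read the already-materialized result back, and only then collapse
# it to a single partition for the final single-file export.
spark.read.parquet("testP") \
    .coalesce(1) \
    .write.csv("sample_out", header=True, sep="\t")

Writing the heavy transformations out as Parquet first spreads that work over many tasks, so the later coalesce(1) only has to stream the small final result through a single task.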


I believe the cause of this problem is coalesce(): although it avoids a full shuffle (as repartition would do), it still has to shrink the data into the requested number of partitions.

Here you are asking all the data to fit into a single partition, so one task (and only one task) has to process all of it, which can push its container past its memory limits.

So either request more than 1 partition, or avoid coalesce() in this case.
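As a rough illustration of both options, assuming df is the DataFrame from the question (the partition count of 8 and the output path are arbitrary example values):

# Option A: ask for more than one partition, so no single task has to hold
# the whole dataset; each partition becomes one part-file in the output.
df.coalesce(8).write.csv("sample_out", header=True, sep="\t")

# Option B: drop coalesce() entirely and write one part-file per existing
# partition, merging the files outside Spark if a single file is required.
df.write.csv("sample_out", header=True, sep="\t")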


Otherwise, you could try the solutions suggested in the links below and increase your memory configuration:

  1. Spark java.lang.OutOfMemoryError: Java heap space
  2. Spark runs out of memory when grouping by key

In my case, the driver was given less memory than the workers. Increasing the driver memory resolved the problem.
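For example, adapting the launch command from the question (8g is just an example value); note that in local mode the executors run inside the driver JVM, so --driver-memory is the setting that actually bounds the available memory:

pyspark --driver-memory 8g --executor-memory 4g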