2017-08-30 84 views
0

我正在使用IntelliJ IDE在Microsoft Windows Platform上执行Spark Scala代码。Spark Dataframes已成功创建,但无法写入本地磁盘

我有四个约30000条记录的Spark Dataframes,我试图从每个Dataframes中取一列作为我的需求的一部分。

我用Spark SQL函数来做到这一点,并成功执行。当我执行DF.show()或DF.count()方法时,我能够在屏幕上看到结果,但是当我尝试将数据帧写入本地磁盘(Windows目录)时,作业正在中止,出现以下错误:

线程“main”中的异常org.apache.spark.SparkException:作业 中止。在 org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply $ mcV $ sp(FileFormatWriter.scala:147) at org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $写$ 1.适用(FileFormatWriter.scala:121) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $写$ 1.适用(FileFormatWriter.scala:121) 在 组织.apache.spark.sql.execution.SQLExecution $ .withNewExecutionId(SQLExecution.scala:57) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter $ .WRITE(FileFormatWriter.scala:121) 在 组织.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRela tionCommand.scala:101) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult $ lzycompute(commands.scala:58) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec。 sideEffectResult(commands.scala:56) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) at org.apache.spark.sql.execution.SparkPlan $$ anonfun $ execute $ 1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan $$ anonfun $ execute $ 1.apply(SparkPlan.scala:114) at org.apache.spark。 sql.execution.SparkPlan $$ anonfun $ executeQuery $ 1.apply(SparkPlan.scala:135) 在 org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:151) 在 org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) 在 组织。 apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) at org.apache.spark.sql.execution.QueryExecution.toRdd $ lzycompute(QueryExecution.scala:87) at org.apache。 spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) 在 org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492) 在 org.apache.spark。 sql.DataFrameWriter.save(DataFrameWriter.scala:215 ) 在 org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198) 在main.src.countFeatures2 $ .countFeature $ 1(countFeatures2.scala:118) 在 main.src.countFeatures2 $。 getFeatureAsString $ 1(countFeatures2.scala:32) at main.src.countFeatures2 $ .main(countFeatures2.scala:40)at main.src.countFeatures2.main(countFeatures2.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0( Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl。的java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke(Method.java:498)在 com.intellij.rt.execution.application .AppMain.main(AppMain.java:147) 引起:org.apache.spark.SparkException:作业由于阶段中止 失败:阶段31.0中的任务0失败1次,最近失败: 失去任务阶段0.0 31.0(TID 2636,localhost,executor driver): 命令字符串中的java.io.IOException:(null)条目:null chmod 0644 D:\ Test_Output_File2_temporary \ 0_temporary \ attempt_20170830194047_0031_m_000000_0 \ part-00000-85c32c55-e12d-4433-979d -ccecb2fcd341.csv at org.apache.h adoop.util.Shell $ ShellCommandExecutor.execute(Shell.java:770) 在org.apache.hadoop.util.Shell.execCommand(Shell.java:866)在 org.apache.hadoop.util.Shell.execCommand( Shell.java:849)在 org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733) 在 org.apache.hadoop.fs.RawLocalFileSystem $ LocalFSFileOutputStream。(RawLocalFileSystem.java:225) 在 org.apache.hadoop.fs.RawLocalFileSystem $ LocalFSFileOutputStream。(RawLocalFileSystem.java:209) 在 org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307) 在 org.apache.hadoop。 fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296) 在 org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328) 在 org.apache.hadoop.fs.ChecksumFileSystem $ ChecksumFSOutputSummer。(ChecksumFileSystem.java:398) 在 org.apache。 hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440) at org.apache.hadoop.fs.FileSystem.create(FileSystem .java:911)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)at org.apache .hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat的.java:132) 在 org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter(CSVRelation.scala:208) 在 org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory。的newInstance(CSVRelation.scala:178) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter $ SingleDirectoryWriteTask(FileFormatWriter.scala:234)。 在 org.apache.spark.sql.execution.datasources.FileFormatWriter $ .org $ apache $ spark $ sql $ execution $ datasources $ FileFormatWriter $$ executeTask(FileFormatWriter.scala:182) at org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1 $$ anonfun $ 3.apply(FileFormatWriter.scala:129) at org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1 $$ anonfun $ 3.apply(FileFormatWriter.scala:128) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala :87) 在org.apache.spark.scheduler.Task.run(Task.scala:99)在 org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:282) 在 的java。 util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745 )

驱动程序堆栈跟踪:在 org.apache.spark.scheduler。DAGScheduler.org $阿帕奇$火花$ $调度$$ DAGScheduler failJobAndIndependentStages(DAGScheduler.scala:1435) 在 org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.适用(DAGScheduler.scala:1423) 在 org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.适用(DAGScheduler.scala:1422) 在 scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray.scala:59) 在斯卡拉。 collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 在 org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply(DAGScheduler.scala:802) 在 org.apache.spark.scheduler.DAGScheduler $$ anonfun $ $ handleTaskSetFailed 1.适用(DAGScheduler.scala:802) 在scala.Option.foreach(Option.scala:257)在 org.apache.spark.scheduler .DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala :1605) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 在org.apache.spark.util.EventLoop $$匿名$ 1.run(EventLoop.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)at org.apache.spark.SparkContext.runJob(SparkContext .scala:1931)在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)在 org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $写$ 1.适用$ MCV $ SP (FileFormatWriter.scala:127) ... 28更多原因:java.io.IOException:(空)条命令 string:null chmod 0644 D:\ Test_Output_File2_temporary \ 0_temporary \ attempt_20170830194047_0031_m_000000_0 \ part-00000-85c32c55- e12d-4433-979d-ccecb2fcd341.csv at org.apache.hadoop.util.Shell $ ShellCommandExecutor.execute(Shell.java:770) at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)at org.apache.hadoop.fs。 RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733) 在 org.apache.hadoop.fs.RawLocalFileSystem $ LocalFSFileOutputStream。(RawLocalFileSystem.java:225) 在 org.apache.hadoop.fs.RawLocalFileSystem $ LocalFSFileOutputStream(RawLocalFileSystem。的java:209) 在 org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307) 在 org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296) 在 有机.apache。 hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328) 在 org.apache.hadoop.fs.ChecksumFileSystem $ ChecksumFSOutputSummer。(ChecksumFileSystem.java:398) 在 org.apache.hadoop.fs.ChecksumFileSystem.create (ChecksumFileSystem.java:461) 在 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440) 在org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)在 org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)at org.apache.hadoop.mapreduce.lib。 output.TextOutputFormat.getRecordWriter(TextOutputFormat。的java:132) 在 org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter(CSVRelation.scala:208) 在 org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance (CSVRelation.scala:178) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter $ SingleDirectoryWriteTask(FileFormatWriter.scala:234)。 在 org.apache.spark.sql.execution.datasources.FileFormatWriter $ .org $ apache $ spark $ sql $ execution $ datasources $ FileFormatWriter $$ executeTask(FileFormatWriter.scala:182) at org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1 $$ anonfun $ 3 .apply(FileFormatWriter.scala:129) at org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1 $$ anonfun $ 3.apply(FileFormatWriter.scala:128) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala :87) 在org.apache.spark.scheduler.Task.run(Task.scala:99)在 org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:282) 在 的java。 util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745 )拿起_JAVA_OPTIONS: -Xmx512M

过程完成退出码1

我不能明白哪里出了问题。任何人都可以解释如何解决这个问题?

UPDATE 请注意,我能够在昨天之前写入相同的文件,并且没有在我的系统或IDE配置中进行更改。所以,我不明白为什么它运行直到昨天为什么不现在是运行

有一个类似的帖子在这个链接:(null) entry in command string exception in saveAsTextFile() on Pyspark,但他们使用的pyspark木星上的笔记本电脑,而我的问题是与IDE的IntelliJ

输出文件写入到本地磁盘

val Test_Output =spark.sql("select A.Col1, A.Col2, B.Col2, C.Col2, D.Col2 from A, B, C, D where A.primaryKey = B.primaryKey and B.primaryKey = C.primaryKey and C.primaryKey = D.primaryKey and D.primaryKey = A.primaryKey") 

val Test_Output_File = Test_Output.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").option("nullValue", "0").save("D:/Test_Output_File") 
+0

你可以发表你的写作代码吗? –

+0

Pyspark上的saveAsTextFile()中的命令字符串异常中的[(null)条目可能重复](https://stackoverflow.com/questions/40764807/null-entry-in-command-string-exception-in-saveastextfile-on -pyspark) –

+0

检查[this](https://stackoverflow.com/a/40958969/647053)它应该工作。下面的另一个答案提出了同样的解决方案 –

回答

0

最后我纠正了自己。我在创建数据框时使用了.persist()方法。这帮助我写入输出文件没有任何错误。虽然我不明白它背后的逻辑。

欣赏你的宝贵意见

1

超级简单的代码似乎有关文件系统:java.io.IOException: (null) entry in command string: null chmod 0644

既然你在windows上运行,你有没有把你的HADOOP_HOME设置为winutils.exe文件夹?

+0

yes。这就是为什么它成功运行到昨天。我以后没有对系统或IDE进行任何更改。但现在它不写这些文件。 – JKC

+0

这可能听起来很愚蠢..但是你有另一个火花同时运行吗?有时候,当我还有另一个单元测试仍在运行时,我还会遇到一些奇怪的行为,这些测试还未被正确关闭。瞄准'pkill java';) –

+0

没有@pskill java。我没有其他的例子 – JKC