
SparkException during DataFrame write to HDFS

I am running the following code on a Hadoop cluster with YARN. It parses some emails, performs sentiment annotation, and finally writes the resulting DataFrame as a Parquet table on HDFS. Unfortunately, it keeps failing at line #66, the last DataFrame write to HDFS, with the error shown at the bottom. I cannot explain why, given that it terminates successfully whenever I use a small sample of the dataset.

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}

import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations

object ETLDriver { 

    val appName = "ENRON-etl" 
    val conf = new SparkConf().setAppName(appName) 
    val sc = new SparkContext(conf) 

    def main (args: Array[String]): Unit = { 
     val allExtracted = sc.objectFile[(String, Seq[String])](Commons.ENRON_EXTRACTED_TXT) 
     // Testing on a sub-sample 
     // val allExtracted = sc.objectFile[(String, Seq[String])](Commons.ENRON_EXTRACTED_TXT).sample(false, 0.01, 42) 

     // get custodians from csv file stored in HDFS 
     val csv = sc.textFile(Commons.ENRON_CUSTODIANS_CSV_HDFS).map{line => line.split(",")} 
     var custodians = sc.broadcast(csv.map{record => Custodian(record(0),record(1),Option(record(2)))}.collect().toSeq) 

     // parse emails 
     val allParsed: RDD[MailBox] = allExtracted.map { case (mailbox, emails) => 
      val parsedEmails = emails flatMap { email => 
       try Some(EmailParser.parse(email, custodians.value)) 
       catch { 
        case e: EmailParsingException => None 
       } 
      } 
      MailBox(mailbox, parsedEmails) 
     } 

     // classify sentiment and save w/o body 
     val mailboxesSentiment = allParsed.map { mailbox => 
      // load sentiment annotator pipeline 
      val nlpProps = new Properties 
      nlpProps.setProperty("annotators", "tokenize, ssplit, pos, parse, lemma, sentiment") 
      val pipeline = new StanfordCoreNLP(nlpProps) 
      // annotation 
      val emailsWithSentiment = mailbox.emails.map { email => 
       val document = new Annotation(email.body) 
       pipeline.annotate(document) 
       val sentiment = document.get[String](classOf[SentimentCoreAnnotations.ClassName]) 
       EmailWithSentiment(email.date, email.from, email.to ++ email.cc ++ email.bcc, email.subject, sentiment) 
      } 

      MailBoxWithSentiment(mailbox.name, emailsWithSentiment) 
     } 

     val sqlContext = new SQLContext(sc) 
     import sqlContext.implicits._ 

     val dfFull = allParsed.toDF() 
     dfFull.write.mode(SaveMode.Overwrite).parquet(Commons.ENRON_FULL_DATAFRAME) 
     val dfSentiment = mailboxesSentiment.toDF() 
     dfSentiment.write.mode(SaveMode.Overwrite).parquet(Commons.ENRON_SENTIMENT_DATAFRAME) 
    } 
} 

Error:

AM Container for appattempt_1456482988572_5307_000001 exited with exitCode: 15 
For more detailed output, check application tracking page: http://head05.custer_url:8088/cluster/app/application_1456482988572_5307 Then, click on links to logs of each attempt. 
Diagnostics: Exception from container-launch. 
Container id: container_1456482988572_5307_01_000001 
Exit code: 15 
Stack trace: ExitCodeException exitCode=15: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:576) 
at org.apache.hadoop.util.Shell.run(Shell.java:487) 
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753) 
at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:371) 
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) 
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) 
at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 
Shell output: main : command provided 1 
main : run as user is lsde03 
main : requested yarn user is lsde03 
Container exited with a non-zero exit code 15 
Failing this attempt 

Further log from Spark here

+0

Could you point out which line is #66, since there are no line numbers on StackOverflow? Is it this one? 'dfSentiment.write.mode(SaveMode.Overwrite).parquet(Commons.ENRON_SENTIMENT_DATAFRAME)' – morganw09dev

+0

Yes, that is the line –

Answers

0

Line numbers tend to be quite unhelpful. They usually point to the line that triggers the job rather than to the actual problem, so it is typically some save/collect action.
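
As a rough illustration (a minimal sketch, not taken from the question's code): an exception thrown inside a transformation only surfaces when an action runs, so the stack trace reports the action line rather than the transformation that contains the bug.

import org.apache.spark.{SparkConf, SparkContext}

object LazyFailureDemo {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("lazy-demo").setMaster("local[*]"))
        // The bug lives in this transformation, but nothing executes yet because Spark is lazy.
        val parsed = sc.parallelize(Seq("a", "b", "c")).map { s =>
            require(s != "b", "boom")   // hypothetical per-record failure
            s.toUpperCase
        }
        // Only here, at the action, do the tasks actually run and the exception surface,
        // so the "failing line" in the report is the collect/save, not the map above.
        parsed.collect()
        sc.stop()
    }
}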

In the log you linked there is an ExecutorLostFailure. It is not clear why, but I have found that when Spark does not give a reason, memory is usually a good place to start. How big is the data you are broadcasting? You may also want to check your memory settings. This is a good article for understanding memory for Spark on YARN.

http://www.wdong.org/wordpress/blog/2015/01/08/spark-on-yarn-where-have-all-my-memory-gone/
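
As a rough sketch (the values below are placeholders, not tuned for this cluster), the executor heap and the YARN memory overhead can be raised in the SparkConf, or passed equivalently as --conf flags to spark-submit:

import org.apache.spark.SparkConf

// Hypothetical numbers; the right values depend on the YARN container limits of the cluster.
val tunedConf = new SparkConf()
    .setAppName("ENRON-etl")
    .set("spark.executor.memory", "8g")                 // executor heap
    .set("spark.yarn.executor.memoryOverhead", "2048")  // off-heap headroom per executor, in MB
// spark.driver.memory usually has to be given to spark-submit itself,
// since the driver JVM is already running by the time this code executes.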

Hope this helps.

+0

According to HDFS, the total size of the files loaded into 'allExtracted' is 3.3G. Thanks for the link; unfortunately I don't have access to the Spark/Hadoop configuration, so I guess I can only pass some parameters with 'spark-submit'. I tried running it with plenty of resources, but it still fails ('spark-submit --master yarn --deploy-mode cluster --num-executors 480 --executor-memory 12G --executor-cores 4 --driver-cores 4 --driver-memory 6G [...]'). Anyway, if I comment out the sentiment extraction (the lines marked with '<<<' [here](http://pastebin.com/9zsLgPpF)) –