
saveAsTextFile fails when saving an RDD

A simple call like saveAsTextFile does not work, and the solutions I have found (mostly about conflicting library versions) do not apply to my case. Any help would be really appreciated.

messages2.foreachRDD(rdd -> {
    long numHits = rdd.count();

    if (numHits == 0) {
        System.out.println("No new data fetched in last 30 sec");
    } else {
        // Do processing
        System.out.println("Data fetched in the last 30 seconds: " + rdd.partitions().size()
                + " partitions and " + numHits + " records");

        // Convert each Kafka (key, value) record into a parsed log object
        JavaRDD<ApacheAccessLog> logs = rdd.map(x -> x._2)
                .map(ApacheAccessLog::parseFromLogLine)
                .cache();

        // Find the bot IP addresses: count hits per IP and keep those with more than 50
        JavaRDD<String> iprdd = logs.mapToPair(log -> new Tuple2<>(log.getIpAddress(), 1))
                .reduceByKey(MyReducerFunc)
                .filter(botip -> botip._2 > 50)
                .keys();

        // If we find something, store it in the results dir on HDFS
        if (iprdd.count() > 0) {
            iprdd.saveAsTextFile("/results/bad-ip-log");
        }
    }
});

Stack trace:

Exception in thread "streaming-job-executor-0" java.lang.IncompatibleClassChangeError: Implementing class 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1194) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1168) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1071) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1037) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:963) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1489) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1468) 
at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550) 
at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45) 
at hadoopTest.hadoopTest.SparkConsumer.lambda$1(SparkConsumer.java:100) 
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) 
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
at scala.util.Try$.apply(Try.scala:192) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:256) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:255) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) 

Interestingly, the job does not fail, but I never see the results directory get populated. The RDD itself does not seem to be the problem, because printing its contents with foreach works fine.
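
For reference, the foreach check mentioned above is just a minimal sketch along these lines (in local mode the executor output shows up in the console; on a cluster it would land in the executors' stdout instead):

// Sanity check: print every element of the RDD we are about to save
iprdd.foreach(ip -> System.out.println(ip));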

Later I changed the hadoop-client version to match the rest of my Hadoop installation (I am using 2.6.0), and this is what I get:

ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 19) java.io.IOException: Mkdirs failed to create file:/results/bad-ip-log/_temporary/0/_temporary/attempt_20170601004858_0004_m_000001_19 (exists=false, cwd=file:/home/cloudera/workspace/hadoopTest) 
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:442) 
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:428) 
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908) 
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:801) 
at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) 
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1206) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1197) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:99) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) 
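
(For reference, the version change mentioned above lives in the build file. A minimal sketch of what that looks like, assuming a plain Maven build; the exact coordinates depend on your distribution:)

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>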

Edit: after changing the log level for pair RDDs to DEBUG, as suggested in the comments:

Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at scala.Option.foreach(Option.scala:257) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1226) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1168) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1071) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1037) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:963) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1489) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1468) 
at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550) 
at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45) 
at hadoopTest.hadoopTest.SparkConsumer.lambda$1(SparkConsumer.java:100) 
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) 
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
at scala.util.Try$.apply(Try.scala:192) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:256) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:255) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) 

Could you enable the DEBUG log level for 'org.apache.spark.rdd.PairRDDFunctions' and try again? What is your Spark version, and how do you run the Spark application? –
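
(One common way to do this, assuming Spark's stock log4j 1.x setup, is a single line in conf/log4j.properties:)

# Enable DEBUG logging for the save code path only
log4j.logger.org.apache.spark.rdd.PairRDDFunctions=DEBUG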


Added the extra debug info to the question. I am using Spark 2.1.1 built for Scala 2.11. I run the code as an application from Eclipse, on a Cloudera CDH5 VM. It basically streams messages from a Kafka server, processes them, and saves the results to HDFS. –

Answer


Since you are using Hadoop (HDFS), change the save call as follows:

iprdd.saveAsTextFile("hdfs://<hostname>:<port>/results/"); 

Hope this helps!


Thanks for the response, but now I get this: "Incomplete HDFS URI, no host: hdfs:///results". Also tried hdfs:///results/somefile, still the same thing. –


Get the value of 'fs.default.name' and add the hostname and port to your URI so that it looks like 'hdfs://<hostname>:<port>/results/'. – philantrovert
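
(For illustration, a minimal sketch of the same idea done programmatically, assuming core-site.xml is on the classpath so Hadoop's Configuration can resolve the default filesystem; the hostname below is only a placeholder:)

import org.apache.hadoop.conf.Configuration;

// Read the configured default filesystem (fs.defaultFS is the modern key for fs.default.name)
Configuration hadoopConf = new Configuration();
String defaultFs = hadoopConf.get("fs.defaultFS"); // e.g. "hdfs://quickstart.cloudera:8020", check your core-site.xml

// Prepend it so the save goes to HDFS rather than the local filesystem
iprdd.saveAsTextFile(defaultFs + "/results/bad-ip-log");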


@utkarsh-kajaria I have modified the code, please check now. – Bhavesh