2017-02-14 254 views
0

序列文件内容获取错误我试图保存一些数据序列文件,并再次读取相同的数据,并尝试打印它作为一个小练习。当我使用rdd.foreach(println)时,我能够看到数据,但当我执行rdd.collect()时发生错误。这两种情况究竟会发生什么。在第二种情况下,错误是“对象不可序列化”。我正在Spark REPL中运行这些命令。我的火花版本是1.5.1获取与rdd.collect()与rdd.foreach(println)没有得到错误的火花

序列文件保存: -

val data = sc.parallelize(List(("Hadoop", 1), ("Spark", 2), ("Kafka", 3))) 
data.saveAsSequenceFile("file:///home/user/outputfiles/seqFileDemo1") 

=================

序列读取文件: -

import org.apache.hadoop.io.{Text, IntWritable} 
val data1 = sc.sequenceFile("file:///home/user/outputfiles/seqFileDemo1", classOf[Text], classOf[IntWritable]) 
data1.foreach(println) 

输出:

("Hadoop", 1), 
("Spark", 2), 
("Kafka", 3) 




scala> data1.collect() 

输出:

17/02/14 16:56:33 INFO SparkContext: Starting job: collect at <console>:19 
17/02/14 16:56:33 INFO DAGScheduler: Got job 5 (collect at <console>:19) with 2 output partitions 
17/02/14 16:56:33 INFO DAGScheduler: Final stage: ResultStage 5(collect at <console>:19) 
17/02/14 16:56:33 INFO DAGScheduler: Parents of final stage: List() 
17/02/14 16:56:33 INFO DAGScheduler: Missing parents: List() 
17/02/14 16:56:33 INFO DAGScheduler: Submitting ResultStage 5 (file:///home/srini/Desktop/SeqFileData/ HadoopRDD[3] at sequenceFile at <console>:16), which has no missing parents 
17/02/14 16:56:33 INFO MemoryStore: ensureFreeSpace(2440) called with curMem=694040, maxMem=556038881 
17/02/14 16:56:33 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 2.4 KB, free 529.6 MB) 
17/02/14 16:56:33 INFO MemoryStore: ensureFreeSpace(1448) called with curMem=696480, maxMem=556038881 
17/02/14 16:56:33 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 1448.0 B, free 529.6 MB) 
17/02/14 16:56:33 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:60094 (size: 1448.0 B, free: 530.2 MB) 
17/02/14 16:56:33 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:861 
17/02/14 16:56:33 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 5 (file:///home/srini/Desktop/SeqFileData/ HadoopRDD[3] at sequenceFile at <console>:16) 
17/02/14 16:56:33 INFO TaskSchedulerImpl: Adding task set 5.0 with 2 tasks 
17/02/14 16:56:33 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 10, localhost, PROCESS_LOCAL, 2156 bytes) 
17/02/14 16:56:33 INFO TaskSetManager: Starting task 1.0 in stage 5.0 (TID 11, localhost, PROCESS_LOCAL, 2156 bytes) 
17/02/14 16:56:33 INFO Executor: Running task 1.0 in stage 5.0 (TID 11) 
17/02/14 16:56:33 INFO Executor: Running task 0.0 in stage 5.0 (TID 10) 
17/02/14 16:56:33 INFO HadoopRDD: Input split: file:/home/srini/Desktop/SeqFileData/part-00000:0+104 
17/02/14 16:56:33 ERROR Executor: Exception in task 1.0 in stage 5.0 (TID 11) 
java.io.NotSerializableException: org.apache.hadoop.io.Text 
Serialization stack: 
    - object not serializable (class: org.apache.hadoop.io.Text, value: Hadoop) 
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
    - object (class scala.Tuple2, (Hadoop,1)) 
    - element of array (index: 0) 
    - array (class [Lscala.Tuple2;, size 1) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:744) 
17/02/14 16:56:33 INFO HadoopRDD: Input split: file:/home/srini/Desktop/SeqFileData/part-00001:0+121 
17/02/14 16:56:33 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 10) 
java.io.NotSerializableException: org.apache.hadoop.io.Text 
Serialization stack: 
    - object not serializable (class: org.apache.hadoop.io.Text, value: Kafka) 
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
    - object (class scala.Tuple2, (Kafka,3)) 
    - element of array (index: 0) 
    - array (class [Lscala.Tuple2;, size 2) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:744) 
17/02/14 16:56:33 ERROR TaskSetManager: Task 1.0 in stage 5.0 (TID 11) had a not serializable result: org.apache.hadoop.io.Text 
Serialization stack: 
    - object not serializable (class: org.apache.hadoop.io.Text, value: Hadoop) 
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
    - object (class scala.Tuple2, (Hadoop,1)) 
    - element of array (index: 0) 
    - array (class [Lscala.Tuple2;, size 1); not retrying 
17/02/14 16:56:33 INFO TaskSchedulerImpl: Cancelling stage 5 
17/02/14 16:56:33 INFO TaskSchedulerImpl: Stage 5 was cancelled 
17/02/14 16:56:33 INFO DAGScheduler: ResultStage 5 (collect at <console>:19) failed in 0.022 s 
17/02/14 16:56:33 INFO DAGScheduler: Job 5 failed: collect at <console>:19, took 0.083701 s 
17/02/14 16:56:33 ERROR TaskSetManager: Task 0.0 in stage 5.0 (TID 10) had a not serializable result: org.apache.hadoop.io.Text 
Serialization stack: 
    - object not serializable (class: org.apache.hadoop.io.Text, value: Kafka) 
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
    - object (class scala.Tuple2, (Kafka,3)) 
    - element of array (index: 0) 
    - array (class [Lscala.Tuple2;, size 2); not retrying 
17/02/14 16:56:33 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0 in stage 5.0 (TID 11) had a not serializable result: org.apache.hadoop.io.Text 
Serialization stack: 
    - object not serializable (class: org.apache.hadoop.io.Text, value: Hadoop) 
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
    - object (class scala.Tuple2, (Hadoop,1)) 
    - element of array (index: 0) 
    - array (class [Lscala.Tuple2;, size 1) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:904) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:26) 
    at $iwC$$iwC$$iwC.<init>(<console>:28) 
    at $iwC$$iwC.<init>(<console>:30) 
    at $iwC.<init>(<console>:32) 
    at <init>(<console>:34) 
    at .<init>(<console>:38) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

回答

3

问题是TextIntWritable是不可序列化的。当您致电collect()时,它会尝试序列化所有数据并将其发送回驱动程序。为了使这项工作,首先需要分别提取TextIntWritableStringInt可序列化:

val result = 
    sc 
    .sequenceFile[Text, IntWritable]("file:///home/user/outputfiles/seqFileDemo1") 
    .map(tup => (tup._1.toString, tup._2.get())) 
    .collect() 
+0

感谢@Yuval。它解决了我的问题。另外我想明白的一件事是** data1 ** rdd首先成功创建。但是当我试图读取rdd时,** data1.foreach(println)**显示as ** data1.collect()**给出错误的输出。有什么理由呢? –