
I am trying to run a fairly simple example that involves connecting Spark to Cassandra and aggregating data. The implementation uses the spark-cassandra connector, Java, and Spring, and not much else. Spark fails with: java.io.NotSerializableException: org.apache.spark.streaming.api.java.JavaStreamingContext

Here is the Spark configuration that I wired up through Spring:

@Configuration
@ComponentScan("test.spark.service")
@Import({CassandraConfig.class})
public class SparkConfig {

    @Autowired
    private String cassandraUrl;

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf();

        // configure all the bells and whistles
        sparkConf
            .setMaster("spark://localhost:7077")
            .setAppName("DataAggregator")
            .set("spark.cassandra.connection.host", cassandraUrl);

        return sparkConf;
    }

    @Bean
    public JavaStreamingContext javaStreamingContext() {
        return new JavaStreamingContext(sparkConf(), new Duration(1000));
    }
}

Here is the service that does not throw the exception:

@Service
public class SparkServiceImpl implements SparkService, Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);

    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");
    }
}

This appears to be a working service class, and it returns a CassandraJavaRDD.

As soon as I change the implementation to use groupBy with a Function, it errors out with the serialization exception:

@Service
public class SparkServiceImpl implements SparkService, Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);

    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");

        JavaPairRDD<Integer, Iterable<CassandraRow>> javaPairRDD = rdd.groupBy(new Function<CassandraRow, Integer>() {
            @Override
            public Integer call(CassandraRow row) throws Exception {
                return row.getInt("int_column");
            }
        });
    }
}

Here is the stack trace:

org.apache.spark.SparkException: Task not serializable 

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) 
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:694) 
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:693) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:693) 
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665) 
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:664) 
    at org.apache.spark.api.java.JavaRDDLike$class.groupBy(JavaRDDLike.scala:242) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.groupBy(JavaRDDLike.scala:45) 
    at test.spark.service.SparkServiceImpl.process(SparkServiceServiceImpl.java:56) 
    at test.spark.service.SparkServiceTest.testProcess(SparkServiceTest.java:27) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) 
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) 
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) 
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252) 
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94) 
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) 
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) 
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191) 
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137) 
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117) 
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42) 
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262) 
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.api.java.JavaStreamingContext 
Serialization stack: 
    - object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@...)
    - field (class: test.spark.service.SparkServiceImpl, name: javaStreamingContext, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
    - object (class test.spark.service.SparkServiceImpl, test.spark.service.SparkServiceImpl@...)
    - field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl)
    - object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@...)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function) 
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 
    ... 52 more 

In addition to that exception, if my service is not Serializable it also throws an exception.

Here is the service:

@Service
public class SparkServiceImpl implements SparkService {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);

    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");

        JavaPairRDD<Integer, Iterable<CassandraRow>> javaPairRDD = rdd.groupBy(new Function<CassandraRow, Integer>() {
            @Override
            public Integer call(CassandraRow row) throws Exception {
                return row.getInt("int_column");
            }
        });
    }
}

Here is the exception:

org.apache.spark.SparkException: Task not serializable 

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) 
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:694) 
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:693) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:693) 
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665) 
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:664) 
    at org.apache.spark.api.java.JavaRDDLike$class.groupBy(JavaRDDLike.scala:242) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.groupBy(JavaRDDLike.scala:45) 
    at test.spark.service.SparkServiceImpl.process(SparkServiceImpl.java:32) 
    at test.spark.service.SparkServiceTest.testProcess(SparkServiceTest.java:27) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) 
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) 
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) 
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252) 
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94) 
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) 
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) 
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191) 
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137) 
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117) 
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42) 
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262) 
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 
Caused by: java.io.NotSerializableException: test.spark.service.SparkServiceImpl 
Serialization stack: 
    - object not serializable (class: test.spark.service.SparkServiceImpl, value: test.spark.service.SparkServiceImpl@...)
    - field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl)
    - object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@...)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function) 
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 
    ... 52 more 

How are you starting the Spark job? It looks like you are trying to serialize your 'SparkServiceImpl' class, which contains the 'JavaStreamingContext'. –


See this: '- object not serializable (class: test.spark.service.SparkServiceImpl, value: test.spark.service.SparkServiceImpl@...) - field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl) - object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@...) - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function) - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$to' –


The serialization debugger prints these out, so you have to look at them –

Answer


Quick fix:

Add the transient keyword to the injected JavaStreamingContext in SparkServiceImpl:

@Autowired 
private transient JavaStreamingContext javaStreamingContext; 
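
For clarity, here is the same SparkServiceImpl from the question with the fix applied (a sketch; only the field declaration changes):

@Service
public class SparkServiceImpl implements SparkService, Serializable {

    // transient: Java serialization skips this field, so the
    // non-serializable context is never written out with the closure
    @Autowired
    private transient JavaStreamingContext javaStreamingContext;

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");

        JavaPairRDD<Integer, Iterable<CassandraRow>> javaPairRDD = rdd.groupBy(new Function<CassandraRow, Integer>() {
            @Override
            public Integer call(CassandraRow row) throws Exception {
                return row.getInt("int_column");
            }
        });
    }
}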

A quick explanation of why:

This is because the JavaStreamingContext is created on the driver, and the JavaStreamingContext serves as the main entry point for Spark Streaming functionality.

In your SparkService implementation, SparkServiceImpl, you have some operations on the RDD, and the master creates tasks for the declared transformations. After this stage the created tasks are sent to the workers, which is basically where the tasks are finally executed.

So the workers do not need the SparkContext, nor the JavaStreamingContext; as you can see, it makes no sense to serialize the JavaStreamingContext.

With the transient keyword you are simply saying that you do not want the JavaStreamingContext to be serialized, and the Spark job executes normally.
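
An alternative worth noting (my own addition, not part of the original answer): you can avoid the capture entirely by moving the grouping function into a static nested class. A static class carries no hidden this$0 reference to the enclosing service, so Spark serializes only the small function object, and SparkServiceImpl then needs neither Serializable nor the transient field. A sketch under that assumption:

@Service
public class SparkServiceImpl implements SparkService {

    // Nothing captures the service anymore, so this field is never
    // serialized and needs no transient modifier
    @Autowired
    JavaStreamingContext javaStreamingContext;

    // Static nested class: no this$0 pointer to the enclosing service,
    // so only this tiny object is shipped to the workers; it is
    // serializable because Spark's Function interface extends Serializable
    static class GroupByIntColumn implements Function<CassandraRow, Integer> {
        @Override
        public Integer call(CassandraRow row) {
            return row.getInt("int_column");
        }
    }

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");

        JavaPairRDD<Integer, Iterable<CassandraRow>> javaPairRDD = rdd.groupBy(new GroupByIntColumn());
    }
}

A Java 8 lambda such as rdd.groupBy(row -> row.getInt("int_column")) behaves the same way here, because a lambda only captures the enclosing instance when its body actually references it.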


@wojtek_z did this do the job? –
