2017-05-26 96 views
0

我有一个数据库对象,用于从所有Spark执行器插入数据。当我将此对象定义为static时,它在这些执行程序中具有null值。所以我在驱动程序中声明它,然后播放它,然后在每个执行者中获得它的价值。当运行应用程序时,下面的抛出异常:在Spark中如何定义要广播的对象Java

Exception in thread "main" java.io.NotSerializableException: database.Database 

注:

  • 执行人类是可序列化
  • 我除去广播对象是在该类别定义为瞬态瞬态但它没有工作
+0

数据库对象的意思是? DTO或其他东西? –

+0

请查看[如何创建最小,完整和可验证的示例](https://stackoverflow.com/help/mcve)并相应地重写您的问题。 –

+0

我创建了一个处理连接数据库和所有数据库交互的类。 –

回答

1

我以这种方式解释你的问题:

我想从所有Spark执行程序的RDD中插入数据。我试图在驱动程序上创建一个数据库连接,并以某种方式将它作为广播传递给执行者,但Spark一直在投掷NotSerializableException。我怎样才能实现我的目标?

简短的回答是:

您需要单独创建的每一个执行节点上的一个新的连接。
您不应该将数据库连接处理程序,文件处理程序等等传递给其他进程,尤其是远程计算机。

这里的问题是哪儿来创建数据库连接,因为有大量的执行者可以很容易地超过DB连接池的大小。

什么你其实可以做的是使用foreachPartition,喜欢这里:

// numPartitions == number of simultaneous DB connections you can afford 
    yourRdd.repartition(numPartitions) 
    .foreachPartition { 
    iter => 
     val connection = createConnection() 
     while (iter.hasNext) { 
     connection.execute("INSERT ...") 
     } 
     connection.commit() 
    } 

这里面.foreachPartition代码将每个执行者机器上执行,连接对象将不会通过网络发送,你赢了没有序列化异常,数据将被插入。

关于使用foreachPartition的相同推理也在this问题的答案中提到。

相关问题