我以这种方式解释你的问题:
我想从所有Spark执行程序的RDD中插入数据。我试图在驱动程序上创建一个数据库连接,并以某种方式将它作为广播传递给执行者,但Spark一直在投掷NotSerializableException
。我怎样才能实现我的目标?
简短的回答是:
您需要单独创建的每一个执行节点上的一个新的连接。
您不应该将数据库连接处理程序,文件处理程序等等传递给其他进程,尤其是远程计算机。
这里的问题是哪儿来创建数据库连接,因为有大量的执行者可以很容易地超过DB连接池的大小。
什么你其实可以做的是使用foreachPartition,喜欢这里:
// numPartitions == number of simultaneous DB connections you can afford
yourRdd.repartition(numPartitions)
.foreachPartition {
iter =>
val connection = createConnection()
while (iter.hasNext) {
connection.execute("INSERT ...")
}
connection.commit()
}
这里面.foreachPartition
代码将每个执行者机器上执行,连接对象将不会通过网络发送,你赢了没有序列化异常,数据将被插入。
关于使用foreachPartition
的相同推理也在this问题的答案中提到。
数据库对象的意思是? DTO或其他东西? –
请查看[如何创建最小,完整和可验证的示例](https://stackoverflow.com/help/mcve)并相应地重写您的问题。 –
我创建了一个处理连接数据库和所有数据库交互的类。 –