
How do I write a JDBC sink for Spark Structured Streaming? [SparkException: Task not serializable]

I need a JDBC sink for my Spark Structured Streaming DataFrame. As far as I can tell, the DataFrame API is still missing a JDBC implementation of writeStream (both in PySpark and in Scala, as of Spark 2.2.0). The only suggestion I have found is to write my own ForeachWriter Scala class, based on this article. So I modified the simple word-count example from here by adding a custom ForeachWriter class and tried to writeStream into Postgres. The stream of words is generated manually from the console (using NetCat: nc -lk -p 9999) and read by Spark from a socket.

Unfortunately, I am getting a "Task not serializable" error.

APACHE_SPARK_VERSION = 2.1.0, using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)

My Scala code:

//Spark context available as 'sc' (master = local[*], app id = local-1501242382770). 
//Spark session available as 'spark'. 

import java.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.SparkSession 

val spark = SparkSession 
    .builder 
    .master("local[*]") 
    .appName("StructuredNetworkWordCountToJDBC") 
    .config("spark.jars", "/tmp/data/postgresql-42.1.1.jar") 
    .getOrCreate() 

import spark.implicits._ 

val lines = spark.readStream 
    .format("socket") 
    .option("host", "localhost") 
    .option("port", 9999) 
    .load() 

val words = lines.as[String].flatMap(_.split(" ")) 

val wordCounts = words.groupBy("value").count() 

// Custom sink: a ForeachWriter that writes each Row to Postgres over JDBC
class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
    val driver = "org.postgresql.Driver"
    var connection: java.sql.Connection = _
    var statement: java.sql.Statement = _

    // Called once per partition: open the JDBC connection
    def open(partitionId: Long, version: Long): Boolean = {
        Class.forName(driver)
        connection = java.sql.DriverManager.getConnection(url, user, pwd)
        statement = connection.createStatement
        true
    }

    // Called for every row of the batch
    def process(value: org.apache.spark.sql.Row): Unit = {
        statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
            "VALUES ('" + value(0) + "'," + value(1) + ");")
    }

    def close(errorOrNull: Throwable): Unit = {
        connection.close
    }
}

val url="jdbc:postgresql://<mypostgreserver>:<port>/<mydb>" 
val user="<user name>" 
val pwd="<pass>" 
val writer = new JDBCSink(url, user, pwd) 

import org.apache.spark.sql.streaming.ProcessingTime 

val query=wordCounts 
    .writeStream 
    .foreach(writer) 
    .outputMode("complete") 
    .trigger(ProcessingTime("25 seconds")) 
    .start() 

query.awaitTermination() 

Error message:

ERROR StreamExecution: Query [id = ef2e7a4c-0d64-4cad-ad4f-91d349f8575b, runId = a86902e6-d168-49d1-b7e7-084ce503ea68] terminated with error 
org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923) 
     at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503) 
     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) 
     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244) 
     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177) 
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.streaming.StreamExecution 
Serialization stack: 
     - object not serializable (class: org.apache.spark.sql.execution.streaming.StreamExecution, value: Streaming Query [id = 9b01db99-9120-4047-b779-2e2e0b289f65, runId = e20beefa-146a-4139-96f9-de3d64ce048a] [state = TERMINATED]) 
     - field (class: $line21.$read$$iw$$iw, name: query, type: interface org.apache.spark.sql.streaming.StreamingQuery) 
     - object (class $line21.$read$$iw$$iw, [email protected]) 
     - field (class: $line21.$read$$iw, name: $iw, type: class $line21.$read$$iw$$iw) 
     - object (class $line21.$read$$iw, [email protected]) 
     - field (class: $line21.$read, name: $iw, type: class $line21.$read$$iw) 
     - object (class $line21.$read, [email protected]) 
     - field (class: $line25.$read$$iw, name: $line21$read, type: class $line21.$read) 
     - object (class $line25.$read$$iw, [email protected]) 
     - field (class: $line25.$read$$iw$$iw, name: $outer, type: class $line25.$read$$iw) 
     - object (class $line25.$read$$iw$$iw, [email protected]) 
     - field (class: $line25.$read$$iw$$iw$JDBCSink, name: $outer, type: class $line25.$read$$iw$$iw) 
     - object (class $line25.$read$$iw$$iw$JDBCSink, [email protected]) 
     - field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name: org$apache$spark$sql$execution$streaming$ForeachSink$$writer, type: class org.apache.spark.sql.ForeachWriter) 
     - object (class org.apache.spark.sql.execution.streaming.ForeachSink, [email protected]) 
     - field (class: org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, name: $outer, type: class org.apache.spark.sql.execution.streaming.ForeachSink) 
     - object (class org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, <function1>) 
     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 
     ... 25 more 

How can I make this work?

SOLUTION

(Thanks everyone, and special thanks to @zsxwing for a simple solution):

  1. Save the JDBCSink class to a separate file.
  2. Load that class in the spark-shell, e.g. with scala> :load <path_to_a_JDBCSink.scala_file>
  3. Finally, scala> :paste the code without the JDBCSink class definition, as sketched below.
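A minimal sketch of such a spark-shell session (the file path is just an assumed example):

// JDBCSink.scala contains only the class definition and nothing from the REPL session,
// so the compiled class carries no reference to REPL-generated outer objects.

scala> :load /tmp/data/JDBCSink.scala
// defined class JDBCSink

scala> :paste
// paste the streaming code from above (everything except the JDBCSink class),
// then press Ctrl-D to run it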

Have you tried making 'connection' and 'statement' @transient attributes? –


Thanks @Vitaliy Kotlyarenko. I just tried it with '@transient var connection' and '@transient var statement', but unfortunately got the same error. – Lukiz


How do you execute the code? I suppose you pasted it into the spark-shell, didn't you? If so, it won't work (as you have experienced), because it closes over some non-serializable objects. –

ANSWERS


Simply define JDBCSink in a separate file, rather than defining it as an inner class which may capture the outer reference.


Many, many thanks! That solved the problem! If you don't mind, I will update my question with your solution. – Lukiz


It looks like the culprit here is the use of import spark.implicits._ inside the JDBCSink class:

  • JDBCSink must be serializable
  • By adding this import, you make your JDBCSink reference the non-serializable SparkSession, which is then serialized along with it (technically, SparkSession extends Serializable, but it is not meant to be deserialized on worker nodes)

The good news: you are not actually using this import, so if you just remove it, this should work.
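A quick way to confirm that the writer itself is what cannot be serialized (a diagnostic sketch, not part of the original answer) is to try Java-serializing the ForeachWriter instance before starting the query:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Throws java.io.NotSerializableException if `writer` drags in non-serializable
// outer objects (e.g. the REPL line objects holding the StreamingQuery).
val oos = new ObjectOutputStream(new ByteArrayOutputStream())
oos.writeObject(writer)
oos.close()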


If anyone runs into this in an interactive notebook, this solution also applies:

Instead of saving the JDBCSink class to a separate file, you can also declare it as a separate package (a "package cell") within the same notebook, and then import that package in the cell where you use it. It is described well here: https://docs.databricks.com/user-guide/notebooks/package-cells.html
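A minimal sketch of what that could look like (the package name sinks is just an assumed example; the class body simply mirrors the JDBCSink from the question):

// Cell 1 (a "package cell"): it must contain nothing but package code
package sinks

import java.sql.{Connection, DriverManager, Statement}
import org.apache.spark.sql.{ForeachWriter, Row}

class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  val driver = "org.postgresql.Driver"
  var connection: Connection = _
  var statement: Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement()
    true
  }

  def process(value: Row): Unit =
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")

  def close(errorOrNull: Throwable): Unit = connection.close()
}

// Cell 2: import the class from the package cell and use it as before
import sinks.JDBCSink
val writer = new JDBCSink(url, user, pwd)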
