2016-09-20

I want to insert the results into MySQL through foreachPartition, but I get the error org.apache.spark.SparkException: Task not serializable.

import java.io.Serializable;
import java.sql.DriverManager;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

import com.mysql.jdbc.Connection;
import com.mysql.jdbc.PreparedStatement;

public class Insert implements Serializable {

    transient static JavaSparkContext spc;

    public static void main(String gg[])
    {
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://localhost:3306/testing?user=root&password=pwd");
        options.put("dbtable", "rtl");

        SparkConf ss = new SparkConf().setAppName("insert").setMaster("local");
        spc = new JavaSparkContext(ss);

        JavaRDD<String> rbm = spc.textFile(path); // path: location of the input file
        // DataFrame jdbcDF = sqlContext.jdbc(options.get("url"), options.get("dbtable"));
        // System.out.println("Data------------------->" + jdbcDF.toJSON().first());

        // Keep only the first whitespace-separated token of each line.
        JavaRDD<String> file = rbm.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" ")[0]);
            }
        });

        try {
            file.foreachPartition(new VoidFunction<Iterator<String>>() {
                // These fields are initialized on the driver, so Spark has to
                // serialize them with the closure -- and the MySQL connection
                // is not serializable, which triggers the error below.
                Connection conn = (Connection) DriverManager.getConnection(
                        "jdbc:mysql://localhost/testing", "root", "[email protected]");
                PreparedStatement del = (PreparedStatement) conn.prepareStatement(
                        "INSERT INTO rtl (rtl_s) VALUES (?)");

                public void call(Iterator<String> x) throws Exception {
                    while (x.hasNext()) {
                        String y = x.next();
                        del.setString(1, y);
                        del.executeUpdate();
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

I get the following error:

16/09/20 12:37:58 INFO SparkContext: Created broadcast 0 from textFile at Insert.java:41 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) 
    at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46) 
    at final_file.Insert.main(Insert.java:59) 
Caused by: java.io.NotSerializableException: java.lang.Object 
Serialization stack: 
    - object not serializable (class: java.lang.Object, value: [email protected]) 
    - writeObject data (class: java.util.HashMap) 
    - object (class java.util.HashMap, {[email protected], [email protected], [email protected]}) 
    - field (class: com.mysql.jdbc.ConnectionImpl, name: charsetConverterMap, type: interface java.util.Map) 
    - object (class com.mysql.jdbc.JDBC4Connection, [email protected]) 
    - field (class: final_file.Insert$2, name: conn, type: interface com.mysql.jdbc.Connection) 
    - object (class final_file.Insert$2, [email protected]) 
    - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1, name: f$12, type: interface org.apache.spark.api.java.function.VoidFunction) 
    - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
    ... 12 more 

I get the above error while trying to write the results to MySQL.


What does 'DriverManager' contain? It does not appear to be serializable. –


Actually it contains the MySQL connection properties: the username, the password, and the database name. – Aman
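For what it's worth, the serialization stack in the error already names the culprit: the anonymous VoidFunction (final_file.Insert$2) holds conn as a field, and MySQL's ConnectionImpl drags in internal maps whose values are plain java.lang.Object, which is where the java.io.NotSerializableException fires. The same failure can be reproduced without Spark by serializing the connection directly; a minimal sketch, assuming the question's URL and credentials:

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.sql.Connection;
    import java.sql.DriverManager;

    public class SerializeConnectionDemo {
        public static void main(String[] args) throws Exception {
            // Same URL/credentials as the question; adjust for your setup.
            Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost/testing", "root", "[email protected]");
            ObjectOutputStream out =
                    new ObjectOutputStream(new ByteArrayOutputStream());
            // Throws java.io.NotSerializableException -- exactly what Spark's
            // ClosureCleaner hits when the closure captures the connection.
            out.writeObject(conn);
        }
    }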

Answer

The issue was resolved by moving the connection setup inside the call() method, so the connection is created on the executor rather than serialized from the driver:

    try {
        file.foreachPartition(new VoidFunction<Iterator<String>>() {
            public void call(Iterator<String> x) throws Exception {
                // Created per partition on the executor; the closure no
                // longer captures anything non-serializable.
                Connection conn = (Connection) DriverManager.getConnection(
                        "jdbc:mysql://localhost/testing", "root", "[email protected]");
                PreparedStatement del = (PreparedStatement) conn.prepareStatement(
                        "INSERT INTO rtl (rtl_s) VALUES (?)");

                while (x.hasNext()) {
                    String y = x.next();
                    del.setString(1, y);
                    del.executeUpdate();
                }
                del.close();
                conn.close();
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
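Why this works: foreachPartition executes call() on the executors, so a connection opened inside call() lives on the worker for the duration of one partition and never needs to be serialized. A slightly tightened sketch of the same fix (table, URL, and credentials assumed from the question) that advances the iterator with next(), batches the inserts, and releases the JDBC resources via try-with-resources:

    // `file` is the JavaRDD<String> built in the question.
    file.foreachPartition(new VoidFunction<Iterator<String>>() {
        public void call(Iterator<String> part) throws Exception {
            // Opened on the executor, once per partition, so the closure
            // captures nothing that needs to be serialized.
            try (java.sql.Connection conn = java.sql.DriverManager.getConnection(
                         "jdbc:mysql://localhost/testing", "root", "[email protected]");
                 java.sql.PreparedStatement ps = conn.prepareStatement(
                         "INSERT INTO rtl (rtl_s) VALUES (?)")) {
                while (part.hasNext()) {
                    ps.setString(1, part.next()); // next(), not toString()
                    ps.addBatch();                // queue the row locally
                }
                ps.executeBatch();                // one batched write per partition
            }
        }
    });

Batching matters here because executeUpdate() per row costs one network round trip per record, while executeBatch() sends the whole partition at once; for MySQL, adding rewriteBatchedStatements=true to the JDBC URL should let the driver collapse the batch into multi-row INSERTs.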

http://stackoverflow.com/questions/40818001/understanding-spark-serialization/40818002?sfb=2#40818002 gives an overview of Spark serialization. – KrazyGautam
