I want to insert the results into MySQL through foreachPartition, but I get the error org.apache.spark.SparkException: Task not serializable.
public class Insert implements Serializable {
    transient static JavaSparkContext spc;

    public static void main(String gg[])
    {
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://localhost:3306/testing?user=root&password=pwd");
        options.put("dbtable", "rtl");
        SparkConf ss = new SparkConf().setAppName("insert").setMaster("local");
        spc = new JavaSparkContext(ss);
        JavaRDD<String> rbm = spc.textFile(path);
        // DataFrame jdbcDF = sqlContext.jdbc(options.get("url"), options.get("dbtable"));
        // System.out.println("Data------------------->" + jdbcDF.toJSON().first());
        JavaRDD<String> file = rbm.flatMap(new FlatMapFunction<String, String>() {
            NotSerializableException nn = new NotSerializableException();
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" ")[0]);
            }
        });
        try {
            file.foreachPartition(new VoidFunction<Iterator<String>>() {
                Connection conn = (Connection) DriverManager.getConnection("jdbc:mysql://localhost/testing", "root", "[email protected]");
                PreparedStatement del = (PreparedStatement) conn.prepareStatement("INSERT INTO rtl (rtl_s) VALUES (?) ");
                NotSerializableException nn = new NotSerializableException();
                public void call(Iterator<String> x) throws Exception {
                    while (x.hasNext())
                    {
                        String y = x.toString();
                        del.setString(1, y);
                        del.executeUpdate();
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
I get the following error:
6/09/20 12:37:58 INFO SparkContext: Created broadcast 0 from textFile at Insert.java:41
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
at final_file.Insert.main(Insert.java:59)
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
- object not serializable (class: java.lang.Object, value: [email protected])
- writeObject data (class: java.util.HashMap)
- object (class java.util.HashMap, {[email protected], [email protected], [email protected]})
- field (class: com.mysql.jdbc.ConnectionImpl, name: charsetConverterMap, type: interface java.util.Map)
- object (class com.mysql.jdbc.JDBC4Connection, [email protected])
- field (class: final_file.Insert$2, name: conn, type: interface com.mysql.jdbc.Connection)
- object (class final_file.Insert$2, [email protected])
- field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1, name: f$12, type: interface org.apache.spark.api.java.function.VoidFunction)
- object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 12 more
I get the above error while trying to write the results to MySQL.
What does 'DriverManager' contain? It does not appear to be serializable. –
It actually holds the MySQL connection properties: the username, the password, and the database name. – Aman
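The serialization stack in the trace points at the cause: `conn` is a *field* of the anonymous `VoidFunction`, so Spark must serialize it together with the closure, and `com.mysql.jdbc.JDBC4Connection` is not serializable. The usual fix is to open the connection *inside* `call()`, so each executor creates its own connection instead of receiving one from the driver. The mechanism can be demonstrated without Spark at all; `ClosureDemo`, `FakeConnection`, and `Task` below are hypothetical stand-ins for the real Spark/JDBC types:

```java
import java.io.*;

public class ClosureDemo {
    // Stand-in for a non-serializable resource such as java.sql.Connection
    static class FakeConnection {}

    // Stand-in for Spark's function interfaces, which extend Serializable
    interface Task extends Serializable {
        void call() throws Exception;
    }

    static void trySerialize(String label, Task t) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(t);
            System.out.println(label + ": serialized");
        } catch (NotSerializableException e) {
            System.out.println(label + ": NotSerializableException");
        } catch (IOException e) {
            System.out.println(label + ": " + e);
        }
    }

    public static void main(String[] args) {
        // Broken: the connection is a field of the anonymous class, so it is
        // serialized along with the closure -- exactly the pattern in the question
        Task broken = new Task() {
            FakeConnection conn = new FakeConnection(); // captured as a field
            public void call() {}
        };
        trySerialize("broken", broken); // prints "broken: NotSerializableException"

        // Fixed: the connection is created inside call(), so it exists only
        // on the worker at execution time and is never serialized
        Task fixed = new Task() {
            public void call() {
                FakeConnection conn = new FakeConnection(); // created per invocation
            }
        };
        trySerialize("fixed", fixed); // prints "fixed: serialized"
    }
}
```

Applied to the question's code, that means moving the `DriverManager.getConnection(...)` and `prepareStatement(...)` calls into the body of `call(Iterator<String> x)` (and closing them there). Separately, note that `String y = x.toString()` stringifies the iterator itself rather than consuming it; `x.next()` is needed inside the loop, otherwise `hasNext()` never becomes false.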