
I set up DataFrames using spark-cassandra-connector 1.6.2 and am trying to run some transformations against Cassandra. The DataStax Enterprise version is 5.0.5. When the two DataFrames are joined, CassandraSourceRelation is not serializable.

DataFrame df1 = sparkContext 
      .read().format("org.apache.spark.sql.cassandra") 
      .options(readOptions).load() 
      .where("field2 ='XX'") 
      .limit(limitVal) 
      .repartition(partitions); 

List<String> distinctKeys = df1.getColumn("field3").collect(); 

values = some transformations to get IN query values; 

String cassandraQuery = String.format("SELECT * FROM " 
      + "table2 " 
      + "WHERE field2 = 'XX' " 
      + "AND field3 IN (%s)", values); 
DataFrame df2 = sparkContext.cassandraSql(cassandraQuery); 

String column1 = "field3"; 
String column2 = "field4"; 
List<String> columns = new ArrayList<>(); 
     columns.add(column1); 
     columns.add(column2); 
scala.collection.Seq<String> usingColumns = 
      scala.collection.JavaConverters 
          .collectionAsScalaIterableConverter(columns) 
          .asScala() 
          .toSeq(); 
DataFrame joined = df1.join(df2, usingColumns, "left_outer"); 

List<Row> collected = joined.collectAsList(); // doesn't work 
Long count = joined.count(); // works 

Here is the exception log. It looks like Spark is pulling the Cassandra source relation into the serialized plan, and it cannot be serialized.

java.io.NotSerializableException: java.util.ArrayList$Itr 
Serialization stack: 
- object not serializable (class: 
org.apache.spark.sql.cassandra.CassandraSourceRelation, value: 
[email protected]) 
- field (class: org.apache.spark.sql.execution.datasources.LogicalRelation, 
name: relation, type: class org.apache.spark.sql.sources.BaseRelation) 
- object (class org.apache.spark.sql.execution.datasources.LogicalRelation, 
Relation[fields] 
[email protected] 
) 
- field (class: org.apache.spark.sql.catalyst.plans.logical.Filter, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan) 
- object (class org.apache.spark.sql.catalyst.plans.logical.Filter, Filter 
(field2#0 = XX) 
+- Relation[fields] 
[email protected] 
) 
- field (class: org.apache.spark.sql.catalyst.plans.logical.Repartition, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan) 
- object (class org.apache.spark.sql.catalyst.plans.logical.Repartition, 
Repartition 4, true 
+- Filter (field2#0 = XX) 
+- Relation[fields] 
[email protected] 
) 
- field (class: org.apache.spark.sql.catalyst.plans.logical.Join, name: left, 
type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan) 
- object (class org.apache.spark.sql.catalyst.plans.logical.Join, Join 
LeftOuter, Some(((field3#2 = field3#18) && (field4#3 = field4#20))) 
:- Repartition 4, true 
: +- Filter (field2#0 = XX) 
:  +- Relation[fields] 
[email protected] 
+- Project [fields] 
+- Filter ((field2#17 = YY) && field3#18 IN (IN array)) 
    +- Relation[fields] 
[email protected] 
) 
- field (class: org.apache.spark.sql.catalyst.plans.logical.Project, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan) 
- object (class org.apache.spark.sql.catalyst.plans.logical.Project, Project 
[fields] 
+- Join LeftOuter, Some(((field3#2 = field3#18) && (field4#3 = field4#20))) 
:- Repartition 4, true 
: +- Filter (field2#0 = XX) 
:  +- Relation[fields] 
[email protected] 
+- Project [fields] 
    +- Filter ((field2#17 = XX) && field3#18 IN (IN array)) 
    +- Relation[fields] 
[email protected] 
) 
- field (class: org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4, name: 
$outer, type: class org.apache.spark.sql.catalyst.trees.TreeNode) 
- object (class org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4, 
<function1>) 
- field (class: 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$9, 
name: $outer, type: class 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4) 
- object (class 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$9, 
<function1>) 
- field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: f$1, 
type: interface scala.Function1) 
- object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>) 
- writeObject data (class: scala.collection.immutable.$colon$colon) 
- object (class scala.collection.immutable.$colon$colon, 
List([email protected])) 
- field (class: org.apache.spark.rdd.RDD, name: 
org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq) 
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[32] at 
collectAsList at RevisionPushJob.java:308) 
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, 
type: class org.apache.spark.rdd.RDD) 
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, <function0>) 
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: 
$outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1) 
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, 
<function1>) 

Is there any way to make this serializable? And why does the count operation work while collect does not?

Answer


Seeing that the error message points to java.util.ArrayList$Itr as your unserializable bit, my guess is that it is

List<String> columns = new ArrayList<>(); 
    columns.add(column1); 
    columns.add(column2); 

that is being referenced, and whose implicit conversion may end up requiring serialization of the ArrayList's iterator. It's the only ArrayList I can see, so it is the most likely culprit. It could also be hiding in the code you elided for `values`.

When you do a count, Spark can discard the column information, which may be what saves you there, but I can't say for sure.

So, TL;DR: my suggestion is to keep removing and swapping out pieces of the code, rebuilding as you go, until you find the non-serializable bit.
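
If that ArrayList really is the problem, one thing worth trying (just a sketch of a workaround, not something I have verified against your setup) is to materialize the converted column list eagerly, so the `Seq` handed to `join` no longer wraps the `java.util.ArrayList` iterator lazily. Here `df1` and `df2` are the DataFrames from your question:

    // Hypothetical workaround: build an eager, immutable Scala List instead of the
    // lazy Seq produced by collectionAsScalaIterableConverter(...).asScala().toSeq(),
    // so no java.util.ArrayList$Itr ends up referenced from the join's logical plan.
    List<String> columns = new ArrayList<>();
    columns.add("field3");
    columns.add("field4");

    scala.collection.Seq<String> usingColumns =
        scala.collection.JavaConverters
            .asScalaBufferConverter(columns)   // wrap the java.util.List as a Scala Buffer
            .asScala()
            .toList();                         // eager copy into an immutable, serializable List

    DataFrame joined = df1.join(df2, usingColumns, "left_outer");
    List<Row> collected = joined.collectAsList();

If the exception persists after that change, the non-serializable reference is probably coming from the elided `values` code instead.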