2016-11-08 110 views
0

我在Spark中得到了任务不可序列化错误。我已经搜索并试图使用一些静态函数建议在一些职位,但它仍然给出了同样的错误。任务不可串行化 - Spark Java

守则如下:

public class Rating implements Serializable { 
    private SparkSession spark; 
    private SparkConf sparkConf; 
    private JavaSparkContext jsc; 
    private static Function<String, Rating> mapFunc; 

    public Rating() { 
     mapFunc = new Function<String, Rating>() { 
      public Rating call(String str) { 
       return Rating.parseRating(str); 
      } 
     }; 
    } 

    public void runProcedure() { 
     sparkConf = new SparkConf().setAppName("Filter Example").setMaster("local"); 
     jsc = new JavaSparkContext(sparkConf); 
     SparkSession spark = SparkSession.builder().master("local").appName("Word Count") 
      .config("spark.some.config.option", "some-value").getOrCreate();   

     JavaRDD<Rating> ratingsRDD = spark.read().textFile("sample_movielens_ratings.txt") 
       .javaRDD() 
       .map(mapFunc); 
    } 

    public static void main(String[] args) { 
     Rating newRating = new Rating(); 
     newRating.runProcedure(); 
    } 
} 

错误得出: enter image description here

我怎么解决这个问题? 在此先感谢。

回答

7

显然Rating不能是Serializable,因为它包含对Spark结构(即SparkSession,SparkConf等)的引用作为属性。这里

的问题是在

JavaRDD<Rating> ratingsRD = spark.read().textFile("sample_movielens_ratings.txt") 
      .javaRDD() 
      .map(mapFunc); 

如果你看看mapFunc定义,你返回一个Rating对象。

mapFunc = new Function<String, Rating>() { 
    public Rating call(String str) { 
     return Rating.parseRating(str); 
    } 
}; 

该功能(在火花方面具有变换)中使用的map内部。因为转换直接执行到工作节点而不是在驱动程序节点中,所以它们的代码必须是可串行化的。这迫使Spark尝试序列化Rating类,但这是不可能的。

尝试从Rating中提取您需要的功能,并将它们放置在不具有任何Spark结构的不同类中。最后,使用这个新类作为你的mapFunc函数的返回类型。

+0

将评分和程序分成两个班级工作!谢谢 :) – Fleur

相关问题