11

我想为DataSet中的Row类型编写一个编码器,用于我正在执行的映射操作。本质上,我不懂如何编写编码器。用于行类型Spark数据集的编码器

下面是一个地图操作的示例:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() { 
      @Override 
      public Iterator<String> call(Row row) throws Exception { 

       ArrayList<String> obj = //some map operation 
       return obj.iterator(); 
      } 
     },Encoders.STRING()); 

我理解而不是字符串编码器,需要如下要被写入:

Encoder<Row> encoder = new Encoder<Row>() { 
     @Override 
     public StructType schema() { 
      return join.schema(); 
      //return null; 
     } 

     @Override 
     public ClassTag<Row> clsTag() { 
      return null; 
     } 
    }; 

然而,我不理解编码器中的clsTag(),我试图找到一个可以解释类似事物的运行示例(即一个行类型的编码器)

编辑 - 这不是上述问题的副本:Encoder error while trying to map dataframe row to updated row作为答案在Spark 2.x中使用Spark 1.x(我没有这样做),我也在寻找一个编码器而不是解决错误。最后,我正在寻找Java中的解决方案,而不是在Scala中。

回答

9

答案是使用RowEncoder和使用TypeStruct的数据集的模式。

下面是一个数据集A flatmap操作的工作示例:

StructType structType = new StructType(); 
    structType = structType.add("id1", DataTypes.LongType, false); 
    structType = structType.add("id2", DataTypes.LongType, false); 

    ExpressionEncoder<Row> encoder = RowEncoder.apply(structType); 

    Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() { 
     @Override 
     public Iterator<Row> call(Row row) throws Exception { 
      // a static map operation to demonstrate 
      List<Object> data = new ArrayList<>(); 
      data.add(1l); 
      data.add(2l); 
      ArrayList<Row> list = new ArrayList<>(); 
      list.add(RowFactory.create(data.toArray())); 
      return list.iterator(); 
     } 
    }, encoder); 
+0

应该不是这个无法在集群模式,因为ArrayList的不序列化 – user482963