2016-11-23

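I want to implement a Reshuffle transform to prevent excessive fusion, but I don't know how to adapt the version for <KV<String,String>> so that it handles plain PCollections. (How to reshuffle a PCollection<KV<String,String>> is described here.) How can I reshuffle a PCollection<T>?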

How would I reshuffle the records read by the official Avro I/O example code below before adding more steps to my pipeline?

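import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.AvroIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

PCollection<GenericRecord> records =
    p.apply(AvroIO.Read.named("ReadFromAvro")
        .from("gs://my_bucket/path/records-*.avro")
        .withSchema(schema));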

Answer


Thanks to a code snippet provided by the Google support team, I figured it out:

To get a reshuffled PCollection:

PCollection<T> reshuffled = data.apply(Repartition.of()); 

using this Repartition class:

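import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Reshuffles an arbitrary PCollection<T> by grouping on a random key and then
 * discarding the key. The GroupByKey acts as a fusion break, so the steps before
 * and after it are not fused into a single stage.
 */
public class Repartition<T> extends PTransform<PCollection<T>, PCollection<T>> {

    private Repartition() {}

    public static <T> Repartition<T> of() {
        return new Repartition<T>();
    }

    @Override
    public PCollection<T> apply(PCollection<T> input) {
        return input
            // Pair every element with a random Integer key.
            .apply(ParDo.named("Add arbitrary keys").of(new AddArbitraryKey<T>()))
            // Shuffle: elements are redistributed according to their random keys.
            .apply(GroupByKey.<Integer, T>create())
            // Drop the keys and emit the original elements again.
            .apply(ParDo.named("Remove arbitrary keys").of(new RemoveArbitraryKey<T>()))
            // Reuse the input coder explicitly instead of relying on coder
            // inference for the type variable T.
            .setCoder(input.getCoder());
    }

    /** Wraps each element in a KV whose key is a random int. */
    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
        }
    }

    /** Emits every value of a group, discarding the random key. */
    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            for (T s : c.element().getValue()) {
                c.output(s);
            }
        }
    }
}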
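To make the three steps of Repartition concrete without the Dataflow SDK, here is a minimal plain-Java sketch that replays them on an in-memory list. The class `RepartitionSimulation` and its methods are hypothetical names for illustration only, and a `HashMap` stands in for GroupByKey, so it shows the data flow (same elements in, same elements out), not Dataflow's distributed shuffle or its fusion behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical in-memory simulation of Repartition (not Dataflow code):
 * add a random key, group by that key, then drop the key again.
 */
public class RepartitionSimulation {

    // Mimics ParDo(AddArbitraryKey) followed by GroupByKey: every element gets a
    // random int key and elements with equal keys end up in the same list.
    static <T> Map<Integer, List<T>> addKeysAndGroup(List<T> input) {
        Map<Integer, List<T>> groups = new HashMap<>();
        for (T element : input) {
            int key = ThreadLocalRandom.current().nextInt();
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    // Mimics ParDo(RemoveArbitraryKey): emit every value of every group.
    static <T> List<T> removeKeys(Map<Integer, List<T>> groups) {
        List<T> output = new ArrayList<>();
        for (List<T> values : groups.values()) {
            output.addAll(values);
        }
        return output;
    }

    public static <T> List<T> repartition(List<T> input) {
        return removeKeys(addKeysAndGroup(input));
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        // Same elements as the input, possibly in a different order.
        System.out.println(repartition(input));
    }
}
```

The output is a permutation of the input: the random keys only decide which group (and, in a real pipeline, which worker) an element passes through, which is why the transform changes distribution but not content.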

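Could you elaborate on 'AddArbitraryKey'? Why is 'AddArbitraryKey' necessary, and does its particular implementation matter, i.e. does it affect how the key space is distributed across workers? – harveyxia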


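It should cause redistribution in an arbitrary way, just like the 'Redistribution' transform (see: https://github.com/apache/incubator-beam/pull/1036). Randomly chosen integer keys should lead to a random distribution. – Tobi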
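A toy model of why random keys spread the load: assume, purely for illustration, that each key is routed to worker `Math.floorMod(key.hashCode(), numWorkers)`. This routing rule and the class `KeySpreadDemo` are hypothetical; the thread does not describe how Dataflow actually assigns keys to workers.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical model (not Dataflow's real sharding): key -> worker is
 * floorMod(key.hashCode(), numWorkers).
 */
public class KeySpreadDemo {

    // Counts how many elements land on each hypothetical worker, using either a
    // fresh random int key per element or the same constant key for all of them.
    static int[] loadPerWorker(int numElements, int numWorkers, boolean randomKeys) {
        int[] load = new int[numWorkers];
        for (int i = 0; i < numElements; i++) {
            Integer key = randomKeys ? ThreadLocalRandom.current().nextInt() : 42;
            int worker = Math.floorMod(key.hashCode(), numWorkers);
            load[worker]++;
        }
        return load;
    }

    public static void main(String[] args) {
        System.out.println("random keys:  " + Arrays.toString(loadPerWorker(100_000, 4, true)));
        System.out.println("constant key: " + Arrays.toString(loadPerWorker(100_000, 4, false)));
    }
}
```

With a constant key every element goes to a single worker (index 2 here, since `Math.floorMod(42, 4) == 2`), whereas random keys give each of the four workers roughly a quarter of the elements. In this model, a key function with only a few distinct values would likewise cap the parallelism after the shuffle.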


Thanks, what is your use case for 'Redistribution'? – harveyxia