In Google Cloud Dataflow, my join fails with the error "TupleTag <taginfo> corresponds to a non-singleton result". From the error stack, this seems to happen in an override method in CoGbkResult. The lines involved are:

String Ad_ID = e.getKey(); 
String Ad_Info = "none"; 
Ad_Info = e.getValue().getOnly(AdInfoTag); 

Below is my join method.

static PCollection<String> joinEvents(PCollection<TableRow> ImpressionTable, 
     PCollection<TableRow> AdTable) throws Exception { 

    final TupleTag<String> ImpressionInfoTag = new TupleTag<String>(); 
    final TupleTag<String> AdInfoTag = new TupleTag<String>(); 

    // transform both input collections to tuple collections, where the keys are Ad_ID 
    PCollection<KV<String, String>> ImpressionInfo = ImpressionTable.apply(
     ParDo.of(new ExtractImpressionDataInfoFn())); 
    PCollection<KV<String, String>> AdInfo = AdTable.apply(
     ParDo.of(new ExtractAdDataInfoFn())); 

    // Ad_ID 'key' -> CGBKR (<ImpressionInfo>, <AdInfo>) 
    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple 
     .of(ImpressionInfoTag, ImpressionInfo) 
     .and(AdInfoTag, AdInfo) 
     .apply(CoGroupByKey.<String>create()); 

    // Process the CoGbkResult elements generated by the CoGroupByKey transform. 
    // Ad_ID 'key' -> string of <Impressioninfo>, <Adinfo> 
    PCollection<KV<String, String>> finalResultCollection = 
     kvpCollection.apply(ParDo.named("Process").of(
     new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { 
      private static final long serialVersionUID = 1L; 

     @Override 
      public void processElement(ProcessContext c) { 
      KV<String, CoGbkResult> e = c.element(); 
      String Ad_ID = e.getKey(); 
      String Ad_Info = "none"; 
      Ad_Info = e.getValue().getOnly(AdInfoTag); 
      for (String eventInfo : c.element().getValue().getAll(ImpressionInfoTag)) { 
       // Generate a string that combines information from both collection values 
       c.output(KV.of(Ad_ID, " " + Ad_Info 
         + " " + eventInfo)); 
      } 
      } 
     })); 

    //write to GCS 
    PCollection<String> formattedResults = finalResultCollection 
     .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { 
      @Override 
      public void processElement(ProcessContext c) { 
      String outputstring = "AdUnitID: " + c.element().getKey() 
       + ", " + c.element().getValue(); 
      c.output(outputstring); 
      } 
     })); 
    return formattedResults; 
    } 

My ExtractImpressionDataInfoFn and ExtractAdDataInfoFn classes are shown below.

static class ExtractImpressionDataInfoFn extends DoFn<TableRow, KV<String, String>> { 
    private static final long serialVersionUID = 1L; 

    @Override 
    public void processElement(ProcessContext c) { 
     TableRow row = c.element(); 
     String Ad_ID = (String) row.get("AdUnitID"); 
     String User_ID = (String) row.get("UserID"); 
     String Client_ID = (String) row.get("ClientID"); 
     String Impr_Time = (String) row.get("GfpActivityAdEventTIme"); 
     String ImprInfo = "UserID: " + User_ID + ", ClientID: " + Client_ID + ", GfpActivityAdEventTIme: " + Impr_Time; 
     c.output(KV.of(Ad_ID, ImprInfo)); 
    } 
} 


static class ExtractAdDataInfoFn extends DoFn<TableRow, KV<String, String>> { 
    private static final long serialVersionUID = 1L; 

    @Override 
    public void processElement(ProcessContext c) { 
     TableRow row = c.element(); 
     String Ad_ID = (String) row.get("AdUnitID"); 
     String Content_ID = (String) row.get("ContentID"); 
     String Pub_ID = (String) row.get("Publisher"); 
     String Add_Info = "ContentID: " + Content_ID + ", Publisher: " + Pub_ID; 
     c.output(KV.of(Ad_ID, Add_Info)); 
    } 
} 

The schemas of the Impression and Ad tables are as follows.

Impression: AdUnitID, UserID, ClientID, GfpActivityAdEventTIme

Ad: AdUnitID, ClientID, Publisher

The full code can be viewed here: https://github.com/kosalan/gcppoc/tree/master/src/main/java/com/gcp/poc – KosiB

Can you add the complete error stack to the question? –

Do you have the Dataflow job ID? –

Answer

The error indicates that the CoGroupByKey produced more than one result for that tag when you called getOnly. Specifically, this line:

Ad_Info = e.getValue().getOnly(AdInfoTag); 
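
To make the failure mode concrete, here is a minimal plain-Java sketch of the contract that getOnly appears to enforce: exactly one value per tag, otherwise an exception. The class GetOnlySketch and its exception messages are hypothetical stand-ins written for illustration; this is not the SDK's CoGbkResult code.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical plain-Java model of a "get the only value" accessor.
// It illustrates the contract only; it is not the Dataflow SDK class.
class GetOnlySketch {

    // Returns the single element of values.
    // Throws IllegalArgumentException when there are zero or several elements.
    static <V> V getOnly(Iterable<V> values) {
        Iterator<V> it = values.iterator();
        if (!it.hasNext()) {
            throw new IllegalArgumentException("empty result");
        }
        V value = it.next();
        if (it.hasNext()) {
            // This branch models the "non-singleton result" failure.
            throw new IllegalArgumentException("non-singleton result");
        }
        return value;
    }

    public static void main(String[] args) {
        // One ad row for the key: succeeds.
        System.out.println(getOnly(Arrays.asList("ContentID: c1, Publisher: p1")));

        // Two ad rows for the same key: fails.
        try {
            getOnly(Arrays.asList("ContentID: c1, Publisher: p1",
                                  "ContentID: c2, Publisher: p2"));
        } catch (IllegalArgumentException ex) {
            System.out.println("failed: " + ex.getMessage());
        }
    }
}
```

In the pipeline, this situation arises whenever the Ad collection contains more than one element with the same AdUnitID key.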

If you change it to getAll(AdInfoTag), it should work.
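
Because getAll returns every value for the tag rather than a single one, the loop in processElement has to iterate over both sides. The following is a rough plain-Java sketch of that nested iteration, assuming the two Iterables stand in for getAll(AdInfoTag) and getAll(ImpressionInfoTag). The class GetAllJoinSketch, the method joinForKey, and the output format are hypothetical and do not use the SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of joining the grouped values for one key.
// The Iterables stand in for what getAll(tag) would return in the pipeline.
class GetAllJoinSketch {

    // Pairs every ad value with every impression value for a single key.
    static List<String> joinForKey(String adId,
                                   Iterable<String> adInfos,
                                   Iterable<String> impressionInfos) {
        List<String> out = new ArrayList<>();
        for (String adInfo : adInfos) {
            for (String eventInfo : impressionInfos) {
                out.add(adId + ": " + adInfo + " " + eventInfo);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> joined = joinForKey(
                "ad-1",
                Arrays.asList("Publisher: p1", "Publisher: p2"),
                Arrays.asList("UserID: u1", "UserID: u2"));
        for (String line : joined) {
            System.out.println(line);
        }
    }
}
```

One thing to keep in mind with this shape: a key with no Ad values produces no output at all, so a fallback such as the "none" default in the question would need to be handled explicitly.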

This works. Thank you very much – KosiB