2017-06-24 30 views
3

我迁移我的谷歌数据流的Java 1.9至2.0光束和我想我挣扎试图将BigtableIO之前使用BigtableIO.Write阿帕奇束BigTable中可迭代突变

.... 
.apply("", BigtableIO.write() 
       .withBigtableOptions(bigtableOptions) 
       .withTableId("twoSecondVitals")); 

在帕尔多进行Iterable。

  try{ 
     Mutation mutation = Mutation.parseFrom(new ObjectMapper().writeValueAsBytes(v)); 
     Mutation mu[] = {mutation}; 
     Iterable<Mutation> imu = Arrays.asList(mu); 
     log.severe("imu"); 
     c.output(KV.of(ByteString.copyFromUtf8(rowKey+"_"+v.getEpoch()), imu)); 
     }catch (Exception e){ 
     log.severe(rowKey+"_"+v.getEpoch()+" error:"+e.getMessage()); 
     } 

上面的代码抛出以下异常 InvalidProtocolBufferException:协议消息结束组标签不匹配预期标签

v是对象(Vitals.class)的列表。 hbase api使用Put方法创建突变。一个人如何创建一个可以与BigtableIO接收器配合使用的BigTable突变?

+0

想通了。 – Mike

+1

您可以添加您的解决方案作为答案,以便明确您的解决方案 - 并在将来帮助其他人。 – Pablo

回答

4

通过查看sdk的测试,我能够找到我的答案。

  Iterable<Mutation> mutations = 
       ImmutableList.of(Mutation.newBuilder() 
       .setSetCell(
         Mutation.SetCell.newBuilder() 
         .setValue(ByteString.copyFrom(new ObjectMapper().writeValueAsBytes(v))) 
         .setFamilyName("vitals") 
       ).build());