2016-11-06 92 views

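Writing from Pub/Sub to Google Cloud Storage using windows in Cloud Dataflow

I am receiving a stream of messages from Pub/Sub in Dataflow in streaming mode, which my requirements call for. Each message should be stored in its own file in GCS. Because TextIO.Write does not support unbounded collections, I tried to split the PCollection into windows containing one element each and write each window to Google Cloud Storage.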

Here is my code:

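import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;

// PROJECT_ID, STAGING_LOCATION and SUBSCRIPTION are String constants defined elsewhere in the class.
public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject(PROJECT_ID);
    options.setStagingLocation(STAGING_LOCATION);
    options.setStreaming(true);
    Pipeline pipeline = Pipeline.create(options);

    // Read the message stream from the Pub/Sub subscription (an unbounded PCollection).
    PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
        .subscription(SUBSCRIPTION);
    PCollection<String> streamData = pipeline.apply(readFromPubsub);

    // Fire a pane each time at least one element has arrived, discarding already-fired
    // elements. No WindowFn is set, so the data stays in the default global window.
    PCollection<String> windowedMessage = streamData.apply(
        Window.<String>triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes());

    // This is the step the streaming runner rejects.
    windowedMessage.apply(TextIO.Write.to("gs://pubsub-outputs/1"));

    pipeline.run();
}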

I still get the same error I got before adding the window:

The DataflowPipelineRunner in streaming mode does not support TextIO.Write. 

What code would accomplish this?


Possible duplicate of [Creating a custom sink in Dataflow](http://stackoverflow.com/questions/40402150/creating-a-custom-sink-in-data-flow) – jkff

Answer


TextIO works with bounded PCollections. To write to GCS from a streaming pipeline, you can use the Cloud Storage API instead.

You can do it like this:

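// PipeOptions is the answerer's own PipelineOptions interface; data is the PCollection<String> read from Pub/Sub.
PipeOptions options = data.getPipeline().getOptions().as(PipeOptions.class);
data.apply(WithKeys.of(new SerializableFunction<String, String>() {
        @Override
        public String apply(String s) { return "mykey"; }  // same key for every element
    }))
    // Split the stream into fixed windows of getTimeWrite() minutes.
    .apply(Window.<KV<String, String>>into(
        FixedWindows.of(Duration.standardMinutes(options.getTimeWrite()))))
    // Collect all elements of each window into one Iterable.
    .apply(GroupByKey.<String, String>create())
    .apply(Values.<Iterable<String>>create())
    .apply(ParDo.of(new StorageWrite(options)));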
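To see what `Window.into(FixedWindows.of(...))` followed by `GroupByKey` does to a single-key stream, here is a plain-Java simulation with no Dataflow dependency. It is a sketch under simplifying assumptions: the class name `FixedWindowSketch` and the millisecond timestamps are made up, and watermarks, late data and triggers (which the real runner handles) are ignored. Each element's timestamp is mapped to the start of its fixed window (aligned to the epoch, which is the `FixedWindows` default), and elements sharing a window end up in one list, corresponding to one `Iterable<String>` handed to `StorageWrite`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of FixedWindows + GroupByKey on a single key.
// Ignores watermarks, late data and triggers.
public class FixedWindowSketch {

    // Start of the fixed window that contains timestampMillis (windows aligned to epoch 0).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Groups messages by window start, keeping arrival order inside each window.
    static Map<Long, List<String>> groupByWindow(long[] timestamps, String[] messages, long sizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < messages.length; i++) {
            long start = windowStart(timestamps[i], sizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(messages[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long[] ts = {5_000L, 59_999L, 60_000L, 130_000L};
        String[] msgs = {"a", "b", "c", "d"};
        // Each map entry stands for one Iterable<String> that would be written as one GCS object.
        System.out.println(groupByWindow(ts, msgs, oneMinute));
        // prints {0=[a, b], 60000=[c], 120000=[d]}
    }
}
```

This also shows why the answer produces one object per window rather than one per message: every message whose timestamp falls in the same window is written together.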

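Windowing before the GroupByKey makes the elements of each window arrive together as one Iterable, which you can then write to Cloud Storage. The processElement method of StorageWrite: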

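// Inside StorageWrite, a DoFn whose input is Iterable<String>. In Dataflow SDK 1.x the DoFn must
// implement DoFn.RequiresWindowAccess for c.window() to be available.
// storage is a com.google.cloud.storage.Storage client held by the DoFn (its setup is not shown).
@Override
public void processElement(ProcessContext c) throws Exception {
    PipeOptions options = c.getPipelineOptions().as(PipeOptions.class);
    // Name the object after the end of the window: <repository>/<date>/<fileOutName><timestamp>.
    String date = ISODateTimeFormat.date().print(c.window().maxTimestamp());
    String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp());
    String blobName = String.format("%s/%s/%s",
        options.getBucketRepository(), date, options.getFileOutName() + isoDate);

    BlobId blobId = BlobId.of(options.getGCSBucket(), blobName);

    // Stream every message of the window into one text object, one message per line.
    try (WriteChannel writer = storage.writer(
            BlobInfo.builder(blobId).contentType("text/plain").build())) {
        for (String line : c.element()) {
            writer.write(ByteBuffer.wrap((line + "\n").getBytes(StandardCharsets.UTF_8)));
        }
    }
}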
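The object name built in processElement can be checked locally before deploying. The sketch below reproduces the same layout with java.time instead of Joda. It assumes UTC, whereas the Joda formatters above print in the JVM default time zone. The pattern `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` is used as a stand-in for `ISODateTimeFormat.dateTime()`. `BlobNameSketch`, `"dataflow-out"` and `"messages-"` are hypothetical values standing in for the answer's custom `getBucketRepository()` and `getFileOutName()` options.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper mirroring the blob-name layout from processElement:
//   <repository>/<yyyy-MM-dd>/<fileOutName><full ISO timestamp>
// Assumes UTC; the original Joda formatters use the JVM default time zone.
public class BlobNameSketch {

    private static final DateTimeFormatter DATE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter DATE_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").withZone(ZoneOffset.UTC);

    static String blobName(String repository, String fileOutName, Instant windowMaxTimestamp) {
        String date = DATE.format(windowMaxTimestamp);
        String isoDate = DATE_TIME.format(windowMaxTimestamp);
        return String.format("%s/%s/%s", repository, date, fileOutName + isoDate);
    }

    public static void main(String[] args) {
        // The max timestamp of the one-minute window [10:00, 10:01) is 10:00:59.999.
        Instant maxTs = Instant.parse("2016-11-06T10:00:59.999Z");
        System.out.println(blobName("dataflow-out", "messages-", maxTs));
        // prints dataflow-out/2016-11-06/messages-2016-11-06T10:00:59.999Z
    }
}
```

Because the name is derived from the window's end, two windows never share an object name. Two firings of the same window would overwrite each other, though, so this layout relies on each window producing a single pane.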

Looks good... I'll try it. – Ruchy


If it works, could you please accept the answer? –
