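Writing to Google Cloud Storage from PubSub using windows with Cloud Dataflow

I am receiving a stream of messages from PubSub in streaming mode (my requirements demand streaming). Each message should be stored in its own file in GCS. Because TextIO.Write does not support unbounded collections, I tried to split the PCollection into windows containing one element each and write each window to Google Cloud Storage.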
Here is my code:
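import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;

public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
            .as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject(PROJECT_ID);
    options.setStagingLocation(STAGING_LOCATION);
    options.setStreaming(true);  // streaming mode is a requirement
    Pipeline pipeline = Pipeline.create(options);
    // Read the unbounded stream of messages from the PubSub subscription.
    PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
            .subscription(SUBSCRIPTION);
    PCollection<String> streamData = pipeline.apply(readFromPubsub);
    // Fire a pane after every single element and discard already-fired elements.
    PCollection<String> windowedMessage = streamData.apply(Window.<String>triggering(
            Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes());
    // In streaming mode this is the step that is rejected by the runner.
    windowedMessage.apply(TextIO.Write.to("gs://pubsub-outputs/1"));
    pipeline.run();
}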
I still get the same error I got before adding the windowing:
The DataflowPipelineRunner in streaming mode does not support TextIO.Write.
What code would accomplish this?
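One possible workaround, sketched here under stated assumptions, is to skip TextIO.Write entirely and write each element yourself from a ParDo: every call to the DoFn's `processElement` creates exactly one object, so each message ends up in its own file. To keep the sketch runnable without the Dataflow SDK or GCS credentials, the per-element logic is plain Java; `PerMessageFileWriter`, `BlobWriter` and `InMemoryBlobWriter` are hypothetical names. In a real pipeline, `processElement` would be invoked from a DoFn and `BlobWriter` would be backed by a Cloud Storage client instead of the in-memory map.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch: write each streamed message to its own object. */
public class PerMessageFileWriter {

  /** Hypothetical abstraction over "create an object at this path with these bytes". */
  public interface BlobWriter {
    void write(String path, byte[] contents);
  }

  /** In-memory stand-in for local testing; a real pipeline would write to GCS. */
  public static class InMemoryBlobWriter implements BlobWriter {
    public final Map<String, byte[]> blobs = new LinkedHashMap<>();

    @Override
    public void write(String path, byte[] contents) {
      // Like a GCS object write, a second write to the same path replaces the first.
      blobs.put(path, contents);
    }
  }

  private final String outputPrefix;
  private final BlobWriter writer;

  public PerMessageFileWriter(String outputPrefix, BlobWriter writer) {
    // Normalize the prefix so object names are always "<prefix>/<name>".
    this.outputPrefix = outputPrefix.endsWith("/") ? outputPrefix : outputPrefix + "/";
    this.writer = writer;
  }

  /** Called once per element, like DoFn.processElement; returns the path written. */
  public String processElement(String message) {
    // A random UUID gives every message a distinct object name, so parallel
    // workers never overwrite each other's files.
    String path = outputPrefix + UUID.randomUUID() + ".txt";
    writer.write(path, message.getBytes(StandardCharsets.UTF_8));
    return path;
  }
}
```

A random name is the simplest choice, but bundles in a streaming pipeline can be retried, so a retried element could produce a second object. Deriving the name from something stable about the message (for example a unique ID attached by the publisher) would make a retry overwrite the same object instead.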
Possible duplicate of [Creating a custom sink in Data flow](http://stackoverflow.com/questions/40402150/creating-a-custom-sink-in-data-flow) – jkff