我在同一个主题上看到很多问题。但是,我仍然在写入GCS时遇到问题。我正在阅读pubsub的主题,并试图将其推广到GCS。我提到了this link。但是,在最新的束包装中找不到IOChannelUtils。写一个无限的收集到GCS
PCollection<String> details = pipeline
.apply(PubsubIO.readStrings().fromTopic("/topics/<project>/sampleTopic"));
PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() {
public String apply(String s) {
return "constant";
}
}));
PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN)).withAllowedLateness(ONE_DAY)
.triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(10))
.withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS))))
.discardingFiredPanes()).apply(GroupByKey.create());
PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create());
这个我已经从堆栈溢出中的许多其他类似的话题。现在,我明白了,TextIO支持带有withWindowedWrites和withNumShards的无限PCollection写入选项。
裁判:Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn
但是,我不明白,我应该怎么做。
我正在尝试写入GCS,如下所示。
FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
details.apply(TextIO.write().to("gs://<bucket>/topicfile").withWindowedWrites()
.withFilenamePolicy(policy).withNumShards(4));
我没有足够的观点来为堆栈溢出中的这些主题添加注释,因此我将其作为一个不同的问题提出来。
嘿..谢谢你的答案。我可以在几分钟之前完成它。我会用我采取的方法更新这个答案。再次感谢! – Balu