0

我试图建立在谷歌云数据流管道,会做下列文件阅读:谷歌云数据流:从动态文件名

  • 听取了关于发布订阅订阅事件
  • 提取从文件名事件文本
  • 读取文件(从谷歌Cloud Storage桶)
  • 商店BigQuery中

记录以下是代码:

Pipeline pipeline = //create pipeline 
pipeline.apply("read events", PubsubIO.readStrings().fromSubscription("sub")) 
     .apply("Deserialise events", //Code that produces ParDo.SingleOutput<String, KV<String, byte[]>>) 
     .apply(TextIO.read().from(""))??? 

我在第三步挣扎,不太确定如何访问第二步的输出并在第三步中使用它。我曾尝试编写产生以下代码的代码:

private ParDo.SingleOutput<KV<String, byte[]>, TextIO.Read> readFile(){ 
    //A class that extends DoFn<KV<String, byte[]>, TextIO.Read> and has TextIO.read wrapped into processElement method 
} 

但是,我无法在后续步骤中读取文件内容。

任何人都可以请我知道我需要在第3步和第4步写什么,以便我可以逐行使用文件并将输出存储到BigQuery(或者只是记录它)。

回答

2

表达您阅读的自然方式是使用TextIO.readAll()方法,该方法从文件名的输入PCollection中读取文本文件。该方法已在Beam代码库中引入,但目前尚未发布。它将包含在Beam 2.2.0发行版和相应的Dataflow 2.2.0发行版中。

-1

您可以使用SerializableFunction完成此操作。

你可以做

pipeline.apply(TextIO.read().from(new FileNameFn())); 

public class FileNameFn implements SerializableFunction<inputFileNameString, outputQualifiedFileNameStringWithBucket> 

显然,你可以传递水桶名称和其他参数的静态同时通过构造函数的参数创建该类的实例。

希望这会有所帮助。

+0

您引用的方法不存在:TextIO.read()。from()仅适用于String或ValueProvider 。您可能会将其与write()中更动态的方法混淆。 – jkff

相关问题