我试图建立在谷歌云数据流管道,会做下列文件阅读:谷歌云数据流:从动态文件名
- 听取了关于发布订阅订阅事件
- 提取从文件名事件文本
- 读取文件(从谷歌Cloud Storage桶)
- 商店BigQuery中
记录以下是代码:
Pipeline pipeline = //create pipeline
pipeline.apply("read events", PubsubIO.readStrings().fromSubscription("sub"))
.apply("Deserialise events", //Code that produces ParDo.SingleOutput<String, KV<String, byte[]>>)
.apply(TextIO.read().from(""))???
我在第三步挣扎,不太确定如何访问第二步的输出并在第三步中使用它。我曾尝试编写产生以下代码的代码:
private ParDo.SingleOutput<KV<String, byte[]>, TextIO.Read> readFile(){
//A class that extends DoFn<KV<String, byte[]>, TextIO.Read> and has TextIO.read wrapped into processElement method
}
但是,我无法在后续步骤中读取文件内容。
任何人都可以请我知道我需要在第3步和第4步写什么,以便我可以逐行使用文件并将输出存储到BigQuery(或者只是记录它)。
您引用的方法不存在:TextIO.read()。from()仅适用于String或ValueProvider。您可能会将其与write()中更动态的方法混淆。 –
jkff