2017-08-28 59 views
2

我需要阅读从GCS桶的文件。我知道我将不得不使用GCS API /客户端库,但我无法找到与此相关的任何实例。阅读从GCS文件在Apache的梁

我一直在参考GCS文档中的链接: GCS Client Libraries。但无法真正发挥作用。如果有人能提供一个真正有用的例子。 谢谢。

+0

所有变换像TEXTIO,AvroIO等,才能够在默认情况下,例如与GCS档案工作TextIO.read()的。( “GS://你的桶/路径/要/你的档案/ *”)。他们不适合你吗? – jkff

+0

嗨@jkff我知道。我忘了提及为什么我要使用客户端库。实际上,我面临的问题是,当我使用TextIO.read()读取文件时,我得到的数据与它在文件中显示的顺序不同。我需要按照与文件中相同的顺序获取数据。这个怎么做? – rish0097

+0

正确,PCollections是无序的,因为它们是并行处理的,顺序排列与并行性相反。也就是说,确实存在你想在并行流水线中进行一些顺序处理的情况。你能告诉更多关于你的使用案例和为什么你需要订购吗? – jkff

回答

2

确定。如果你想简单地从GCS读取文件,而不是作为一个PCollection但作为普通的文件,如果您有与GCS Java客户端库的麻烦,你也可以使用Apache梁FileSystems API:

首先,你需要确保你有你的pom.xmlbeam-sdks-java-extensions-google-cloud-platform-core一个Maven依赖它包含实施gs://文件系统:

<dependency> 
    <groupId>org.apache.beam</groupId> 
    <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId> 
</dependency> 

然后设置文件系统API(它在默认情况下所有管线设置,但如果你”重新在管道外使用它,你需要手动完成)。

PipelineOptions options = PipelineOptionsFactory.create(); 
// ...Optionally fill in options such as GCP credentials... 
// (see GcpOptions class) 
FileSystems.setDefaultPipelineOptions(options); 

然后你可以使用它:

ReadableByteChannel chan = FileSystems.open(FileSystems.matchNewResource(
    "gs://path/to/your/file", false /* is_directory */)); 
try (InputStream stream = Channels.newInputStream(chan)) { 
    // Use regular Java utilities to work with the input stream. 
} 
+0

我按照这种方法,并在我的本地机器上运行该程序,并得到以下错误,“线程中的异常”主“java.lang.IllegalStateException:无法找到gs \t的注册器在org.apache.beam。 sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447) \t at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:517)'。任何帮助? –

+1

我编辑了我的答案,再试一次? – jkff

+0

感谢@jkff这个作品! – rish0097