2016-04-29 57 views
1

我在Google云端存储中拥有自定义文件格式,我想从Google DataFlow中读取它。如何在Google DataFlow中为Google云端存储文件实现自定义文件解析器

我已经通过继承FileBasedReader实现了一个Source和一个Reader,但后来我意识到它不支持从Google Cloud Storage中读取(而FileBasedSink实际上......),所以我不确定什么是最好的想法在这里解决...

我试图子类TextIO,但我不能达到目的,因为它似乎并没有被设计为subclassed。

关于如何处理这个问题的任何好主意?

谢谢。

更新,以反映在注释中使用

文件模式:gs://mybucket/my.json

从FileBasedSource实现源类:

MessageSource<T> extends FileBasedSource<T> 

实现Reader类(我真正关心这里)从FileBasedReader:

MessageReader<T> extends FileBasedReader<T> 

个工艺读数为:

MySource source = // instantiate source 
Pipeline p = Pipeline.create(options); 
p.apply(TextIO.Read.from(options.getSource()).named("ReadFileData")) 
    .apply(ParDo.of(new DoFn<String, String>() { 

而且的getSource()来源于此命令行参数(核对无误):

--source=gs://${BUCKET_NAME}/my.json \ 

我错过了什么?

月2日更新

在运行source.getEstimatedSizeBytes(options)它告诉我没有处理发现的?

java.io.IOException: Unable to find handler for gs://mybucket/my.json 
at com.google.cloud.dataflow.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:186) 
at com.google.cloud.dataflow.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:182) 
at com.etc.TrackingDataPipeline.main(TrackingDataPipeline.java:66) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
at java.lang.Thread.run(Thread.java:745) 

我以为FileBasedSource应该处理GCS?

+1

嗯,FileBasedReader绝对是打算用于Google云端存储。你能否编辑你的问题来澄清你在使用时遇到的问题? – jkff

+0

嗯...我可能错过了它,但是我看不到在源代码中对GCS的任何引用?无论如何,我遇到的问题是文件永远不会被加载,并且不会触发错误。我不知道如何调试。我还没有找到任何使用FileBasedReader做同样事情的例子。我会尝试更新这个问题,以便根据您的评论@jkff – nembleton

+0

来反映这个问题,请更新这个问题并提供更多详细信息 - 没有这些,我可以做的不多。事实上,几乎所有的资源,包括TextIO,都是在FileBasedSource/Reader的基础上实现的。 – jkff

回答

2

从“第二次更新”中显示的堆栈跟踪中,您看起来像直接从main()方法调用了getEstimatedSizeBytes。预计这会导致您看到的错误。

标准URL方案处理程序是在构建管道运行程序时注册的。在您的示例代码中,当您拨打Pipeline.create(options)(这称为PipelineRunner.fromOptions(options),标准处理程序已注册)时会发生这种情况。

如果您希望在运行管道以外的环境中注册标准URL方案,您可以明确地调用IOChannelUtils.registerStandardIOFactories()。我应该注意到,这不是一个支持的API,但有点“隐藏”。因此,它可能随时改变。

+0

是的你是对的。感谢细节。我后来意识到这一点,并得到了Google支持的证实。可悲的是,它与根本问题没有关系。 – nembleton

相关问题