2

此问题是this one的后续行为。 我想使用Apache梁从谷歌扳手表中读取数据(然后做一些数据处理)。在apache光束中使用SpannerIO时出错

package com.google.cloud.dataflow.examples; 
import java.io.IOException; 
import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.PipelineResult; 
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; 
import org.apache.beam.sdk.options.PipelineOptions; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.values.PCollection; 
import com.google.cloud.spanner.Struct; 

public class backup { 

    public static void main(String[] args) throws IOException { 
    PipelineOptions options = PipelineOptionsFactory.create(); 

    Pipeline p = Pipeline.create(options); 
    PCollection<Struct> rows = p.apply(
      SpannerIO.read() 
       .withInstanceId("my_instance") 
       .withDatabaseId("my_db") 
       .withQuery("SELECT t.table_name FROM information_schema.tables AS t") 
       ); 

    PipelineResult result = p.run(); 
    try { 
     result.waitUntilFinish(); 
    } catch (Exception exc) { 
     result.cancel(); 
    } 
    } 
} 

当我尝试使用DirectRunner执行代码时,我得到了 以下错误消息:

org.apache.beam.runners.direct我使用了Java SDK写了下面的最低例子.repackaged.com.google.common.util.concurrent.UncheckedExecutionException:

org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

[...] Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

[...] Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

或者使用DataflowRunner:

org.apache.beam.runners.direct.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

[...] Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

[...] Caused by: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

在这两种情况下,错误信息都很隐秘,而且我无法找到任何关于Google搜索错误的原因。我也无法使用SpannerIO模块找到任何示例脚本。

此错误是由于我的代码中的明显错误,还是由于谷歌云工具的安装不当造成的?

+2

Argh,你很可能触及依赖冲突https://issues.apache.org/jira/browse/BEAM-2837。它是固定的,但我们需要等待新版本的光束。您可以从源代码自己构建光束二进制文件,或者在您的pom.xml中使用这个技巧https://gist.github.com/mairbek/0c770ff7b591e3db58936b0b9294215a –

+1

哦。谢谢 !我想我会尝试修复。 –

回答

1

你需要指定专案编号:

SpannerIO.read() 
      .withProjectId("my_project") 
      .withInstanceId("my_instance") 
      .withDatabaseId("my_db") 

而且你需要设置凭据的扳手项目。由于SpannerIO的API不允许您设置任何自定义凭据,因此您必须使用环境变量GOOGLE_APPLICATION_CREDENTIALS设置全局应用程序凭据。

您还可以使用JDBC读取(并写入)Cloud Spanner。读取是这样完成的:

 PCollection<KV<String, Long>> words = p2.apply(JdbcIO.<KV<String, Long>> read() 
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("nl.topicus.jdbc.CloudSpannerDriver", 
        "jdbc:cloudspanner://localhost;Project=my-project-id;Instance=instance-id;Database=database;PvtKeyPath=C:\\Users\\MyUserName\\Documents\\CloudSpannerKeys\\cloudspanner-key.json")) 
      .withQuery("SELECT t.table_name FROM information_schema.tables AS t").withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())) 
      .withRowMapper(new JdbcIO.RowMapper<KV<String, Long>>() 
      { 
       private static final long serialVersionUID = 1L; 

       @Override 
       public KV<String, Long> mapRow(ResultSet resultSet) throws Exception 
       { 
        return KV.of(resultSet.getString(1), resultSet.getLong(2)); 
       } 
      })); 

此方法还允许您通过设置PvtKeyPath来使用自定义凭证。您也可以使用JDBC写入Google Cloud Spanner。看看这里的例子:http://www.googlecloudspanner.com/2017/10/google-cloud-spanner-with-apache-beam.html

+0

我确实已经忘记了“projectID”这一行,但添加它并没有解决这个错误。 事实上,我使用的是Eclipse谷歌云工具插件,并且已经登录到我的谷歌账户。所以这应该照顾证书? 我可能不得不尝试JDBC版本。 –

1

这个问题最有可能是由这里描述的依赖性兼容性问题引起的:BEAM-2837。下面是在JIRA问题的意见,一个描述一个快速的解决方法:

<dependency> 
    <groupId>com.google.api.grpc</groupId> 
    <artifactId>grpc-google-common-protos</artifactId> 
    <version>0.1.9</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.beam</groupId> 
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> 
    <version>${beam.version}</version> 
    <exclusions> 
     <exclusion> 
      <groupId>com.google.api.grpc</groupId> 
      <artifactId>grpc-google-common-protos</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 

明确定义所需的com.google.api.grpc依赖,从org.apache.beam排除版本。

+0

谢谢!老实说,我最终转向使用python SDK,并为读写器写一个自定义的ParDo。 –