
For testing purposes, I would like to use the BigQuery connector to write Parquet Avro logs into BigQuery. As I write this, there is no way to ingest Parquet directly from the UI, so I am writing a Spark job to do it. How can I use the Spark BigQuery connector locally?

In Scala, for the time being, the job body is as follows:

val events: RDD[RichTrackEvent] =
  readParquetRDD[RichTrackEvent, RichTrackEvent](sc, googleCloudStorageUrl)

val conf = sc.hadoopConfiguration 
conf.set("mapred.bq.project.id", "myproject") 

// Output parameters 
val projectId = conf.get("fs.gs.project.id") 
val outputDatasetId = "logs" 
val outputTableId = "test" 
val outputTableSchema = LogSchema.schema 

// Output configuration 
BigQueryConfiguration.configureBigQueryOutput(
    conf, projectId, outputDatasetId, outputTableId, outputTableSchema 
) 
conf.set(
    "mapreduce.job.outputformat.class", 
    classOf[BigQueryOutputFormat[_, _]].getName 
) 

events
  .mapPartitions { items =>
    val gson = new Gson()
    items.map(e => gson.fromJson(e.toString, classOf[JsonObject]))
  }
  .map(x => (null, x))
  .saveAsNewAPIHadoopDataset(conf)

Since BigQueryOutputFormat does not find Google credentials, it falls back to trying to obtain them from the metadata host, with the following stack trace:

2016-06-13 11:40:53 WARN HttpTransport:993 - exception thrown while executing request 
java.net.UnknownHostException: metadata 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) 
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) 
at sun.net.www.http.HttpClient.New(HttpClient.java:308) 
at sun.net.www.http.HttpClient.New(HttpClient.java:326) 
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1169) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1105) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:999) 
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:933) 
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93) 
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972) 
at com.google.cloud.hadoop.util.CredentialFactory$ComputeCredentialWithRetry.executeRefreshToken(CredentialFactory.java:160) 
at com.google.api.client.auth.oauth2.Credential.refreshToken(Credential.java:489) 
at com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:207) 
at com.google.cloud.hadoop.util.CredentialConfiguration.getCredential(CredentialConfiguration.java:72) 
at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.createBigQueryCredential(BigQueryFactory.java:81) 
at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.getBigQuery(BigQueryFactory.java:101) 
at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.getBigQueryHelper(BigQueryFactory.java:89) 
at com.google.cloud.hadoop.io.bigquery.BigQueryOutputCommitter.<init>(BigQueryOutputCommitter.java:70) 
at com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat.getOutputCommitter(BigQueryOutputFormat.java:102) 
at com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat.getOutputCommitter(BigQueryOutputFormat.java:84) 
at com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat.getOutputCommitter(BigQueryOutputFormat.java:30) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1135) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:357) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1078) 

This is of course expected, but it should be able to use my service account and its key, since GoogleCredential.getApplicationDefault() returns the appropriate credentials fetched from the GOOGLE_APPLICATION_CREDENTIALS environment variable.

Since the connector seems to read credentials from the Hadoop configuration, which keys should be set so that it picks up GOOGLE_APPLICATION_CREDENTIALS? Is there a way to configure the output format to use a provided GoogleCredential object?

Answer


If I understand your question correctly - you probably need to set:

<name>mapred.bq.auth.service.account.enable</name> 
<name>mapred.bq.auth.service.account.email</name> 
<name>mapred.bq.auth.service.account.keyfile</name> 
<name>mapred.bq.project.id</name> 
<name>mapred.bq.gcs.bucket</name> 

Here, mapred.bq.auth.service.account.keyfile should point to the full file path of the old-style "P12" keyfile; alternatively, if you are using the newer "JSON" keyfile, you should replace the "email" and "keyfile" entries with the single mapred.bq.auth.service.account.json.keyfile key instead:

<name>mapred.bq.auth.service.account.enable</name> 
<name>mapred.bq.auth.service.account.json.keyfile</name> 
<name>mapred.bq.project.id</name> 
<name>mapred.bq.gcs.bucket</name> 
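
For a local run like the one in the question, a minimal sketch of wiring these keys into the same Hadoop configuration might look like this. The property names come from the list above; reusing GOOGLE_APPLICATION_CREDENTIALS as the keyfile path and the bucket name "my-transient-bucket" are assumptions, not something mandated by the connector:

// Sketch: enable service-account auth on the Hadoop conf used for the output.
val conf = sc.hadoopConfiguration
conf.set("mapred.bq.auth.service.account.enable", "true")
// Assumption: reuse the JSON keyfile that GOOGLE_APPLICATION_CREDENTIALS points to.
conf.set(
  "mapred.bq.auth.service.account.json.keyfile",
  sys.env("GOOGLE_APPLICATION_CREDENTIALS")
)
conf.set("mapred.bq.project.id", "myproject")
conf.set("mapred.bq.gcs.bucket", "my-transient-bucket") // hypothetical bucket name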

You may also want to take a look at https://github.com/spotify/spark-bigquery - which is a much more civilized way of working with BQ and Spark. The setGcpJsonKeyFile used in that case points to the same JSON file you would set for mapred.bq.auth.service.account.json.keyfile when using the BQ connector for Hadoop.
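
For completeness, a rough usage sketch with spotify/spark-bigquery; only setGcpJsonKeyFile is taken from the paragraph above, and the other calls and the table name are assumptions based on that project's README, so check it for the exact API:

// Sketch based on spotify/spark-bigquery; verify method names against its README.
import com.spotify.spark.bigquery._

sqlContext.setGcpJsonKeyFile("/path/to/service-account.json")
sqlContext.setBigQueryProjectId("myproject")            // assumed billing project
sqlContext.setBigQueryGcsBucket("my-transient-bucket")  // assumed staging bucket

val df = sqlContext.read.parquet(googleCloudStorageUrl)
df.saveAsBigQueryTable("myproject:logs.test")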


Indeed, it was about finding 'mapred.bq.auth.service.account.enable' and 'mapred.bq.auth.service.account.json.keyfile'. I wonder whether defaulting to 'mapred.bq.auth.service.account.enable = true' and 'mapred.bq.auth.service.account.json.keyfile = $GOOGLE_APPLICATION_CREDENTIALS' would make sense, since it could be annoying for production configuration? Maybe providing a util on 'BigQueryConfiguration' for this, along the lines of 'BigQueryConfiguration.configureBigQueryOutput', would help. – Kayrnt