2017-10-04 129 views

I am running a Structured Streaming job on a Spark 2.2 cluster on AWS, using an S3 bucket in eu-central-1 for checkpointing. Some of the commit operations on the workers seem to fail at random with the following error:

17/10/04 13:20:34 WARN TaskSetManager: Lost task 62.0 in stage 19.0 (TID 1946, 0.0.0.0, executor 0): java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0,part=62),dir=s3a://bucket/job/query/state/0/62] 
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:198) 
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:230) 
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:99) 
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:97) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:108) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: XXXXXXXXXXX, AWS Error Code: SignatureDoesNotMatch, AWS Error Message: The request signature we calculated does not match the signature you provided. Check your key and signing method., S3 Extended Request ID: abcdef== 
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798) 
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421) 
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232) 
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528) 
at com.amazonaws.services.s3.AmazonS3Client.copyObject(AmazonS3Client.java:1507) 
at com.amazonaws.services.s3.transfer.internal.CopyCallable.copyInOneChunk(CopyCallable.java:143) 
at com.amazonaws.services.s3.transfer.internal.CopyCallable.call(CopyCallable.java:131) 
at com.amazonaws.services.s3.transfer.internal.CopyMonitor.copy(CopyMonitor.java:189) 
at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:134) 
at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:46) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
... 3 more 

The job is submitted with the following options to allow access to the eu-central-1 bucket:

--packages org.apache.hadoop:hadoop-aws:2.7.4 
--conf spark.hadoop.fs.s3a.endpoint=s3.eu-central-1.amazonaws.com 
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem 
--conf spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true 
--conf spark.driver.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true 
--conf spark.hadoop.fs.s3a.access.key=xxxxx 
--conf spark.hadoop.fs.s3a.secret.key=xxxxx 
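As a quick sanity check outside of Spark (a sketch, assuming the AWS CLI is installed; the bucket path is a placeholder), the same key pair can be tested against the Frankfurt endpoint, which only accepts Signature Version 4:

```shell
# Hypothetical placeholders: use the same key pair the job is submitted with
export AWS_ACCESS_KEY_ID=xxxxx
export AWS_SECRET_ACCESS_KEY=xxxxx

# The CLI signs with V4 automatically when the region is given explicitly
aws s3 ls s3://bucket/job/query/state/ --region eu-central-1

# --debug prints the canonical request and string-to-sign, which can be
# compared against the SDK's wire log
aws s3 ls s3://bucket/job/query/state/ --region eu-central-1 --debug
```

If these commands fail with the same 403, the problem lies with the credentials or signing setup rather than with Spark itself.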

I have already tried generating access keys without special characters, and also using instance policies; both had the same effect.


Don't use S3 for checkpointing. Since S3 only offers eventual consistency on read-after-write, there is no guarantee that when `HDFSBackedStateStore` lists files or tries to rename a file it will actually exist in the S3 bucket, even if it was just written. –


What else can I use? With HDFS, the change log eventually grows so large that the job can no longer start. –


Use HDFS. Which change log are we talking about? –

Answer


This comes up often enough that the Hadoop team provide a troubleshooting guide. But as Yuval says: committing directly to S3 is too dangerous, and it gets slower the more data you create; the risk of listing inconsistencies means data can sometimes be lost, at least with the S3A connector in Apache Hadoop 2.6-2.8.
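Following the suggestion to checkpoint to HDFS instead of S3, a minimal sketch in PySpark might look like the following (the input source, paths, and broker address are hypothetical placeholders, and this requires a running Spark cluster with HDFS available):

```python
# Sketch: keep streaming state/checkpoints on HDFS while output can stay on S3.
# All names and paths below are hypothetical placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpoint-on-hdfs").getOrCreate()

events = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .load())

query = (events.writeStream
    .format("parquet")
    .option("path", "s3a://bucket/job/output")                    # output on S3
    .option("checkpointLocation", "hdfs:///checkpoints/job/query")  # state on HDFS
    .outputMode("append")
    .start())
```

The key point is only the `checkpointLocation` option: the state store and commit log live there, so pointing it at HDFS avoids the S3 rename/consistency issues entirely.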


Yes, I have read a lot about this, but the problem doesn't always occur. So I assumed it was a folder or file that doesn't exist yet because of eventual consistency –


No, it's not that: that would surface as a FileNotFoundException. This is authentication, and it's not easy to track down, especially since for security reasons the code doesn't dare log useful details such as which secret was used. If it only happens against Frankfurt, it could be a v4 API problem –


Your log says:

Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: XXXXXXXXXXX, AWS Error Code: SignatureDoesNotMatch, AWS Error Message: The request signature we calculated does not match the signature you provided. Check your key and signing method., S3 Extended Request ID: abcdef==

This means the credentials are incorrect.

// Debugging only: supply the credentials explicitly to rule out
// credential-resolution issues (placeholders, do not hard-code real keys)
val credentials = new com.amazonaws.auth.BasicAWSCredentials(
    "ACCESS_KEY_ID",
    "SECRET_ACCESS_KEY"
)
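The `SignatureDoesNotMatch` part is worth understanding: with Signature Version 4, which Frankfurt requires, the region and service are mixed into the signing key itself, so a request signed for the wrong region or endpoint can never validate even when the credentials are correct. A minimal sketch of the key derivation (the secret and date below are made-up examples):

```python
import hashlib
import hmac

def sigv4_signing_key(secret_key: str, date_stamp: str,
                      region: str, service: str) -> bytes:
    """Derive the AWS Signature Version 4 signing key (an HMAC-SHA256 chain)."""
    k_date = hmac.new(("AWS4" + secret_key).encode(),
                      date_stamp.encode(), hashlib.sha256).digest()
    k_region = hmac.new(k_date, region.encode(), hashlib.sha256).digest()
    k_service = hmac.new(k_region, service.encode(), hashlib.sha256).digest()
    return hmac.new(k_service, b"aws4_request", hashlib.sha256).digest()

# The same secret yields a different signing key per region, which is why a
# mismatched endpoint/region configuration surfaces as SignatureDoesNotMatch.
frankfurt = sigv4_signing_key("EXAMPLE_SECRET", "20171004", "eu-central-1", "s3")
virginia = sigv4_signing_key("EXAMPLE_SECRET", "20171004", "us-east-1", "s3")
assert frankfurt != virginia
```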

For debugging purposes:

1) Check that both the access key and the secret key are valid.

2) Check whether the bucket name is correct.

3) Turn on logging in the CLI and compare it with the SDK.

4) Enable SDK logging as described here:

http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-logging.html

You need to provide the log4j jar and a sample log4j.properties file.
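A minimal `log4j.properties` along the lines of that logging guide might look like this (a sketch; adjust the appender names to your own setup):

```properties
log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%t] %-5p %c - %m%n

# Request/response summaries from the AWS SDK
log4j.logger.com.amazonaws=DEBUG
# Full wire-level HTTP traffic, including the signed headers
log4j.logger.org.apache.http.wire=DEBUG
```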

http://docs.aws.amazon.com/ses/latest/DeveloperGuide/get-aws-keys.html