2017-02-03 55 views
0

我看到几个帖子在这里,并在谷歌搜索org.apache.hadoop.mapred.InvalidInputException 但大多数处理HDFS文件或捕获的错误。我的问题是,尽管我可以从spark-shell读取一个CSV文件,但是从编译的JAR运行它不断返回org.apache.hadoop.mapred.InvalidInputException错误。星火S3 CSV read返回org.apache.hadoop.mapred.InvalidInputException

罐子的工艺粗糙:
1.从S3 JSON文件(这工作)的拼花文件S3
2.读取读取(这也成功)
3.编写一个查询的结果将#1和#2映射到S3中的镶木文件(也成功)
4.从同一个存储区#3读取配置csv文件。 (失败)

这些都是我曾尝试在代码中的各种方法:

1. val osRDD = spark.read.option("header","true").csv("s3://bucket/path/") 
2. val osRDD = spark.read.format("com.databricks.spark.csv").option("header", "true").load("s3://bucket/path/") 

所有两个以上S3的变化,S3A和S3N前缀正常工作从REPL但JAR里面却返回: org.apache.hadoop.mapred.InvalidInputException:输入路径不存在:s3://bucket/path/eventsByOS.csv 因此,它发现该文件但无法读取它。

认为这是一个权限问题,我曾尝试:

a. export AWS_ACCESS_KEY_ID=<access key> and export AWS_SECRET_ACCESS_KEY=<secret> from the Linux prompt. With Spark 2 this has been sufficient to provide us access to the S3 folders up until now. 
b. .config("fs.s3.access.key", <access>) 
.config("fs.s3.secret.key", <secret>) 
.config("fs.s3n.access.key", <access>) 
.config("fs.s3n.secret.key", <secret>) 
.config("fs.s3a.access.key", <access>) 
.config("fs.s3a.secret.key", <secret>) 

此故障之前,代码位于同一个桶拼花文件读取和写入文件拼花到同一桶。 CSV文件的大小仅为4.8 KB。

任何想法,为什么这是失败?

谢谢!

添加堆栈跟踪:

org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:253) 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201) 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:281) 
org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202) 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) 
scala.Option.getOrElse(Option.scala:121) 
org.apache.spark.rdd.RDD.partitions(RDD.scala:250) 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) 
scala.Option.getOrElse(Option.scala:121) 
org.apache.spark.rdd.RDD.partitions(RDD.scala:250) 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) 
scala.Option.getOrElse(Option.scala:121) 
org.apache.spark.rdd.RDD.partitions(RDD.scala:250) 
org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1332) 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
org.apache.spark.rdd.RDD.take(RDD.scala:1326) 
org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1367) 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
org.apache.spark.rdd.RDD.first(RDD.scala:1366) 
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.findFirstLine(CSVFileFormat.scala:206) 
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:60) 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:184) 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:184) 
scala.Option.orElse(Option.scala:289) 
org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:183) 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387) 
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152) 
org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:415) 
org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:352) 
+0

什么是完整的堆栈跟踪? –

+0

感谢提醒@SteveLoughran :-) –

回答

0

没有涌出来时,我粘贴进栈的IDE,但我看的Hadoop的更高版本,不能切换当前对旧的。

  1. 看一看these instructions
  2. 那陆地卫星GZ文件实际上是一个CSV文件,你可以尝试读取;这是我们通常用于测试的那个,因为它在那里并且可以免费使用。首先看看你是否可以使用它。
  3. 如果使用spark 2.0,请使用spark自带的CSV包。
  4. 请使用S3a,而不是其他的。
+0

谢谢史蒂夫。这是一套很棒的指示。虽然他们没有帮助读取CSV,但他们确实帮助我更快地写入文件。 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2本身就是一个巨大的改进。 –

+0

是的,改名为杀手。有一些正在进行的工作来解决这个问题,[HADOOP-13786](https://issues.apache.org/jira/browse/HADOOP-13786);尚未准备好使用,但欢迎您在接近发布时帮助进行测试。 –

0

我通过为相应的方法添加特定的Hadoop配置(此处为示例中的s3)来解决此问题。奇怪的是,上面的安全措施适用于Spark 2.0中除了读取CSV以外的所有内容。

此代码使用S3解决了我的问题。

spark.sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", p.aws_accessKey) 
spark.sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey",p.aws_secretKey)