2016-11-09 57 views
0

文件名是否应包含tetFileStream的数字?只有当文件名包含一个数字时,我的程序才会提取新文件。忽略所有其他文件,即使它们是新的。是否有任何设置需要更改以提取所有文件?请帮助S3上的spark textFileStream

+0

不,它取得特定目录中的所有文件。你可以粘贴一些代码以获得更好的帮助! –

+0

谢谢Srinivas。跑到另一个问题。当我在我的VM中本地提交它时,我的Spark流代码工作正常。它从S3读取文件并将输出写入ELK。但是,当我将jar提交到EMR集群时,它会发出警告,提示“读取文件时出错”,并且根本不读取。这是一种权限问题吗?我为我的S3存储桶提供了所有必需的密钥。任何意见? – Vamsi

回答

0

它会扫描目录中显示窗口内出现的新文件。如果你正在写S3,直接写你的代码,因为直到最后的close() - 不需要重命名为止,文件才会出现。相反,如果您正在使用针对正常文件系统的文件流源,则应该从扫描的目录中创建并在最后重命名 - 否则可能会读取正在工作的文件。一旦阅读:从不重读。

+0

谢谢史蒂夫。跑到另一个问题。当我在我的VM中本地提交它时,我的Spark流代码工作正常。它从S3读取文件并将输出写入ELK。但是,当我将jar提交到EMR集群时,它会发出警告,提示“读取文件时出错”,并且根本不读取。这是一种权限问题吗?我为我的S3存储桶提供了所有必需的密钥。任何意见? – Vamsi

+0

就是这么说的?没有堆栈跟踪? –

+0

当我在YARN模式下使用EMR控制台提交时,我没有太多的信息。它说运行,但没有读取S3中的任何内容。如果我在EMR Master Instance终端上手动提交,以下是堆栈跟踪。 FileInputDStream:查找新文件时出错 java.lang.NullPointerException \t at scala.collection.mutable.ArrayOps $ ofRef $ .length $ extension(ArrayOps.scala:192)等等等等 – Vamsi

0

花了几个小时来分析堆栈跟踪,我发现问题是S3地址。我提供了“s3:// mybucket”,这是为Spark 1.6和Scala 2.10.5工作的。在Spark 2.0(和Scala 2.11)上,它必须提供为“s3:// mybucket /”。可能是一些正则表达式相关的东西。现在工作正常。感谢所有的帮助。

+0

哦,所以你使用了一个短路径的“s3://桶”没有路径?这可能被解释为对主目录的引用,因此将被限定为bucket/users/$ USER的路径,其中$ USER = you。这发生在命令行上,是一个真正的痛苦。我个人喜欢解决这个问题(https://issues.apache.org/jira/browse/HADOOP-13648),但是担心什么坏处 –