2017-02-02 58 views
5

从火花结构流文件: “此检查点的位置必须是在HDFS兼容的文件系统的路径,并且可以开始一个时被设置为在DataStreamWriter一个选项查询“。阿帕奇火花(结构化数据流):S3检查点支持

果然,检查点设置到S3路径抛出:

17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
     at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) 
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301) 
     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430) 
     at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) 
     at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
     at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) 
     at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) 
     at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133) 
     at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) 
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook 

一对夫妇的问题在这里:

  1. 为什么S3不支持作为一个检查点目录(正常的火花流支持这个)?什么使文件系统“符合HDFS”?
  2. 我强烈使用HDFS(因为集群可以一直上下),并使用s3作为保存所有数据的地方 - 在这样的设置中存储用于结构化流数据的检查点数据的建议是什么?
+0

纯粹的猜测在这里,但你有没有尝试过s3n或s3a(最好是s3a)协议? – ImDarrenG

+0

绝对值得和尝试,会试试看。 – Apurva

回答

2

什么使FS HDFS“符合?”它是一个文件系统,具有在Hadoop FS specification中指定的行为。对象存储和FS之间的差被覆盖在那里,与关键点是“最终一致的对象存储,而不追加或O(1)原子重命名不符合”

对于S3特别

  1. 这是不一致:在创建新的blob后,list命令通常不会显示它。相同的删除。
  2. 当斑被覆盖或删除,它可能需要一段时间才能消失
  3. 重命名()是通过复制来实现的,然后通过保存一切的位置,然后重命名删除

星火流关卡它到检查点目录。这使得检查点的时间与在S3中执行数据拷贝的时间成比例,该时间为〜6-10MB/s。

当前的流媒体代码不适合s3,虽然在某些时候我可能会解决这个问题,但是我没有提交任何新的补丁,直到他们提交我的旧补丁。如果它只是被忽略,那么我对这些东西的工作就毫无意义。

现在,做

  • 检查点HDFS的一个,然后拷贝过来的结果
  • 检查点位EBS的分配和连接到群集
  • 检查站S3,但有检查点之间的长时间差距,以便检查点的时间不会导致流式应用程序停机。

如果您正在使用EMR,您可以支付一个一致的,发电数据库支持的S3的保费,这可以提供更好的一致性。但复制时间仍然相同,因此检查点设置会一样慢

+0

我们在检查点到S3之间有40秒的时间间隔,而且我们偶尔还会检查点问题,例如正在写入临时目录,然后找不到。 –

+0

未找到检查点可能是s3的一致性曲面:清单往往滞后于对象库中的更改。通常你不会注意到,但有时会出现。为元数据存储使用发电机应该可行......至少如果不行的话,它一直在错误地实施 –

4

这是一个已知的问题:https://issues.apache.org/jira/browse/SPARK-19407

应固定在下一版本中。作为解决方法,您可以使用--conf spark.hadoop.fs.defaultFS=s3将默认文件系统设置为s3。

+0

不要以为这已经解决了。仍然无法在S3上检查点结构化流(spark 2.1.1)。 检查点恢复失败: 7/06/29 00:29:00信息StateStoreCoordinatorRef:已注册StateStoreCoordinator端点 org.apache.spark.sql.AnalysisException:此查询不支持从检查点位置恢复。 – Apurva

+0

这是一个不同的问题。你使用不支持恢复的“内存”或“控制台”吗? – zsxwing