0

在我的场景中,我有不断上传到HDFS的CSV文件。一旦他们的写作最终完成,如何在HDFS目录中处理新文件?

只要新文件上传,我想用Spark SQL处理新文件(例如,计算文件中字段的最大值,将文件转换为parquet)。即我在每个输入文件和变换/处理的输出文件之间具有一对一映射。

我正在评估Spark Streaming来侦听HDFS目录,然后用Spark来处理“流式文件”。

但是,为了处理整个文件,我需要知道“文件流”何时完成。我想将转换应用于整个文件,以保留文件之间的端到端一对一映射。

如何转换整个文件而不是其微批?

据我所知,Spark Streaming只能将变换应用到批次(DStreams映射到RDDs),而不是立即(当其有限的流完成时)整个文件。

这是正确的吗?如果是这样,我应该考虑什么替代方案?

+0

一个文件在被Spark Streaming拾取之前被完全写入HDFS,所以我不明白问题 –

+0

@ cricket_007您能澄清你的意思吗? –

回答

1

我可能误解了你的问题的第一次尝试......

据我所知,星火流只能申请改造批次(DStreams映射到RDD),而不是一次写入整个文件(当其有限的流完成时)。

这是正确的吗?

不,这是不是正确。

Spark Streaming将在Spark Streaming的批处理间隔时间内写入HDFS的同时对整个文件应用转换。

Spark Streaming将采用文件的当前内容并开始处理它。


只要一个新文件被上传我需要处理与星火新的文件/ SparkSQL

几乎不可能星火由于它的架构,它需要一些时间从时刻“上传”并且Spark处理它。

您应该考虑使用全新且有光泽的Structured Streaming或(即将过时)Spark Streaming

这两种解决方案都支持在新文件上传(这正是您的使用案例)时观看新文件的目录并触发Spark作业。

引用结构化数据流的Input Sources

在星火2.0,有几个内置信号源。

  • 文件源 - 将作为数据流读取到目录中的文件读取到目录中。支持的文件格式是文本,csv,json,parquet。查看DataStreamReader接口的文档以获取更新的列表,以及每种文件格式的支持选项。请注意,文件必须原子地放置在给定的目录中,在大多数文件系统中,这可以通过文件移动操作来实现。

又见星火流的Basic Sources

此外套接字,的StreamingContext API提供了从文件作为输入源创建DStreams方法。

文件流:为了从文件读取与HDFS API兼容(即,HDFS,S3,NFS等)的任何文件系统上的数据,一个DSTREAM可被创建为:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) 

Spark Streaming将监视目录dataDirectory并处理在该目录中创建的所有文件(以不支持的嵌套目录编​​写的文件)。

有一点需要注意,虽然给出您的要求:

我需要知道的“文件流”完成时。

不要用Spark做到这一点。

引用星火流的Basic Sources再次:

  • 文件必须在DataDirectory目录由原子移动或重命名它们在数据目录中创建。

  • 移动后,文件一定不能更改。所以如果文件被连续追加,新的数据将不会被读取。

结束语...你应该只文件移动到文件完成并准备用放电处理时星火手表的目录。这超出了Spark的范围。

+0

谢谢你的回答,顺便说一句,我需要说出我的问题的关键点。我怎样才能转换整个文件而不是它的microbatches?这就是为什么我写了[quote]我需要知道什么时候“文件流”完成。我需要将转换应用于整个文件,以保留文件之间的端到端一对一映射。 –

+0

@Andrea你需要澄清什么决定了整个文件。 HDFS不识别“文件流”。写入它的任何文件的每个“部分”都将被识别为*整个文件*。 –

0

您可以使用DFSInotifyEventInputStream观察Hadoop目录,然后在创建文件时以编程方式执行Spark作业。

看到这个职位: HDFS file watcher

+0

Spark Streaming可以观看一个文件夹。该类不是必需的 –

+0

如何在Spark Streaming中按文件处理文件?如果两个文件一次写入会怎么样? –

+0

你是什么意思文件的文件?如文档中所述(在其他答案中复制),Spark Streaming将拾取*原子移动到目标目录*的所有文件,因此两个文件被视为两个单独的记录 –

相关问题