2017-06-18 121 views
0

后创建的文件有什么办法来配置textFileStream源,它将处理任何文件添加到源目录,无论文件的创建时间呢?星火流仅流流初始化时间

为了说明这个问题,我创建了一个使用textFileStream作为源并打印流内容到控制台基本星火流应用。当在运行应用程序之前创建的现有文件被复制到源目录中时,没有任何内容被打印到控制台。当应用程序开始运行后创建的文件被复制到源目录时,将打印文件内容。以下是我的代码供参考。

val conf = new SparkConf().setAppName("Streaming Test") 
          .setMaster("local[*]") 

val spark = new SparkContext(conf) 
val ssc = new StreamingContext(spark, Seconds(5)) 

val fileStream = ssc.textFileStream("/stream-source") 

val streamContents = fileStream.flatMap(_.split(" ")) 

streamContents.print() 

回答

1

这是FileInputDStream的记录行为。

如果我们想消耗在该目录中已有的文件,我们可以使用Spark API来加载这些文件,并应用我们所需的逻辑给他们。

val existingFiles = sparkContext.textFile(path) 

val existingFilesDS = sparkSession.read.text(path) 

然后后,设置和开始流的逻辑。 我们甚至可以使用已存在的文件的数据在换新的处理。

+0

在源代码中的文档是有些模棱两可 - '在这种情况下,“新”是指当时period'中变得可见,以飨读者文件。这使得看起来好像该文件不需要在时间段开始后创建,而是仅在该时间段开始后才被提供给源。 –