1
我是新来弗林克 创建文件(CSV或文本)我有改造想这样弗林克流上的时间窗口
val supportTask= customSource
.map(line => line.split(","))
.map(line => SupportTaskNew(line(0)toInt,line(1).toString,line(2)toString,line(3)toLong,line(4).toString,line(5)toInt,line(6)toInt))
.filter(_ => true) //todo put sent date condition
.map(line => Count(1))
.keyBy(0)
.timeWindow(Time.seconds(20)) //todo for time being 10 seconds, actuals 30 min
.sum(0)
现在我想20秒钟的间隔时间窗口
supportTask.writeAsText(("D://myfile_"+Calendar.getInstance().get(Calendar.SECOND)),WriteMode.NO_OVERWRITE).setParallelism(1)
创建文件
我已经提供了文件名+秒,以便每次创建文件时附加秒。
但是这里只有一个文件被创建,我想创建每20秒窗口的新文件我该怎么做?
使用** DataStream.writeUsingOutputFormat()API **。 ** writeAsText **将所有输出记录写入参数中指定的文件。你必须实现一个特殊的输出格式来实现这一点。 – David