2016-06-10 153 views
0

我的代码使用readTextFile读取日志文件,当我在Flink(/opt/flink-1.0.3/bin/flink run -m yarn-cluster -yn 2 /home/flink/flink-json-0.1.jar)中运行该jar时,它成功处理了里面的行并停止了我的应用程序,而不是等待新行。 我需要一些参数来做它吗?为什么flink停止我的流应用程序?

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.readTextFile("hdfs:///test/ignicion.io") 

预先感谢您

回答

2

您正在寻找

StreamExecutionEnvironment.readFileStream(String filePath, long intervalMillis, WatchType watchType) 

对于WatchType您有下列选项

  • ONLY_NEW_FILES,
  • REPROCESS_WITH_APPENDED,
  • PROCESS_ONLY_APPENDED;

流从

StreamExecutionEnvironment.readTextFile(String filePath, String charsetName) 

会阅读完所有的文件后完成。我认为,它主要是在开发过程中进行本地测试。

+0

同样的结果:'env.readFileStream(“hdfs:///test/ignicion.io”,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)' – jag

+0

你是不是说它也会立即停止? – snntrable

+0

对不起,它的工作原理与你所写的一样... – jag

相关问题