-1
我是Flink的新手。其实我试图通过flink java api读取文件和csv转换。如何使用flink java api读取目录下的文件名(本地文件系统/ hdfs)
根据我们的要求。 一)需要通过文件夹作为输入参数,输出参数为CSV文件名 二)需要读取从本地文件系统/ HDFS)文件 C写的同一数据为CSV
我的代码:
public class WriteToCSV {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
List<String> paths = new ArrayList<String>();
File dir = new File("C://");
for (File f : dir.listFiles()) {
paths.add(f.getName());
}
DataSet<String> data = env.fromCollection(paths).rebalance();
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
data.flatMap(new MySplitter()).groupBy(0).sum(1);
System.out.println(" data -:"+data);
data.print();
counts.writeAsCsv("C://new.csv");
}
}
class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line into words
String[] tokens = value.split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
我能够得到文件名(data.print())。但csv不创建,在服务器控制台中也不例外。
DataSet和DataStream程序的print()行为不同。调用print()时,DataSet程序触发执行,并将结果写入提交程序的客户端的stdout。 DataStream程序不启动程序(这需要'execute()')并打印到工作的stdout。 –
是的,我知道,但在提供的例子中'print()'在'writeAsCsv'之前被调用,所以我相信它打印得很好,但输出不会写入csv。 –
啊,是的。对不起,我没有看到你回答正确。谢谢! –