2017-04-05 547 views
-1

我是Flink的新手。其实我试图通过flink java api读取文件和csv转换。如何使用flink java api读取目录下的文件名(本地文件系统/ hdfs)

根据我们的要求。 一)需要通过文件夹作为输入参数,输出参数为CSV文件名 二)需要读取从本地文件系统/ HDFS)文件 C写的同一数据为CSV

我的代码:

public class WriteToCSV { 

    public static void main(String[] args) throws Exception { 
     final ParameterTool params = ParameterTool.fromArgs(args); 
     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
     env.getConfig().setGlobalJobParameters(params); 
     List<String> paths = new ArrayList<String>(); 
     File dir = new File("C://"); 
     for (File f : dir.listFiles()) { 
       paths.add(f.getName()); 
     } 
     DataSet<String> data = env.fromCollection(paths).rebalance(); 

     DataSet<Tuple2<String, Integer>> counts = 
        // split up the lines in pairs (2-tuples) containing: (word,1) 
        data.flatMap(new MySplitter()).groupBy(0).sum(1); 

     System.out.println(" data -:"+data); 
     data.print(); 
     counts.writeAsCsv("C://new.csv"); 
    } 
} 


class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { 

    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { 
     // normalize and split the line into words 
     String[] tokens = value.split("\\W+"); 

     // emit the pairs 
     for (String token : tokens) { 
      if (token.length() > 0) { 
       out.collect(new Tuple2<String, Integer>(token, 1)); 
      } 
     } 
    } 
} 

我能够得到文件名(data.print())。但csv不创建,在服务器控制台中也不例外。

回答

1

为什么没有写在代码的CSV的原因是,你不叫env.execute()counts.writeAsCsv("C://new.csv");

后,为了进一步您可以使用env.readTextFile(path)它接受一个目录路径和读取所有文件改进代码该目录为每一行生成记录。

+0

DataSet和DataStream程序的print()行为不同。调用print()时,DataSet程序触发执行,并将结果写入提交程序的客户端的stdout。 DataStream程序不启动程序(这需要'execute()')并打印到工作的stdout。 –

+0

是的,我知道,但在提供的例子中'print()'在'writeAsCsv'之前被调用,所以我相信它打印得很好,但输出不会写入csv。 –

+0

啊,是的。对不起,我没有看到你回答正确。谢谢! –

相关问题