2015-11-02 134 views
1

我在Java中编写了一个Flink流作业,它加载包含订户数据(4列)的csv文件,然后在与订户数据匹配时从套接字流中读取数据。加载大文件时Flink作业在提交时挂起

起初我是用一个小的csv文件(8 MB)和一切工作正常:

# flink run analytics-flink.jar 19001 /root/minisubs.csv /root/output.csv 
loaded 200000 subscribers from csv file 
11/02/2015 16:36:59 Job execution switched to status RUNNING. 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to SCHEDULED 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to DEPLOYING 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to RUNNING 

我切换csv文件到一个更大的(〜45 MB),现在我看到的是这样的:

# flink run analytics-flink.jar 19001 /root/subs.csv /root/output.csv 
loaded 1173547 subscribers from csv file 

请注意,上面的订阅者数量是文件中的行数。我试图在Flink配置中查找任何超时,但我找不到任何超时。

任何帮助,非常感谢!

编辑:CSV是加载利用公共-CSV 1.2库使用这种方法:

private static HashMap<String, String> loadSubscriberGroups(
      String referenceDataFile) throws IOException { 
     HashMap<String,String> subscriberGroups = new HashMap<String, String>(); 

     File csvData = new File(referenceDataFile); 
     CSVParser parser = CSVParser.parse(csvData, Charset.defaultCharset(), CSVFormat.EXCEL); 
     for (CSVRecord csvRecord : parser) { 
      String imsi = csvRecord.get(0); 
      String groupStr = csvRecord.get(3); 

      if(groupStr == null || groupStr.isEmpty()) { 
       continue; 
      } 
      subscriberGroups.put(imsi, groupStr); 
     } 

     return subscriberGroups; 
    } 

和这里的文件(我知道有在最后一个逗号的样本,最后一栏是空的现在):

450000000000001,450000000001,7752,Tier-2, 
450000000000002,450000000002,1112,Tier-1, 
450000000000003,450000000003,6058,Tier-2, 
+0

加载CSV文件究竟该怎么做?你能否提供一个读取CSV文件的程序片段? –

+0

编辑后添加csv加载方法 –

+0

感谢您的更新。使用CSV数据做什么?你如何将它注入Flink程序? –

回答

4

罗伯特Meztger(阿帕奇弗林克开发商):

我可以解释为什么你的第一种方法没有活像k:

您试图使用我们的RPC系统(Akka)将来自Flink客户端的CSV文件发送到 群集。当您向Flink提交作业时,我们将用户创建的所有对象(映射器,来源,...) 序列化并将其发送到群集。有一种方法 StreamExecutionEnvironment.fromElements(..),它允许用户连同作业提交序列化几个对象。但是您可以像这样传输的数据量 受Akka帧大小的限制。 在我们的案例中,我认为默认值是10兆字节。之后,Akka 可能只是放弃或拒绝部署消息。

解决方法是使用富运算符而不是常规运算符(例如RichMapFunction而不是MapFunction),覆盖open()方法并在该方法内加载CSV文件。

谢谢Robert!