
Spark: ArrayList is empty after JavaRDD.foreach (sample JSON, 100 records)

{"name":"dev","salary":10000,"occupation":"engg","address":"noida"} {"name":"karthik","salary":20000,"occupation":"engg","address":"noida"}

Code used:

    final List<Map<String, String>> jsonData = new ArrayList<>();

    DataFrame df = sqlContext.read().json("file:///home/dev/data-json/emp.json");
    JavaRDD<String> rdd = df.repartition(1).toJSON().toJavaRDD();

    rdd.foreach(new VoidFunction<String>() {
        @Override
        public void call(String line) {
            try {
                jsonData.add(new ObjectMapper().readValue(line, Map.class));
                System.out.println(Thread.currentThread().getName());
                System.out.println("List size: " + jsonData.size());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    });

    System.out.println(Thread.currentThread().getName());
    System.out.println("List size: " + jsonData.size());

jsonData is empty at the end.

Output:

Executor task launch worker-1 
List size: 1 
Executor task launch worker-1 
List size: 2 
Executor task launch worker-1 
List size: 3 
. 
. 
. 
Executor task launch worker-1 
List size: 100 

main 
List size: 0 
+1

Since the list seems to be empty to begin with, could it be that the object mapper is not able to parse the lines it gets? Can you provide an [mcve]? – Thomas

+1

What is 'rdd'? – khelwood

+2

Perhaps the 'System.out.println' is executed before the foreach completes its task (or even starts)? – freedev

Answer

1

I have tested this and it works: https://github.com/freedev/spark-test

    final ObjectMapper objectMapper = new ObjectMapper();

    List<Map<String, Object>> list = rdd
            .map(new org.apache.spark.api.java.function.Function<String, Map<String, Object>>() {
                @Override
                public Map<String, Object> call(String line) throws Exception {
                    TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>() {};
                    return objectMapper.readValue(line, typeRef);
                }
            }).collect();

I preferred mapping to Map<String, Object> because this handles the cases in your JSON where the value part is not a String (i.e. "salary":20000).
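
The difference from the question's code: collect() ships the mapped records back to the driver, while foreach runs on the executors and mutates a deserialized copy of jsonData there, so the driver's list never changes. A minimal sketch of using the collected list on the driver (field names assumed from the sample JSON in the question):

    // runs on the driver after collect() has gathered all partitions
    for (Map<String, Object> record : list) {
        System.out.println(record.get("name") + " -> " + record.get("salary"));
    }
    System.out.println("List size: " + list.size()); // 100 for the data in the question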

+1

This question is tagged 'java-7'. Java 8 code is unlikely to be useful. – khelwood

+0

@khelwood Thanks – freedev

+0

@freedev Thanks for the effort. I tried it, but got 'Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)' even after adding 'implements Serializable' to the main class in which I am running the main method. –
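
A likely cause of that exception (a hedged note, not confirmed in the thread): an anonymous inner class keeps a reference to its enclosing instance, so Spark tries to serialize the whole outer object, including non-serializable members such as the SparkContext. A sketch of one common workaround, using a hypothetical static nested class JsonToMap that captures nothing from the enclosing scope and builds its own ObjectMapper:

    // requires com.fasterxml.jackson.databind.ObjectMapper and
    // com.fasterxml.jackson.core.type.TypeReference on the classpath
    static class JsonToMap
            implements org.apache.spark.api.java.function.Function<String, Map<String, Object>> {
        // transient: the mapper is rebuilt lazily on each executor instead of
        // being shipped with the serialized function
        private transient ObjectMapper mapper;

        @Override
        public Map<String, Object> call(String line) throws Exception {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            return mapper.readValue(line, new TypeReference<Map<String, Object>>() {});
        }
    }

    // usage: no enclosing instance is captured, so only JsonToMap is serialized
    List<Map<String, Object>> list = rdd.map(new JsonToMap()).collect();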