2016-09-07 104 views
0

我正面临一个问题,我必须找出最大的行及其索引。这是我的方法使用Spark java查找最大行数

SparkConf conf = new SparkConf().setMaster("local").setAppName("basicavg"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> rdd = sc.textFile("/home/impadmin/ravi.txt"); 
    JavaRDD<Tuple2<Integer,String>> words = rdd.map(new Function<String, Tuple2<Integer,String>>() { 

     @Override 
     public Tuple2<Integer,String> call(String v1) throws Exception { 
      // TODO Auto-generated method stub 
      return new Tuple2<Integer, String>(v1.split(" ").length, v1); 
     } 
    }); 
    JavaPairRDD<Integer, String> linNoToWord = JavaPairRDD.fromJavaRDD(words).sortByKey(false); 

    System.out.println(linNoToWord.first()._1+" ********************* "+linNoToWord.first()._2); 
+2

请描述你的具体问题。你的方法失败了吗? – YakovL

+0

问题是在文件中使用spark和line一起查找最大行的索引。 –

+0

@RaviShankar下面的答案会给你从0开始的行索引。 –

回答

0

这样的tupleRDD将获得关键的基础和第一个元素进行排序在排序后的新rdd是最长的:

JavaRDD<String> rdd = sc.textFile("/home/impadmin/ravi.txt"); 
JavaRDD<Tuple2<Integer,String>> words = rdd.map(new Function<String, Tuple2<Integer,String>>() { 

    @Override 
    public Tuple2<Integer,String> call(String v1) throws Exception { 
     // TODO Auto-generated method stub 
     return new Tuple2<Integer, String>(v1.split(" ").length, v1); 
    } 
}); 
JavaRDD<Tuple2<Integer,String>> tupleRDD1= tupleRDD.sortBy(new Function<Tuple2<Integer,String>, Integer>() { 

     @Override 
     public Integer call(Tuple2<Integer, String> v1) throws Exception { 
      // TODO Auto-generated method stub 
      return v1._1; 
     } 
    }, false, 1); 
    System.out.println(tupleRDD1.first()); 
} 
0

既然你关心的行号和文字都请试试这个。

首先创建一个序列化类线:

public static class Line implements Serializable { 
    public Line(Long lineNo, String text) { 
     lineNo_ = lineNo; 
     text_ = text; 
    } 
    public Long lineNo_; 
    public String text_; 
} 

然后执行以下操作:

SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("basicavg"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> rdd = sc.textFile("/home/impadmin/words.txt"); 
    JavaPairRDD<Long, Line> linNoToWord2 = rdd.zipWithIndex().mapToPair(new PairFunction<Tuple2<String,Long>, Long, Line>() { 
     public Tuple2<Long, Line> call(Tuple2<String, Long> t){ 

      return new Tuple2<Long, Line>(Long.valueOf(t._1.split(" ").length), new Line(t._2, t._1)); 
     } 
    }).sortByKey(false); 

    System.out.println(linNoToWord2.first()._1+" ********************* "+linNoToWord2.first()._2.text_);