2016-12-16 57 views
1

让我们假设我们有一个这样的数字列表:Spark:找到缺失号码的程序

lst = [1,2,4,5,9,10] 

我该如何编写Spark程序来找出该列表中缺失的数字。该计划应返回:3,6,7,8。

我试着用蓄电池,力气锻炼出来。

+0

可以分享您的解决方案,这是不工作,你到目前为止尝试过。 –

+0

对于在计算每一行时依赖于查看其他行的问题,Spark不是最佳选择。当你可以处理每个项目而不依赖其他项目时,Spark是最好的,所以它可以高效地并行化。 –

+0

@丹尼尔,我也很清楚,但我被要求在采访中实施。我告诉他们带有蓄电池的解决方案,他们接受了。后来我尝试在家中实现它,但它并不奏效,因为累加器只能用于通过更新操作关联的不同任务更新值,而不能访问该值。 – Mrinal

回答

0

如果您不太担心拥有最佳解决方案,一种方法是首先广播您拥有的数据,然后并行化包含所有元素的集合并根据广播的数据进行过滤。

喜欢的东西

lst = [1,2,4,5,9,10] 
broadcastVar = sc.broadcast(lst) 

all_elems = sc.parallelize([i+1 for i in range(10)]) 
all_elems.filter(lambda x: x not in broadcastVar.value) 

如果你正在寻找的东西,只是少量数据的工作,那么这是罚款。如果你有很多数据,那么这种方法是不好的,不应该使用。

如果需要一个更好的解决方案,然后我会做以下

  1. 本质上对数据进行分区,使用RDDS你可以做一个映射输出(分区之前,数)。您可以编写一个小函数来获取每个数字的分区号。因此,举例来说,如果您在此地图之后有2个执行者,您将拥有[(1-5,1),(1-5,2),(1-5,4),(1-5,5),( 6-10,9),(6-10,10)]
  2. 按键分组,现在我们有[(1-5,[1,2,4,5]),(6-10,[9 ,10])]
  3. 映射您遍历key指定的范围,与值中的元素进行比较并返回不存在的元素列表。

然后,您可以编写结果或收集或任何你想要做的事情。有一点需要注意的是,例如,如果我使用了5个执行者,那么这些密钥会是1-2,3-4,5-6,7-8,9-10,关键字7-8不会,没有任何元素。为了避免这种情况,可以将组之前的rdd按键与[(1-2,-1),(3-4,-1),(5-6,-1),(7-8, -1),(9-10,-1)]。如果你有很多数据,那么与整个工作相比,这个数据所增加的开销是非常小的。

这个样本代码有很多错误,但将其视为概念验证。

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.sql.SparkSession; 
import org.spark_project.guava.collect.Lists; 

import scala.Tuple2; 

public class Main { 

public static void main(String[] args) { 

    SparkSession spark = SparkSession.builder().appName("spark-missing-nr").master("local[*]").getOrCreate(); 
    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); 
    Integer[] lst = new Integer[] { 1, 2, 4, 5, 9, 10 }; 
    JavaRDD<Integer> lstRDD = sc.parallelize(Arrays.asList(lst)); 

    // Partition the data by whether number is smaller/equal or larger than 
    // 5 
    JavaPairRDD<String, Integer> groupableRDD = lstRDD.mapToPair(i -> { 
     String group = i <= 5 ? "1-5" : "6-10"; 
     return new Tuple2<String, Integer>(group, i); 
    }); 
    // Group by key 
    JavaPairRDD<String, Iterable<Integer>> groupedRDD = groupableRDD.groupByKey(); 

    // so now we have [(1-5,[1, 2, 4, 5]), (6-10,[9, 10])] 
    System.out.println(groupedRDD.collect()); 

    // map where you iterate over range specified by key 
    JavaRDD<List<Integer>> missingValuesLists = groupedRDD.map(t -> { 
     Integer from = new Integer(t._1().split("-")[0]); 
     Integer to = new Integer(t._1().split("-")[1]); 

     List<Integer> valuesList = Lists.newArrayList(t._2()); 
     List<Integer> missingValues = new ArrayList<Integer>(); 

     // iterate over range specified by key 
     for (int i = from; i < to + 1; i++) { 
      if (!valuesList.contains(i)) { 
       missingValues.add(i); 
      } 
     } 
     return missingValues; 
    }); 
    // outputs [[3], [6, 7, 8]] 
    System.out.println(missingValuesLists.collect()); 
    sc.close(); 
} 
} 
+0

恐怕我必须用一个有大约十亿个数字的列表来做。 – Mrinal

+0

嗨Ossu54,你可以请提供代码示例,如果可能的话? – Mrinal

+0

我在Java中添加了一些样例代码,希望没关系。 – oh54

0

你可以尝试用全系列创建RDD,使用sc.range,然后使用subtract功能:

lst = sc.parallelize([1,2,4,5,9,10]) 
max_value = lst.max() 
full_data = sc.range(1, max_value) 
missing_values = full_data.subtract(lst) 

你能避免调用max(),如果你知道的完整列表的事先大小。

+0

@Mrinal你尝试过这种方法吗? –

+0

对不起丹尼尔回复(我一直很忙)。我尝试了它,但它工作正常,但如果我们正在处理数十亿个数字,它就不能被认为是最佳解决方案,我们不能再有一个庞大的数据清单来处理。无论如何感谢解决方案,我喜欢减法部分:)这是最简单的方法。 – Mrinal