让我们假设我们有一个这样的数字列表:Spark:找到缺失号码的程序
lst = [1,2,4,5,9,10]
我该如何编写Spark程序来找出该列表中缺失的数字。该计划应返回:3,6,7,8。
我试着用蓄电池,力气锻炼出来。
让我们假设我们有一个这样的数字列表:Spark:找到缺失号码的程序
lst = [1,2,4,5,9,10]
我该如何编写Spark程序来找出该列表中缺失的数字。该计划应返回:3,6,7,8。
我试着用蓄电池,力气锻炼出来。
如果您不太担心拥有最佳解决方案,一种方法是首先广播您拥有的数据,然后并行化包含所有元素的集合并根据广播的数据进行过滤。
喜欢的东西
lst = [1,2,4,5,9,10]
broadcastVar = sc.broadcast(lst)
all_elems = sc.parallelize([i+1 for i in range(10)])
all_elems.filter(lambda x: x not in broadcastVar.value)
如果你正在寻找的东西,只是少量数据的工作,那么这是罚款。如果你有很多数据,那么这种方法是不好的,不应该使用。
如果需要一个更好的解决方案,然后我会做以下
然后,您可以编写结果或收集或任何你想要做的事情。有一点需要注意的是,例如,如果我使用了5个执行者,那么这些密钥会是1-2,3-4,5-6,7-8,9-10,关键字7-8不会,没有任何元素。为了避免这种情况,可以将组之前的rdd按键与[(1-2,-1),(3-4,-1),(5-6,-1),(7-8, -1),(9-10,-1)]。如果你有很多数据,那么与整个工作相比,这个数据所增加的开销是非常小的。
这个样本代码有很多错误,但将其视为概念验证。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.spark_project.guava.collect.Lists;
import scala.Tuple2;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("spark-missing-nr").master("local[*]").getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
Integer[] lst = new Integer[] { 1, 2, 4, 5, 9, 10 };
JavaRDD<Integer> lstRDD = sc.parallelize(Arrays.asList(lst));
// Partition the data by whether number is smaller/equal or larger than
// 5
JavaPairRDD<String, Integer> groupableRDD = lstRDD.mapToPair(i -> {
String group = i <= 5 ? "1-5" : "6-10";
return new Tuple2<String, Integer>(group, i);
});
// Group by key
JavaPairRDD<String, Iterable<Integer>> groupedRDD = groupableRDD.groupByKey();
// so now we have [(1-5,[1, 2, 4, 5]), (6-10,[9, 10])]
System.out.println(groupedRDD.collect());
// map where you iterate over range specified by key
JavaRDD<List<Integer>> missingValuesLists = groupedRDD.map(t -> {
Integer from = new Integer(t._1().split("-")[0]);
Integer to = new Integer(t._1().split("-")[1]);
List<Integer> valuesList = Lists.newArrayList(t._2());
List<Integer> missingValues = new ArrayList<Integer>();
// iterate over range specified by key
for (int i = from; i < to + 1; i++) {
if (!valuesList.contains(i)) {
missingValues.add(i);
}
}
return missingValues;
});
// outputs [[3], [6, 7, 8]]
System.out.println(missingValuesLists.collect());
sc.close();
}
}
你可以尝试用全系列创建RDD,使用sc.range
,然后使用subtract
功能:
lst = sc.parallelize([1,2,4,5,9,10])
max_value = lst.max()
full_data = sc.range(1, max_value)
missing_values = full_data.subtract(lst)
你能避免调用max()
,如果你知道的完整列表的事先大小。
@Mrinal你尝试过这种方法吗? –
对不起丹尼尔回复(我一直很忙)。我尝试了它,但它工作正常,但如果我们正在处理数十亿个数字,它就不能被认为是最佳解决方案,我们不能再有一个庞大的数据清单来处理。无论如何感谢解决方案,我喜欢减法部分:)这是最简单的方法。 – Mrinal
可以分享您的解决方案,这是不工作,你到目前为止尝试过。 –
对于在计算每一行时依赖于查看其他行的问题,Spark不是最佳选择。当你可以处理每个项目而不依赖其他项目时,Spark是最好的,所以它可以高效地并行化。 –
@丹尼尔,我也很清楚,但我被要求在采访中实施。我告诉他们带有蓄电池的解决方案,他们接受了。后来我尝试在家中实现它,但它并不奏效,因为累加器只能用于通过更新操作关联的不同任务更新值,而不能访问该值。 – Mrinal