2014-10-06 185 views
12

我只想保留在第二个表中引用了部门ID的员工。基于Spark中的另一个RDD进行过滤

Employee table 
LastName DepartmentID 
Rafferty 31 
Jones 33 
Heisenberg 33 
Robinson 34 
Smith 34 

Department table 
DepartmentID 
31 
33 

我曾尝试下面的代码不工作:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
employee = sc.parallelize(employee) 
department = sc.parallelize(department) 
employee.filter(lambda e: e[1] in department).collect() 

Py4JError: An error occurred while calling o344.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 

任何想法?我在Python中使用Spark 1.1.0。但是,我会接受一个Scala或Python的答案。

+0

你需要你的部门名单是一个“中” RDD? – maasg 2014-10-06 18:01:43

+0

不是。部门列表从HDFS加载,但不是很大。 – poiuytrez 2014-10-07 07:52:07

回答

19

在这种情况下,你想实现的是在与包含在部门表中的数据的每个分区过滤: 这将是基本的解决方案:

val dept = deptRdd.collect.toSet 
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)} 

如果你的部门的数据是大的,广播的变量将会被一次传送数据到所有节点,而不必与每个任务

val deptBC = sc.broadcast(deptRdd.collect.toSet) 
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)} 
0序列化提高性能

虽然使用连接可以工作,但这是一个非常昂贵的解决方案,因为它需要分布式数据的洗牌(byKey)来实现连接。鉴于该要求是一个简单的过滤器,将数据发送到每个分区(如上所示)将提供更好的性能。

+0

赦免我如果我在这里错了,但不会partitionBy()通过键解决分布式洗牌?并不是说它会解决加入问题更昂贵的问题,因为我不这么认为,我只是指出加入并不需要100%的时间进行洗牌。 – TurnipEntropy 2018-02-19 02:42:08

10

我终于实现了一个使用连接的解决方案。我有一个0值添加到部门,以避免星火异常:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
# invert id and name to get id as the key 
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0])) 
# add a 0 value to avoid an exception 
department = sc.parallelize(department).map(lambda d: (d,0)) 

employee.join(department).map(lambda e: (e[1][0], e[0])).collect() 

output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)] 
0

过滤多列多个值:

在这种情况下,你是从数据库中提取数据(蜂巢或者在这个例子中SQL类型的数据库),并需要在多个列的过滤,则可能是更容易装载表与第一过滤器,然后通过RDD迭代您的过滤器(多个小迭代是星火编程的鼓励方式):

{ 
    import org.apache.spark.sql.hive.HiveContext 
    val hc = new HiveContext(sc) 

    val first_data_filter = hc.sql("SELECT col1,col2,col2 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3)") 
    val second_data_filter = first_data_filter.filter(rdd => rdd(1) == "50" || rdd(1) == "20") 
    val final_filtered_data = second_data_filter.filter(rdd => rdd(0) == "1500") 

} 

当然,你必须知道你的数据一点点地过滤对正确的值,但这是分析过程的一部分。

0

对于上面的同一个exm,我只想保留包含或在第二个表中引用的部门ID的雇员。 但它必须没有连接操作,我会看到它在“包含”或“”, 我的意思是33的334和335

employee = [['Raffery',311], ['Jones',334], ['Heisenberg',335], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
employee = sc.parallelize(employee) 
department = sc.parallelize(department) 
相关问题