2016-11-25 74 views
2

我有两个大火花DataFrames,都包含坐标。让我们把他们的位置和地点:评估来自两个数据帧的行的所有组合

loc = [('01', 0.2, 0.9), ('02', 0.3, 0.6), ('03', 0.8, 0.1)] 
locations = sqlContext.createDataFrame(loc, schema=['id', 'X', 'Y']) 

site = [('A', 0.7, 0.1), ('B', 0.3, 0.7), ('C', 0.9, 0.3), ('D', 0.3, 0.8)] 
sites = sqlContext.createDataFrame(site, schema=['name', 'X', 'Y']) 

地点:

+---+---+---+ 
| id| X| Y| 
+---+---+---+ 
| 01|0.2|0.9| 
| 02|0.3|0.6| 
| 03|0.8|0.1| 
+---+---+---+ 

网站:

+----+---+---+ 
|name| X| X| 
+----+---+---+ 
| A|0.7|0.1| 
| B|0.3|0.7| 
| C|0.9|0.3| 
| D|0.3|0.8| 
+----+---+---+ 

现在我想计算哪些是最接近的有效途径站点的位置。所以,我得到这样的:

+----+---+ 
|name| id| 
+----+---+ 
| A| 03| 
| B| 02| 
| C| 03| 
| D| 01| 
+----+---+ 

我想首先让所有的信息的一个的大数据帧,然后使用的map/reduce来获取位置标识的最接近的所有网站。然而,我不知道这是否是正确的方法,或者我会如何用火花去做这件事。目前我使用的是:

closest_locations = [] 
for s in sites.rdd.collect(): 
    min_dist = float('inf') 
    min_loc = None 
    for l in locations.rdd.collect(): 
     dist = (l.X - s.X)**2 + (l.Y - s.Y)**2 
     if dist < min_dist: 
      min_dist = dist 
      min_loc = l.id 
    closest_locations.append((s.name, min_loc)) 

selected_locations = sqlContext.createDataFrame(closest_locations, schema=['name', 'id']) 

但我想要一个更像火花的方法,因为上面显然很慢。如何有效地评估两个火花数据帧的所有行组合?

回答

3

您可以:

from pyspark.sql.functions import udf, struct 
from pyspark.sql import DoubleType 


dist = udf(lamdba x1, y1, x2, y2: (x1 - x2)**2 + (y1 - y1)**2, DoubleType()) 

locations.join(sites).withColumn("dist", dist(
    locations.X, locations.Y, sites.X, sites.Y)).select(
    "name", struct("id", "dist") 
).rdd.reduceByKey(lambda x, y: min(x, y, key=lambda x: x[1])) 
相关问题