2016-08-02 43 views
3

Does a Spark DataFrame have an equivalent of the pandas merge indicator option?

The Python pandas library contains the following function:

DataFrame.merge(right, how='inner', on=None, left_on=None, right_on=None, left_index=False, 
       right_index=False, sort=False, suffixes=('_x', '_y'), copy=True, 
       indicator=False) 

The indicator field, combined with pandas' value_counts() function, can be used to quickly determine how well a join performed.

Example:

In [48]: df1 = pd.DataFrame({'col1': [0, 1], 'col_left':['a', 'b']}) 

In [49]: df2 = pd.DataFrame({'col1': [1, 2, 2],'col_right':[2, 2, 2]}) 

In [50]: pd.merge(df1, df2, on='col1', how='outer', indicator=True) 
Out[50]: 
   col1 col_left  col_right      _merge
0     0        a        NaN   left_only
1     1        b        2.0        both
2     2      NaN        2.0  right_only
3     2      NaN        2.0  right_only
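
As a small illustration of the value_counts() idea mentioned above, the sketch below reruns the same merge and tallies the `_merge` column. The variable names `merged` and `counts` are only illustrative, and the result is converted to a plain dict for readability.

```python
import pandas as pd

# Same two frames as in the example above
df1 = pd.DataFrame({"col1": [0, 1], "col_left": ["a", "b"]})
df2 = pd.DataFrame({"col1": [1, 2, 2], "col_right": [2, 2, 2]})

# indicator=True adds the _merge column describing where each row came from
merged = pd.merge(df1, df2, on="col1", how="outer", indicator=True)

# Count how many rows fall into each category of the indicator
counts = merged["_merge"].value_counts().to_dict()
print(counts)
```

For this data the tally is one `left_only` row, two `right_only` rows, and one `both` row, matching the table above.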

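What is the best way to check how a join performed in a Spark DataFrame?

A custom function was provided in one of the answers. It does not give the correct results yet, but it would be great if it did: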

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# sc (SparkContext) and hc (SQLContext/HiveContext) are assumed to exist already
ASchema = StructType([StructField('id', IntegerType(), nullable=False),
                      StructField('name', StringType(), nullable=False)])
BSchema = StructType([StructField('id', IntegerType(), nullable=False),
                      StructField('role', StringType(), nullable=False)])
AData = sc.parallelize([Row(1, 'michel'), Row(2, 'diederik'), Row(3, 'rok'), Row(4, 'piet')])
BData = sc.parallelize([Row(1, 'engineer'), Row(2, 'lead'), Row(3, 'scientist'), Row(5, 'manager')])
ADF = hc.createDataFrame(AData, ASchema)
BDF = hc.createDataFrame(BData, BSchema)
# Full outer join on the id column that both frames share
DFJOIN = ADF.join(BDF, ADF['id'] == BDF['id'], "outer")
DFJOIN.show()

Input: 
+----+--------+----+---------+
|  id|    name|  id|     role|
+----+--------+----+---------+
|   1|  michel|   1| engineer|
|   2|diederik|   2|     lead|
|   3|     rok|   3|scientist|
|   4|    piet|null|     null|
|null|    null|   5|  manager|
+----+--------+----+---------+

from pyspark.sql.functions import * 
DFJOINMERGE = DFJOIN.withColumn("_merge", when(ADF["id"].isNull(), "right_only").when(BDF["id"].isNull(), "left_only").otherwise("both"))\ 
    .withColumn("id", coalesce(ADF["id"], BDF["id"]))\ 
    .drop(ADF["id"])\ 
    .drop(BDF["id"]) 
DFJOINMERGE.show() 

Output 
+---+--------+---+---------+------+
| id|    name| id|     role|_merge|
+---+--------+---+---------+------+
|  1|  michel|  1| engineer|  both|
|  2|diederik|  2|     lead|  both|
|  3|     rok|  3|scientist|  both|
|  4|    piet|  4|     null|  both|
|  5|    null|  5|  manager|  both|
+---+--------+---+---------+------+

==> I would expect id 4 to be left, and id 5 to be right. 
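
To make the expected result concrete, here is a plain-Python model of the when/otherwise chain applied to the id pairs from the outer-join input table above. It is only a sketch of the intended logic, not Spark code, and `merge_indicator`, `joined_ids`, and `expected` are hypothetical names introduced for illustration.

```python
def merge_indicator(left_key, right_key):
    """Classify one outer-join row; None means that side had no match."""
    if left_key is None:
        return "right_only"
    if right_key is None:
        return "left_only"
    return "both"


# (left id, right id) pairs as they appear in the outer-join input table
joined_ids = [(1, 1), (2, 2), (3, 3), (4, None), (None, 5)]

# Coalesce the key (first non-None value) and attach the indicator
expected = [
    (left if left is not None else right, merge_indicator(left, right))
    for left, right in joined_ids
]
print(expected)
```

Under this model id 4 is labelled `left_only` and id 5 is labelled `right_only`, which is what the Spark output should show as well.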

Changing join to "left": 


Input: 
+---+--------+----+---------+
| id|    name|  id|     role|
+---+--------+----+---------+
|  1|  michel|   1| engineer|
|  2|diederik|   2|     lead|
|  3|     rok|   3|scientist|
|  4|    piet|null|     null|
+---+--------+----+---------+

Output 
+---+--------+---+---------+------+
| id|    name| id|     role|_merge|
+---+--------+---+---------+------+
|  1|  michel|  1| engineer|  both|
|  2|diederik|  2|     lead|  both|
|  3|     rok|  3|scientist|  both|
|  4|    piet|  4|     null|  both|
+---+--------+---+---------+------+

Answers

3

Try this:

>>> from pyspark.sql.functions import * 
>>> sdf1 = sqlContext.createDataFrame(df1) 
>>> sdf2 = sqlContext.createDataFrame(df2) 
>>> sdf = sdf1.join(sdf2, sdf1["col1"] == sdf2["col1"], "outer") 
>>> sdf.withColumn("_merge", when(sdf1["col1"].isNull(), "right_only").when(sdf2["col1"].isNull(), "left_only").otherwise("both"))\ 
... .withColumn("col1", coalesce(sdf1["col1"], sdf2["col1"]))\ 
... .drop(sdf1["col1"])\ 
... .drop(sdf2["col1"]) 

Thanks a lot. I tested it, and it looks like it gives the same result for every row. I will put the test code in the question. – mnos


I cannot reproduce the problem. Both examples work for me. – 2016-08-04 22:00:28


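Interesting. Which Spark and Python versions are you using? We use Spark 1.6 and Python 2.7 in a Jupyter notebook. If you ran exactly the same example code, it may be a version issue. – mnos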

4

I altered LostInOverflow's answer and got this working:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

ASchema = StructType([StructField('ida', IntegerType(),nullable=False), 
       StructField('name', StringType(),nullable=False)]) 
BSchema = StructType([StructField('idb', IntegerType(),nullable=False), 
       StructField('role', StringType(),nullable=False)]) 
AData = sc.parallelize ([ Row(1,'michel'), Row(2,'diederik'), Row(3,'rok'), Row(4,'piet')]) 
BData = sc.parallelize ([ Row(1,'engineer'), Row(2,'lead'), Row(3,'scientist'), Row(5,'manager')]) 
ADF = hc.createDataFrame(AData,ASchema) 
BDF = hc.createDataFrame(BData,BSchema) 
DFJOIN = ADF.join(BDF, ADF['ida'] == BDF['idb'], "outer") 
DFJOIN.show() 


+----+--------+----+---------+
| ida|    name| idb|     role|
+----+--------+----+---------+
|   1|  michel|   1| engineer|
|   2|diederik|   2|     lead|
|   3|     rok|   3|scientist|
|   4|    piet|null|     null|
|null|    null|   5|  manager|
+----+--------+----+---------+

from pyspark.sql.functions import coalesce, when

# The key columns have distinct names (ida, idb), so each reference below is unambiguous
DFJOINMERGE = DFJOIN.withColumn("_merge", when(DFJOIN["ida"].isNull(), "right_only").when(DFJOIN["idb"].isNull(), "left_only").otherwise("both"))\
    .withColumn("id", coalesce(DFJOIN["ida"], DFJOIN["idb"]))\
    .drop(DFJOIN["ida"])\
    .drop(DFJOIN["idb"])
#DFJOINMERGE.show()
# Equivalent of pandas value_counts() on the indicator column
DFJOINMERGE.groupBy("_merge").count().show()

+----------+-----+
|    _merge|count|
+----------+-----+
|right_only|    1|
| left_only|    1|
|      both|    3|
+----------+-----+