2017-04-03 67 views
1

以下是我正在尝试的操作。我想在两个不同的数据框中对两列中的每个条目进行比较。的dataframes如下所示:如何在PySpark 1.6.1中将第二个数据帧的列传递给UDF

>>> subject_df.show() 
+------+-------------+ 
|USERID|  FULLNAME| 
+------+-------------+ 
| 12345| steve james| 
| 12346| steven smith| 
| 43212|bill dunnigan| 
+------+-------------+ 

>>> target_df.show() 
+------+-------------+ 
|USERID|  FULLNAME| 
+------+-------------+ 
|111123| steve tyler| 
|422226| linda smith| 
|123333|bill dunnigan| 
| 56453| steve smith| 
+------+-------------+ 

这里是我尝试使用的逻辑:

# CREATE FUNCTION  
def string_match(subject, targets): 
    for target in targets: 
     <logic> 
    return logic_result 

# CREATE UDF 
string_match_udf = udf(string_match, IntegerType()) 

# APPLY UDF 
subject_df.select(subject_df.FULLNAME, string_match_udf(subject_df.FULLNAME, target_df.FULLNAME).alias("score")) 

这在pyspark壳运行代码时出现错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o45.select. 
: java.lang.RuntimeException: Invalid PythonUDF PythonUDF#string_match(FULLNAME#2,FULLNAME#5), requires attributes from more than one child. 

我认为我的问题的根源是试图将第二列传递给函数。我应该使用RDD吗?请记住,实际的subject_df和target_df都是超过100,000行。我愿意接受任何建议。

回答

3

它看起来像你有一个错误的想法用户自定义函数是如何工作的:

  • 功能从只有一行的时候
  • 您不能从无关DataFame使用数据接收值。

做你想做的事情的唯一方法就是采用笛卡尔产品。

subject_df.join(target_df).select(
f(subject_df.FULLNAME, target_df.FULLNAME) 
) 

其中f是当时比较两个元素的函数。

+0

任何关于创建100,000,000,000行长的笛卡尔产品的担忧? – datanerdjake

相关问题