2016-09-20 117 views
2

我在使用python上的Spark Data Frames在两个数据框上进行连接时遇到了一些麻烦。我有两个数据框,我不得不改变列的名称,以使它们对于每个数据框都是唯一的,所以后来我可以告诉哪一列是哪一列。我这样做是为了重命名列(firstDf和secondDf是使用功能createDataFrame创建星火DataFrames):我重复了这个第二个数据帧Pyspark DataFrame - 如何使用变量进行连接?

oldColumns = firstDf.schema.names 
newColumns = list(map(lambda x: "{}.{}".format('firstDf', x), oldColumns)) 
firstDf = firstDf.toDF(*newColumns) 

。然后我试图加入他们的行列,使用下面的代码:

from pyspark.sql.functions import * 

firstColumn = 'firstDf.firstColumn' 
secondColumn = 'secondDf.firstColumn' 
joinedDF = firstDf.join(secondDf, col(firstColumn) == col(secondColumn), 'inner') 

使用它,这样我得到以下错误:

AnalysisException "cannot resolve 'firstDf.firstColumn' given input columns: [firstDf.firstColumn, ...];"

这仅是为了说明该列输入列阵列中存在。

如果我不重命名DataFrames列我可以在使用这段代码加入其中:

joinedDf = firstDf.join(secondDf, firstDf.firstColumn == secondDf.firstColumn, 'inner') 

但是,这给我一份有暧昧列名的数据帧。

关于如何解决这个问题的任何想法?

回答

0

一般来说,不要在名称中使用点。这些都有特殊含义(可用于确定表格或访问struct字段),并需要一些额外的工作才能被正确识别。

对于相等连接所有你需要的是一个列名:

from pyspark.sql.functions import col 

firstDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn")) 
secondDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn")) 

column = 'firstColumn' 
firstDf.join(secondDf, [column], 'inner') 

## DataFrame[firstColumn: bigint, secondColumn: string, secondColumn: string] 

对于复杂的情况下,使用表别名:

直接
firstColumn = 'firstDf.firstColumn' 
secondColumn = 'secondDf.firstColumn' 

firstDf.alias("firstDf").join(
    secondDf.alias("secondDf"), 
    # After alias prefix resolves to table name 
    col(firstColumn) == col(secondColumn), 
    "inner" 
) 

## DataFrame[firstColumn: bigint, secondColumn: string, firstColumn: bigint, secondColumn: string] 

您还可以使用父帧:

column = 'firstColumn' 

firstDf.join(secondDf, firstDf[column] == secondDf[column]) 
+0

感谢您的回复,特别是关于不使用名称中的点的提示。第一种方法是有效的,但我需要连接的DataFrame为两个连接的DataFrame的每一列都有唯一的列名。尽管如此,按照建议使用表别名给出了我在问题中显示的同样的AnalysisException错误。 –

+0

它应该工作得很好。我添加了一个完全可重复的例子的表格定义。 – zero323

+0

对不起,我只是意识到改变点使它工作。再次感谢您的回复! –