2016-04-29 111 views
-1

我想在Spark中的某些数据上使用FPGrowth函数。我测试的例子在这里,没有任何问题: https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.htmlpyspark FPGrowth不适用于RDD

然而,我的数据集从蜂巢

data = hiveContext.sql('select transactionid, itemid from transactions') 
model = FPGrowth.train(data, minSupport=0.1, numPartitions=100) 

这种失败,法来不存在:

py4j.protocol.Py4JError: An error occurred while calling o764.trainFPGrowthModel. Trace: 
py4j.Py4JException: Method trainFPGrowthModel([class org.apache.spark.sql.DataFrame, class java.lang.Double, class java.lang.Integer]) does not exist 

所以,我把它转换到RDD:

data=data.rdd 

现在我开始获取一些s trange pickle序列化错误。

net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row) 

然后我开始看类型。在该示例中,数据通过平面地图运行。这将返回与RDD不同的类型。通过flatmap返回

RDD类型:pyspark.rdd.PipelinedRDD通过hiveContext返回

RDD类型:pyspark.rdd.RDD

FPGrowth似乎只与PipelinedRDD工作。有什么方法可以将常规RDD转换为PipelinedRDD?

谢谢!

回答

0

好了,我的查询是错误的,但改变了使用collect_set然后 我设法做绕过错误类型:

data=data.map(lambda row: row[0])