我试图从LDA模型中获得术语ID的相应主题单词。UDF将单词映射到术语索引中的Spark
这里是主题的数据帧和它的字分布与LDA在星火
topics_desc=ldaModel.describeTopics(20)
topics_desc.show(1)
+-----+--------------------+--------------------+
|topic| termIndices| termWeights|
+-----+--------------------+--------------------+
| 0|[0, 39, 68, 43, 5...|[0.06362107696025...|
+-----+--------------------+--------------------+
only showing top 1 row
现在,因为我们有termIndices,而不是实际的话,我想另一列添加到这将是该数据帧相应termIndices的字样。
现在,因为我在Spark中运行了CountVectorizer,我使用该模型并获取如下所示的单词列表。
# Creating Term Frequency Vector for each word
cv=CountVectorizer(inputCol="words", outputCol="tf_features", minDF=2.0)
cvModel=cv.fit(swremoved_df)
cvModel.vocabulary给出单词列表。
所以现在这里是一个UDF我写来获取映射:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType
def term_to_words(termindices):
""" To get the corresponding words from term indices
"""
return np.array(cvModel.vocabulary)[termindices]
term_to_words_conv=udf(term_to_words)
topics=topics_desc.withColumn("topics_words",term_to_words_conv("termIndices"))
我因为在numpy的数组我可以索引通过传递指数的升降转换的列表,以NP阵列的原因,其中一个可以”不要在列表中做这件事。
但我得到这个错误。我不确定为什么这样,因为我在这里几乎没有做任何事情。
Py4JError: An error occurred while calling o443.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
编辑:
,所以我想利用映射函数,而不是UDF
def term_to_words(x):
""" Mapper function to get the corresponding words for the term index
"""
row=x.asDict()
word_list=np.array(cvModel.vocabulary)
return (row['topic'],row['termIndices'],row['termWeights'],word_list[row[termindices]])
topics_rdd=topics_desc.rdd.map(term_to_words)
/Users/spark2/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
931 # SparkContext#runJob.
932 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 933 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
934 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
935
AttributeError: 'NoneType' object has no attribute 'sc'
您必须改用'topics_desc.rdd.map(...)。toDF()'。在'map'函数中,您可以将字典输出转换为Spark'Row',以便还可以转换回数据帧。 – titipata
为什么会失败?我的意思是我尝试了一个看起来很好的mapper函数。但是给出了一个错误。请参阅编辑 – Baktaawar