
I am using Spark MLlib to compute, for each document, the sum of the TF-IDF values of all its terms (each document is described by one row of a DataFrame), so that I can add the resulting TF-IDF sums to the DataFrame of the original documents in PySpark. I wrote the following code:

from pyspark import SparkContext 
from pyspark.sql import SQLContext, Row 
from pyspark.mllib.feature import HashingTF 
from pyspark.mllib.feature import IDF 
from pyspark.mllib.linalg import SparseVector 

sc = SparkContext() 
sqlContext = SQLContext(sc) 

#SECTION 1: build the documents DataFrame
documents = sqlContext.createDataFrame([ 
    (0, "hello spark", "data1"), 
    (1, "this is example", "data2"), 
    (2, "spark is fast","data3"), 
    (3, "hello world","data4")], ["doc_id", "doc_text", "another"]) 

#SECTION 2: extract and tokenize the text column
documents.registerTempTable("doc_table")
textcolumn = sqlContext.sql("SELECT doc_text FROM doc_table")
doc_words = textcolumn.map(lambda d: d.doc_text).map(lambda t: t.split(" "))

#SECTION 3: compute term frequencies, fit IDF, and compute TF-IDF
hashingTF = HashingTF() 
tf = hashingTF.transform(doc_words).cache() 
idf = IDF().fit(tf) 
tfidf = idf.transform(tf).cache() 

#SECTION 4: sum the TF-IDF values of each document
sumrdd = tfidf.map(lambda v: v.values.sum())
print('\n Summation of TFIDF for each document:') 
print(sumrdd.collect()) 

I got the following result:

[1.0216512475319814, 2.3434070875143007, 1.9379419794061366, 1.4271163556401458] 

Can anyone help me change the code so that the original data (doc_id, doc_text, another) stays linked to tf, idf and tfidf throughout the computation? I have thousands of rows in the DataFrame and I must be sure that each document is correctly paired with its TF-IDF sum. In the end, the result (or DataFrame) I would like to get looks like this:

(0, "hello spark", "data1", 1.0216512475319814) 
(1, "this is example", "data2", 2.3434070875143007) 
(2, "spark is fast","data3",1.9379419794061366) 
(3, "hello world","data4",1.4271163556401458) 

Answers


Solution 1:

This is not the best solution ever, but the main idea is actually to use spark-ml instead, so the information stays in your DataFrame:

# I used alias to avoid confusion with the mllib library 
from pyspark.ml.feature import HashingTF as MLHashingTF 
from pyspark.ml.feature import IDF as MLIDF 
from pyspark.sql.types import DoubleType 

documents = sqlContext.createDataFrame([ 
    (0, "hello spark", "data1"), 
    (1, "this is example", "data2"), 
    (2, "spark is fast","data3"), 
    (3, "hello world","data4")], ["doc_id", "doc_text", "another"]) 

documents.printSchema() 
# root 
# |-- doc_id: long (nullable = true) 
# |-- doc_text: string (nullable = true) 
# |-- another: string (nullable = true) 

df = (documents 
    .rdd 
    .map(lambda x : (x.doc_id,x.doc_text.split(" "))) 
    .toDF() 
    .withColumnRenamed("_1","doc_id") 
    .withColumnRenamed("_2","features")) 

htf = MLHashingTF(inputCol="features", outputCol="tf") 
tf = htf.transform(df) 
tf.show(truncate=False) 
# +------+-------------------+------------------------------------------+
# |doc_id|features           |tf                                        |
# +------+-------------------+------------------------------------------+
# |0     |[hello, spark]     |(262144,[62173,71890],[1.0,1.0])          |
# |1     |[this, is, example]|(262144,[3370,69994,151198],[1.0,1.0,1.0])|
# |2     |[spark, is, fast]  |(262144,[3370,62173,251996],[1.0,1.0,1.0])|
# |3     |[hello, world]     |(262144,[71890,72594],[1.0,1.0])          |
# +------+-------------------+------------------------------------------+

idf = MLIDF(inputCol="tf", outputCol="idf") 
tfidf = idf.fit(tf).transform(tf) 
tfidf.show(truncate=False) 
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
# |doc_id|features           |tf                                        |idf                                                                                    |
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
# |0     |[hello, spark]     |(262144,[62173,71890],[1.0,1.0])          |(262144,[62173,71890],[0.5108256237659907,0.5108256237659907])                         |
# |1     |[this, is, example]|(262144,[3370,69994,151198],[1.0,1.0,1.0])|(262144,[3370,69994,151198],[0.5108256237659907,0.9162907318741551,0.9162907318741551])|
# |2     |[spark, is, fast]  |(262144,[3370,62173,251996],[1.0,1.0,1.0])|(262144,[3370,62173,251996],[0.5108256237659907,0.5108256237659907,0.9162907318741551])|
# |3     |[hello, world]     |(262144,[71890,72594],[1.0,1.0])          |(262144,[71890,72594],[0.5108256237659907,0.9162907318741551])                         |
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+

res = tfidf.rdd.map(lambda x : (x.doc_id,x.features,x.tf,x.idf,(None if x.idf is None else x.idf.values.sum()))) 

for r in res.take(10):
    print(r)

# (0, [u'hello', u'spark'], SparseVector(262144, {62173: 1.0, 71890: 1.0}), SparseVector(262144, {62173: 0.5108, 71890: 0.5108}), 1.0216512475319814) 
# (1, [u'this', u'is', u'example'], SparseVector(262144, {3370: 1.0, 69994: 1.0, 151198: 1.0}), SparseVector(262144, {3370: 0.5108, 69994: 0.9163, 151198: 0.9163}), 2.3434070875143007) 
# (2, [u'spark', u'is', u'fast'], SparseVector(262144, {3370: 1.0, 62173: 1.0, 251996: 1.0}), SparseVector(262144, {3370: 0.5108, 62173: 0.5108, 251996: 0.9163}), 1.9379419794061366) 
# (3, [u'hello', u'world'], SparseVector(262144, {71890: 1.0, 72594: 1.0}), SparseVector(262144, {71890: 0.5108, 72594: 0.9163}), 1.4271163556401458) 
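
The tuples above keep doc_id, but the "another" column was dropped when df was built. If the exact DataFrame from the question is needed, one option is to keep only (doc_id, sum) and join it back to the original documents. This is just a sketch, assuming doc_id is a unique key (sums_df and tfidf_sum are names made up here):

# Sketch: reduce res to (doc_id, tfidf_sum) pairs, rebuild a DataFrame,
# and join on doc_id to recover doc_text and another.
sums_df = (res
    .map(lambda x: (x[0], float(x[4])))   # (doc_id, summed TF-IDF)
    .toDF(["doc_id", "tfidf_sum"]))

final_df = documents.join(sums_df, "doc_id")
final_df.show()
# expected columns: doc_id | doc_text | another | tfidf_sum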

Solution 2:

Alternatively, you can consider using a UDF:

from pyspark.sql.functions import udf 

sum_ = udf(lambda v: float(v.values.sum()), DoubleType()) 
tfidf.withColumn("idf_sum", sum_("idf")).show() 

## +------+-------------------+--------------------+--------------------+------------------+
## |doc_id|           features|                  tf|                 idf|           idf_sum|
## +------+-------------------+--------------------+--------------------+------------------+
## |     0|     [hello, spark]|(262144,[62173,71...|(262144,[62173,71...|1.0216512475319814|
## |     1|[this, is, example]|(262144,[3370,699...|(262144,[3370,699...|2.3434070875143007|
## |     2|  [spark, is, fast]|(262144,[3370,621...|(262144,[3370,621...|1.9379419794061366|
## |     3|     [hello, world]|(262144,[71890,72...|(262144,[71890,72...|1.4271163556401458|
## +------+-------------------+--------------------+--------------------+------------------+
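
If the "another" column should also appear in the final result, the same join idea applies here as well. A sketch, again assuming doc_id is a unique key (result is a made-up name):

result = (tfidf
    .withColumn("idf_sum", sum_("idf"))
    .join(documents, "doc_id")            # recover doc_text and another
    .select("doc_id", "doc_text", "another", "idf_sum"))

result.show(truncate=False)
# each row should look like: 0 | hello spark | data1 | 1.0216512475319814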

Great @eliasah, I have a couple of questions: 1- what is df in the command 'tf = htf.transform(df)'? 2- why doesn't MLlib support this? –


MLlib actually does support this, but in a non-direct way. – eliasah
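
For reference, one such indirect route with pyspark.mllib is to carry doc_id in a parallel RDD and zip it back after the transforms. This is only a sketch; it assumes HashingTF.transform and IDFModel.transform keep rows aligned one-to-one (they behave as element-wise maps), which zip() relies on:

from pyspark.mllib.feature import HashingTF, IDF

pairs = documents.rdd.map(lambda r: (r.doc_id, r.doc_text.split(" ")))
keys = pairs.map(lambda p: p[0])    # doc_id, same order and partitioning
words = pairs.map(lambda p: p[1])   # tokenized text

tf = HashingTF().transform(words).cache()
tfidf = IDF().fit(tf).transform(tf)

# zip() pairs each doc_id with the TF-IDF sum of the corresponding row
sums = keys.zip(tfidf.map(lambda v: float(v.values.sum())))
final = documents.join(sums.toDF(["doc_id", "tfidf_sum"]), "doc_id")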
