解决方案1:
这未必是有史以来最好的解决方案,但主要思路是使用 spark-ml
来保留你的 DataFrame
的信息:
# I used alias to avoid confusion with the mllib library
from pyspark.ml.feature import HashingTF as MLHashingTF
from pyspark.ml.feature import IDF as MLIDF
from pyspark.sql.types import DoubleType

# Toy corpus: (doc_id, doc_text, another) rows.
# NOTE: `sqlContext` is assumed to be defined by the surrounding session.
documents = sqlContext.createDataFrame([
(0, "hello spark", "data1"),
(1, "this is example", "data2"),
(2, "spark is fast","data3"),
(3, "hello world","data4")], ["doc_id", "doc_text", "another"])
documents.printSchema()
# root
# |-- doc_id: long (nullable = true)
# |-- doc_text: string (nullable = true)
# |-- another: string (nullable = true)

# Tokenize: split each document's text on spaces, keeping the doc_id.
df = (documents
.rdd
.map(lambda x : (x.doc_id,x.doc_text.split(" ")))
.toDF()
.withColumnRenamed("_1","doc_id")
.withColumnRenamed("_2","features"))

# Term frequencies via the hashing trick (default 2^18 = 262144 buckets).
htf = MLHashingTF(inputCol="features", outputCol="tf")
tf = htf.transform(df)
tf.show(truncate=False)
# +------+-------------------+------------------------------------------+
# |doc_id|features           |tf                                        |
# +------+-------------------+------------------------------------------+
# |0     |[hello, spark]     |(262144,[62173,71890],[1.0,1.0])          |
# |1     |[this, is, example]|(262144,[3370,69994,151198],[1.0,1.0,1.0])|
# |2     |[spark, is, fast]  |(262144,[3370,62173,251996],[1.0,1.0,1.0])|
# |3     |[hello, world]     |(262144,[71890,72594],[1.0,1.0])          |
# +------+-------------------+------------------------------------------+

# Fit IDF on the TF vectors, then rescale them to TF-IDF.
idf = MLIDF(inputCol="tf", outputCol="idf")
tfidf = idf.fit(tf).transform(tf)
tfidf.show(truncate=False)
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
# |doc_id|features           |tf                                        |idf                                                                                    |
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
# |0     |[hello, spark]     |(262144,[62173,71890],[1.0,1.0])          |(262144,[62173,71890],[0.5108256237659907,0.5108256237659907])                         |
# |1     |[this, is, example]|(262144,[3370,69994,151198],[1.0,1.0,1.0])|(262144,[3370,69994,151198],[0.5108256237659907,0.9162907318741551,0.9162907318741551])|
# |2     |[spark, is, fast] |(262144,[3370,62173,251996],[1.0,1.0,1.0])|(262144,[3370,62173,251996],[0.5108256237659907,0.5108256237659907,0.9162907318741551])|
# |3     |[hello, world]     |(262144,[71890,72594],[1.0,1.0])          |(262144,[71890,72594],[0.5108256237659907,0.9162907318741551])                         |
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+

# Per-document TF-IDF sum: SparseVector.values holds only the stored (non-zero)
# entries, so .values.sum() is the sum of the whole vector.
res = tfidf.rdd.map(lambda x : (x.doc_id,x.features,x.tf,x.idf,(None if x.idf is None else x.idf.values.sum())))
for r in res.take(10):
    # BUG FIX: `print r` is Python 2 print-statement syntax (SyntaxError on
    # Python 3); `print(r)` behaves identically in both for a single argument.
    print(r)
# (0, [u'hello', u'spark'], SparseVector(262144, {62173: 1.0, 71890: 1.0}), SparseVector(262144, {62173: 0.5108, 71890: 0.5108}), 1.0216512475319814)
# (1, [u'this', u'is', u'example'], SparseVector(262144, {3370: 1.0, 69994: 1.0, 151198: 1.0}), SparseVector(262144, {3370: 0.5108, 69994: 0.9163, 151198: 0.9163}), 2.3434070875143007)
# (2, [u'spark', u'is', u'fast'], SparseVector(262144, {3370: 1.0, 62173: 1.0, 251996: 1.0}), SparseVector(262144, {3370: 0.5108, 62173: 0.5108, 251996: 0.9163}), 1.9379419794061366)
# (3, [u'hello', u'world'], SparseVector(262144, {71890: 1.0, 72594: 1.0}), SparseVector(262144, {71890: 0.5108, 72594: 0.9163}), 1.4271163556401458)
解决方案2:
您可以考虑使用 UDF:
from pyspark.sql.functions import udf


# Sum of the stored (non-zero) values of a SparseVector, as a plain float
# so Spark can map it onto a DoubleType column.
def _vector_value_sum(vec):
    return float(vec.values.sum())


sum_ = udf(_vector_value_sum, DoubleType())

# Append an "idf_sum" column with the per-document TF-IDF total.
tfidf.withColumn("idf_sum", sum_("idf")).show()

## +------+-------------------+--------------------+--------------------+------------------+
## |doc_id|           features|                  tf|                 idf|           idf_sum|
## +------+-------------------+--------------------+--------------------+------------------+
## |     0|     [hello, spark]|(262144,[62173,71...|(262144,[62173,71...|1.0216512475319814|
## |     1|[this, is, example]|(262144,[3370,699...|(262144,[3370,699...|2.3434070875143007|
## |     2|  [spark, is, fast]|(262144,[3370,621...|(262144,[3370,621...|1.9379419794061366|
## |     3|     [hello, world]|(262144,[71890,72...|(262144,[71890,72...|1.4271163556401458|
## +------+-------------------+--------------------+--------------------+------------------+
很棒,@eliasah,我有几个问题:1. 在命令 'tf = htf.transform(df)' 中,df 是什么?2. 为什么 MLlib 不支持这个? –
MLlib 实际上是支持这一点的,只是方式不那么直接。 – eliasah