2016-09-06 125 views
2

我是新来的spark和mqtt。我试图使用我得到了网上一个名为wordcount.py使用mqtt与pyspark streaming

import sys 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.mqtt import MQTTUtils 
if __name__ == "__main__": 
    if len(sys.argv) != 3: 
     print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>" 
     exit(-1) 

    sc = SparkContext(appName="PythonStreamingMQTTWordCount") 
    ssc = StreamingContext(sc, 1) 

    brokerUrl = sys.argv[1] 
    topic = sys.argv[2] 

    lines = MQTTUtils.createStream(ssc, brokerUrl, topic) 
    counts = lines.flatMap(lambda line: line.split(" ")) \ 
     .map(lambda word: (word, 1)) \ 
     .reduceByKey(lambda a, b: a+b) 
    counts.pprint() 

    ssc.start() 
    ssc.awaitTermination() 

,我跟着指示安装mosquitto代理(它的工作)MQTTUtils的代码,下载火花流-MQTT-assembly_2。 11-1.6.2.jar并运行此命令的python脚本: 〜$火花提交--jars火花流-MQTT组装_ *罐子wordcount.py

,但显示的错误:

from pyspark.streaming.mqtt import MQTTUtils

ImportError:No module named mqtt

是我错过了什么从这里? 谢谢

+2

如何创建[MCVE。另外Spark 2.0+不再提供MQTT后端。它已被转移到Spark包。 – zero323

+0

我有同样的问题,但我正在使用2.0版本,现在我正在使用1.6.2版本,脚本正在运行。 – 2016-10-11 20:38:20

回答

3

对于spark版本2. *我们可以使用包含Bahir Jar的Structured Streaming中的MQTT。

从pyspark连接到MQTT经纪人:

​​