2017-02-04 49 views
0

我想拉动kafka数据来引发流式处理,从HDFS中加载已经建好的模型,然后使用kafka消息做出预测。Pyspark预测使用kafka直接流

我试了好几种方法,但我因为一个TypeError的又卡在model.predict:不能类型转换成矢量

从卡夫卡接收到的数据是浮动逗号分隔。

这里是我的代码:

sc = SparkContext(appName="PythonStreamingKafkaForecast") 
ssc = StreamingContext(sc, 10) 

# Create stream to get kafka messages 
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["my_topic"], {"metadata.broker.list": "kafka_ip"}) 

features = directKafkaStream.foreachRDD(lambda rdd: rdd.map(lambda s: Vectors.dense(s[1].split(",")))) 

model = LinearRegressionModel.load(sc, "hdfs://hadoop_ip/model.model") 

#Predict 
predicted = model.predict(features) 

我也试过这样:

lines = directKafkaStream.map(lambda x: x[1]) 
features = lines.map(lambda data: Vectors.dense([float(c) for c in data.split(',')])) 

但是这一次,是功能型TransformedStream,不会对preidctions工作的...

你能告诉我我做错了什么吗?

谢谢您的帮助

回答

0

好吧,这个问题是试图从卡夫卡读取数据,即使该主题是空的。

这解决了我的问题:

def predict(rdd): 
    count = rdd.count() 
    if (count > 0): 
     features = rdd.map(lambda s: Vectors.dense(s[1].split(","))) 

     return features 
    else: 
    print("No data received") 

directKafkaStream.foreachRDD(lambda rdd: predict(rdd))