2016-12-02 814 views
3

我想构建一个使用Flask框架开发的python API,它使用Kafka主题并将流推送到客户端(html页面或其他应用程序)。Flask API作为实时kafka消费者

我试图用虚拟数据生成实时流(请参阅下面的实时路由)。发生的问题是'result'变量只在循环结束后才被推送,而'result'变量应该在每次迭代时被推送。

我也尝试用卡夫卡连接产生实时流(参见下面的卡夫卡路由)。问题是没有数据返回,而是请求没有完成。

from flask import Response, Flask 
import time 
from kafka import KafkaConsumer 

application = Flask(__name__) 

@application.route('/') 
def index(): 
    return "Hello, World!" 


@application.route('/realtime/') 
def realtime(): 

    def createGenerator(): 

     for i in range(1,10): 
      yield str(i) + '\n' 
      time.sleep(0.2) 

    return Response(createGenerator()) 


@application.route('/kafka/') 
def kafkaStream(): 
    consumer = KafkaConsumer(bootstrap_servers = 'serverlocation', 
        client_id = 'name of client', 
        auto_offset_reset = 'earliest', 
        value_deserializer = lambda m: json.loads(m.decode('ascii'))) 

    consumer.subscribe(topics=['my-topic']) 

    def events(): 
     result = [] 
     for message in consumer: 
      if message is not None: 
       result.append(message.value) 
      yield result 
    return Response(events()) 

if __name__ == '__main__': 
    application.run(debug = True) 

到目前为止,从卡夫卡有效接收数据的唯一方法是将结果打印在控制台中。

from kafka import KafkaConsumer 
consumer = KafkaConsumer(bootstrap_servers = 'serverlocation', 
        client_id = 'name of client', 
        auto_offset_reset = 'earliest', 
        value_deserializer = lambda m: json.loads(m.decode('ascii'))) 

consumer.subscribe(topics=['my-topic']) 

for message in consumer: 
    print message 

我觉得现在的问题是,API不能推的数据,直到进程已经完成,因为KafkaConsumer连接是无限的,没有什么是推送到客户端。

我该如何解决这个问题?

回答

0

由于我想了解更多关于此,有一些时间。经过4个小时的试用后,注意到:

def events(): 
    result = [] 
    for message in consumer: 
     if message is not None: 
      result.append(str(message.value)) # <--- here (str) 
     yield result 
相关问题