2016-02-05 55 views
2

该项目是为real time search engine - log analysis表现。瓶 - 拉直播kafka数据 - 集成卡夫卡与Python瓶

我有一个从Spark处理到卡夫卡的实时流数据。

现在与卡夫卡输出, 我想get the data from the Kafka using Flask ..和visualize it using Chartjs或其他一些可视化..

我如何从Kafka using the python flask实时数据流?

任何想法如何开始?

任何帮助将不胜感激!

谢谢!

+0

你试过了什么?社区无法解决这个问题,你的问题太过于委员会。 –

回答

0

我正在研究一个类似的问题(带有Kafka流出的流式数据的小Flask应用程序)。

你必须做几件事来设置它。首先,你需要一个KafkaConsumer抢消息:

from kafka import KafkaConsumer 
consumer = KafkaConsumer(group_id='groupid', boostrap_servers=kafkakserver) 
consumer.subscribe(topics=['topicid']) 

try: 
    # this method should auto-commit offsets as you consume them. 
    # If it doesn't, turn on logging.DEBUG to see why it gets turned off. 
    # Not assigning a group_id can be one cause 
    for msg in consumer: 
     # TODO: process the kafka messages. 
finally: 
    # Always close your producers/consumers when you're done 
    consumer.close() 

这大约是最基本的KafkaConsumer。 for循环会阻塞线程并循环,直到提交最后一条消息。还有consumer.poll()方法可以在给定的时间内抓取您可以发送的消息,具体取决于您希望如何构建数据流。卡夫卡的设计考虑了长期运行的消费者流程,但如果您正确提交消息,则可以根据需要打开和关闭消费者。

现在你有了数据,所以你可以使用Flask将它流式传输到浏览器。我对ChartJS并不熟悉,但live streaming from Flask的核心是调用一个循环内的yield结尾的python函数,而不是在处理结束时使用return

查看Michael Grinberg's bloghis followup作为使用Flask进行流式传输的实际示例。 (注意:任何实际上在严肃的Web应用程序中流式传输视频的人都可能希望将其编码为像使用ffmpy的广泛使用的H.264之类的视频编解码器并将其包装在MPEG-DASH中......或者可以选择一个框架,这东西给你。)