我正在研究一个类似的问题(带有Kafka流出的流式数据的小Flask应用程序)。
你必须做几件事来设置它。首先,你需要一个KafkaConsumer抢消息:
from kafka import KafkaConsumer
consumer = KafkaConsumer(group_id='groupid', boostrap_servers=kafkakserver)
consumer.subscribe(topics=['topicid'])
try:
# this method should auto-commit offsets as you consume them.
# If it doesn't, turn on logging.DEBUG to see why it gets turned off.
# Not assigning a group_id can be one cause
for msg in consumer:
# TODO: process the kafka messages.
finally:
# Always close your producers/consumers when you're done
consumer.close()
这大约是最基本的KafkaConsumer。 for
循环会阻塞线程并循环,直到提交最后一条消息。还有consumer.poll()
方法可以在给定的时间内抓取您可以发送的消息,具体取决于您希望如何构建数据流。卡夫卡的设计考虑了长期运行的消费者流程,但如果您正确提交消息,则可以根据需要打开和关闭消费者。
现在你有了数据,所以你可以使用Flask将它流式传输到浏览器。我对ChartJS并不熟悉,但live streaming from Flask的核心是调用一个循环内的yield
结尾的python函数,而不是在处理结束时使用return
。
查看Michael Grinberg's blog和his followup作为使用Flask进行流式传输的实际示例。 (注意:任何实际上在严肃的Web应用程序中流式传输视频的人都可能希望将其编码为像使用ffmpy的广泛使用的H.264之类的视频编解码器并将其包装在MPEG-DASH中......或者可以选择一个框架,这东西给你。)
你试过了什么?社区无法解决这个问题,你的问题太过于委员会。 –