2017-08-28 103 views
2

我想读卡夫卡的消息,所以我写了简单的消费者阅读来自卡夫卡的消息。如何阅读汇合kafka python中的批处理消息?

While True: 
     message = consumer.poll(timeout=1.0) 
     # i am doing something with messages 

在上面的代码中消息类型的输出是消息对象。我如何获得一系列消息?

是否有可能?

注:只有不多的消费者配置的基本。

回答

5

librdkafka(底层的C库)仅返回消息逐个应用程序,但在内部,消息由批次从经纪人取出,所以没有性能缺点。消息在内部缓冲区中排队,等待您的应用程序进行轮询。

有结构调整的行为:

fetch.wait.max.ms(默认为100),积累的数据提供给经纪人的时候送 fetch.message.max.bytes(默认1048576,1GB),批次 queued.max.messages.kbytes的最大大小(默认1000000),内部队列中数据的最大大小。如果您不定期轮询,数据将不会从队列中清除,您将无法获取更多数据。

和很多人一样,你可以在这里找到:https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md


如果你真的想要的,因为你的方式来处理数据的数据数组,你所能做的就是呼吁调查在如环低超时你做,并且当你有x消息或者y ms时停止你的循环,把它们累积在一个集合中。处理生成的数组并重复循环。

这同样适用于生产:你产生一个数据之一,但消息发送到经纪人之前批处理。

+0

我们可以通过修改underlaying的C代码返回一批消息?因为在Python中迭代并只是获取消息可能会减慢整个过程,从C本身返回一堆消息的速度会更快。 –

+1

之前就是这样的情况,但它是基准的,由于完成分配的方式,没有下降(在C中)一次返回一个消息而返回一个批次。在GitHub上我不知道蟒蛇很好,但也许有一个问题(或者你可以这样讨论),这将是比这个堆栈溢出多个镶嵌 - 你可以(0)在循环使用poll创建批处理 – Treziac

+0

仅供参考(我认为是你的):https://github.com/confluentinc/confluent-kafka-python/issues/252 – Treziac