pykafka

    3热度

    3回答

    我使用pykafka从kafka主题获取消息,然后执行一些过程并更新到mongodb。由于pymongodb每次只能更新一个项目,所以我启动了100个进程。但是当开始时,一些进程发生错误“PartitionOwnedError和ConsumerStoppedException”。我不知道为什么。 谢谢。 kafka_cfg = conf['kafka'] kafka_client = Kafka

    1热度

    1回答

    我正在使用pykafka python库api使用以下代码创建一个kafka主题。 from pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[topic_name] producer = topic.get_producer(sync=True) pr

    1热度

    2回答

    目前我有一个卡夫卡话题。 现在我需要运行多个消费者因此可以并行读取和处理该消息。 这是可能的。 我使用Python和pykafka库。 consumer = topic.get_simple_consumer(consumer_group=b"charlie", auto_commit_enable=True) 在两个消费者都采取相同的消息。我只需要处理一次消息。

    0热度

    2回答

    from pykafka import KafkaClient client = KafkaClient(hosts='192.168.199.87:9092') topics = client.topics print(topics) 主题的结果是所有值的字典都无: {b'user_name_topic': None, b'test_topic': None} 如何解决呢?

    1热度

    2回答

    如何使用消息批处理或使用pykafka缓冲区生成kafka主题。我的意思是一个生产者可以在一个生产过程中产生很多信息我知道使用消息批处理或缓冲区消息的概念,但我不知道如何实现它。我希望有人可以帮助我在这里

    4热度

    1回答

    我的堆栈是带有gevents的uwsgi。我试图用装饰器来包装我的api端点,以将所有请求数据(url,method,body和response)推送到kafka主题,但它不起作用。我的理论是因为我正在使用gevents,并且我试图以异步模式运行这些异步线程,而实际上推向kafka的异步线程无法使用gevents运行。如果我尝试使方法同步,那么它也不起作用,它会在产品工人中死亡,即在产生调用后永不