2017-04-03 117 views
0

有人在单节点多代理安装上使用kafka python吗?kafka消费者在单节点多代理配置上

我能够生成和使用单节点单一代理设置的数据,但是当我将其更改为单节点时,多代理数据已生成并存储在主题中,但是当我运行消费者代码数据时并未消耗。

以上任何建议,将不胜感激。提前致谢!

注意:所有的配置,如生产者,消费者和服务器属性都经过验证,并没有问题。

监制代码:

from kafka.producer import KafkaProducer 

def producer(): 
    data = {'desc' : 'testing', 'data' : 'testing single node multi broker'} 
    topic = 'INTERNAL' 
    producer = KafkaProducer(value_serializer=lambda v:json.dumps(v).encode('utf-8'), bootstrap_servers=["localhost:9092","localhost:9093","localhost:9094"]) 

    producer.send(topic, data) 

    producer.flush() 

消费者代码:

from kafka.consumer import KafkaConsumer 

def consumer(): 
    topic = 'INTERNAL' 
    consumer = KafkaConsumer(topic,bootstrap_servers=["localhost:9092","localhost:9093","localhost:9094"]) 


    for data in consumer: 
     print data 

服务器1点的配置:我在broker.id增加了两个服务器的文件,这样与其他券商相同的参数与差异,log.dirs值。

broker.id=1 
port=9092 
num.network.threads=3 
log.dirs=/tmp/kafka-logs-1 
num.partitions=3 
num.recovery.threads.per.data.dir=1 
log.retention.hours=168 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
log.cleaner.enable=false 
zookeeper.connect=localhost:2181 
delete.topic.enable=true 

制片配置:

metadata.broker.list=localhost:9092,localhost:9093,localhost:9094 

消费者配置:

zookeeper.connect=127.0.0.1:2181 
zookeeper.connection.timeout.ms=6000 
+0

请问您可以发布您的经纪人配置和您的Python脚本?否则,我们无法帮助你。 – shizhz

+0

嗨Shizhz,详细信息已按要求更新。 – Msk

+0

我重新格式化了你的配置信息,可否请你检查一下,以防万一我犯了错误 – shizhz

回答

0

你收到一个简单的卡夫卡消费者的消息?

bin/kafka-console-consumer.sh –bootstrap-server localhost:9092,localhost:9093,localhost:9094 –topic INTERNAL –from-beginning 

或者这一个:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic INTERNAL 

如果你得到第二个命令的消息,请删除您的经纪人/tmp/log.dir和日志文件中/tmp/zookeepker/version-2/。然后重新启动zookeeper和你的经纪人,并再次创建你的话题。

+0

就像一个魅力一样。非常感谢。不过,我想配置3节点3分区3副本集群。您能否就建立群集的理想配置和规格咨询意见。同时我也会通过API文档。 – Msk

+0

不错,你能接受我的答案,为了让我得到一些观点:) – ImbaBalboa

+0

对于你的问题,你可能会发表另一篇文章。这不是一个简单的问题,我不是专家。 – ImbaBalboa

相关问题