有人在单节点多代理安装上使用kafka python吗?kafka消费者在单节点多代理配置上
我能够生成和使用单节点单一代理设置的数据,但是当我将其更改为单节点时,多代理数据已生成并存储在主题中,但是当我运行消费者代码数据时并未消耗。
以上任何建议,将不胜感激。提前致谢!
注意:所有的配置,如生产者,消费者和服务器属性都经过验证,并没有问题。
监制代码:
from kafka.producer import KafkaProducer
def producer():
data = {'desc' : 'testing', 'data' : 'testing single node multi broker'}
topic = 'INTERNAL'
producer = KafkaProducer(value_serializer=lambda v:json.dumps(v).encode('utf-8'), bootstrap_servers=["localhost:9092","localhost:9093","localhost:9094"])
producer.send(topic, data)
producer.flush()
消费者代码:
from kafka.consumer import KafkaConsumer
def consumer():
topic = 'INTERNAL'
consumer = KafkaConsumer(topic,bootstrap_servers=["localhost:9092","localhost:9093","localhost:9094"])
for data in consumer:
print data
服务器1点的配置:我在broker.id
增加了两个服务器的文件,这样与其他券商相同的参数与差异,log.dirs
值。
broker.id=1
port=9092
num.network.threads=3
log.dirs=/tmp/kafka-logs-1
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
delete.topic.enable=true
制片配置:
metadata.broker.list=localhost:9092,localhost:9093,localhost:9094
消费者配置:
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
请问您可以发布您的经纪人配置和您的Python脚本?否则,我们无法帮助你。 – shizhz
嗨Shizhz,详细信息已按要求更新。 – Msk
我重新格式化了你的配置信息,可否请你检查一下,以防万一我犯了错误 – shizhz