2016-07-26 114 views
2

我按照以下说明设置了一个多节点kafka群集。 现在,如何连接到动物园管理员?在JAVA中连接到生产者/消费者端的一个动物园管理员,还是有办法连接所有的动物园管理员节点?在Apache Kafka多节点群集中连接到Zookeeper

设置多节点的Apache ZooKeeper的集群

在群集的每个节点添加下列行到文件卡夫卡/配置/ zookeeper.properties

server.1=zNode01:2888:3888 
    server.2=zNode02:2888:3888 
    server.3=zNode03:2888:3888 
    #add here more servers if you want 
    initLimit=5 
    syncLimit=2 

在群集的每个节点创建一个文件在由dataDir属性表示的文件夹中调用myid(默认情况下文件夹为/ tmp/zookeeper)。该身份识别码文件应该只包含Z序节点的ID(“1” zNode01,“2” ZNode02,等...)

设置多代理的Apache卡夫卡集群

在群集中的每个节点修改修改从文件卡夫卡/配置/ server.properties属性zookeeper.connect:

zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181 

在群集的每个节点从文件卡夫卡/配置/ server.properties修改属性host.name: host.name = zNode0x

在群集的每个节点上修改文件kafka/config/server.properties中的属性broker.id(群集中的每个代理应具有唯一的ID)

回答

3

您可以将生产者或消费者中的所有节点。卡夫卡有足够的智能,它会连接到具有您所需的基础上,复制因子或分区

这里的数据节点消费者代码:

Properties props = new Properties(); 
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("foo", "bar")); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) 
      System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 
    } 

你可以找到更多信息here

注意:这个问题是它会打开多个连接来找出哪个节点存放数据。对于更健壮和可扩展的系统,您可以维护分区号和节点名称的地图,这也有助于加载数据。

这里是生产样品

Properties props = new Properties(); 
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092"); 
props.put("acks", "all"); 
props.put("retries", 0); 
props.put("batch.size", 16384); 
props.put("linger.ms", 1); 
props.put("buffer.memory", 33554432); 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

Producer<String, String> producer = new KafkaProducer<>(props); 
for(int i = 0; i < 100; i++) 
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); 

producer.close(); 

更多信息here

+0

如何为一个主题创建多个分区?如何才能做到这一点?我们不需要像这样通过ZkClient进行授权吗?这里讨论:http://stackoverflow.com/questions/27036923/how-to-create-a-topic-in-kafka-through-java – amateur

+0

AdminUtils.createTopic(zkUtils,topicName,noOfPartitions,noOfReplication,topicConfiguration); – amateur

+0

您可以使用AdminUtils创建主题..但更好的做法是在节点本身中创建它,并使用命令调用这是一次性任务。命令格式/bin/kafka-topics.sh --zookeeper c6401.ambari.apache.org:2181 --create --topic test_topic --partitions 2 --replication-factor 2创建主题“test_topic”。 – Shettyh

0

无需通过在卡夫卡的客户动物园管理员连接属性(监制&消费者)。

从Kafka-v9及以上版本开始,Kafka Producer和Consumer不与Zookeeper进行通信。

+0

我使用V9,即时获取以下异常,在属性中需要zookeeper信息.. – amateur

+0

引起:java.lang.IllegalArgumentException:需求失败:缺少必需的属性'zookeeper.connect' \t at scala.Predef $ .require(Predef.scala:233) \t at kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177) \t at kafka.utils.ZKConfig。 (ZkUtils.scala:740) – amateur

+0

使用'kafka-clients'库中的KafkaProducer和KafkaConsumer。 –