2016-06-14 84 views
0

我对Kafka非常陌生,今天我尝试创建Java Producer,用于在不同分区上的Kafka主题上生成消息。Kafka Producer Java API不会将消息分发到所有主题分区

首先,我创建了一个包raggieKafka,其中我创建了2个类:TestProducerSimplePartitioner

TestProducer类有下面的代码:

package raggieKafka; 

import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.*; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

public class TestProducer{ 

    public static void main(String args[]) throws Exception 
    { 
     long events = 0; 

     BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); 
     events = Integer.parseInt(reader.readLine()); 
     Random rnd = new Random(); 

     Properties props = new Properties(); 
     props.put("metadata.broker.list", "localhost:9092"); 
     props.put("topic.metadata.refresh.interval.ms", "1"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("partitioner.class", "raggieKafka.SimplePartitioner"); 
     props.put("request.required.acks", "1"); 

     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, String> prod = new Producer<String, String>(config); 

     for(long i = 0; i < events; i++) 
     { 
      long runtime = new Date().getTime(); 
      String ip = "192.168.2." + rnd.nextInt(255); 
      String msg = runtime + ",www.example.com, " + ip; 
      KeyedMessage<String,String> data = new KeyedMessage<String, String>("page_visits", ip, msg); 
      prod.send(data); 
     } 
     prod.close(); 
    } 
} 

SimplePartitioner类有下面的代码:

package raggieKafka; 

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner{ 

    public SimplePartitioner(VerifiableProperties props) 
    { 

    } 

    public int partition(Object Key, int a_numPartitions) 
    { 
     int partition = 0; 
     String stringKey = (String) Key; 
     int offset = stringKey.indexOf(stringKey); 

     if(offset > 0) 
     { 
      partition = Integer.parseInt(stringKey.substring(offset+1)) % a_numPartitions; 
     } 
     return partition; 
    } 
} 

编译我对卡夫卡经纪人创建话题,这些Java程序之前:

C:\kafka_2.11-0.9.0.1>.\bin\windows\kafka-topics.bat --create --topic page_visit 
s --zookeeper localhost:2181 --partitions 5 --replication-factor 1 
WARNING: Due to limitations in metric names, topics with a period ('.') or under 
score ('_') could collide. To avoid issues it is best to use either, but not bot 
h. 
Created topic "page_visits". 

现在当我c在java程序中,它将所有消息放入只有1个分区,即page_visits-0,在该分区下发布所有消息,但所有其他分区保持空白。

有人能告诉我为什么我的Java制作者不会将我所有的消息分发给其他分区吗?

逸岸,我看着就谷歌,然后添加一个更多的财产:

props.put("topic.metadata.refresh.interval.ms", "1"); 

但仍生产者没有生产消息的所有主题。

请帮助。

回答

2

你SimplePartitioner代码有错误的下面一行

int offset = stringKey.indexOf(stringKey); 

它总是返回0所以你总是偏移等于0和它永远不会大于0您如果块不再执行。最后它总是返回你的分区0

解决方案:由于您的密钥是IP地址,因此以下更改可按预期工作。

int offset = stringKey.lastIndexOf('.'); 

希望这有助于!

+0

非常感谢你AVR。我做了一个多么愚蠢的错误,这使我几乎疯了。再次感谢修正。干杯。 –

相关问题