2017-04-17 134 views
1

我有一个我用Java编写的卡夫卡制作人。尽管它基本上是示例代码的切入点和过去,但它似乎并不正确。我希望输出是10个消息到我的集群。相反,我得到了消息成功的输出,但实际上没有任何信息进入我的群集。我不确定从何处开始排除故障。卡夫卡制作人似乎无法正常工作

import java.util.Properties; 
import org.apache.kafka.clients.producer.Producer; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 

public class SimpleProducer { 

public static void main(String[] args) throws Exception{ 

    String topicName = "test_topic"; 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "skynet.local:6667");  
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 

    Producer<String, String> producer = new KafkaProducer<String, String>(props); 

    for(int i = 0; i < 10; i++) 
     producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); 
     System.out.println("Message sent successfully"); 
     producer.close(); 
} 
} 
+0

发送卡夫卡0.10.x配置参数是一个异步调用,这样尝试打印您的消息之前等待返回的未来。更好的是,您可以打印未来用于调试目的。我也建议acks等于1进行测试。 – dawsaw

+0

所以我将ack设置为1,并完全注释掉屏幕输出。它仍然需要很长时间才能运行,似乎并没有发出任何消息。 –

+0

我在Eclipse中运行这个。这很重要吗? –

回答

2

由于某些环境未清除的,所以我会尽力给您卡夫卡服务器上回答了你的问题的基础正在6667端口了。

您的代码可能需要以2个palces调整(有人可以帮助我提高的话):

props.put("linger.ms", 1); // set to 0 let Producer can send message immediately 

这里,跌落producer.close();for循环:

Producer<String, String> producer = new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer()); 
for(int i = 0; i < 10; i++) { 
    Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); 
    System.out.println(f.get()); // don't do that in your Production, here just for debugging purpose. 
} 
producer.close(); 

还有一两件事,您可以在测试之前运行kafka-console-consumer.shkafka-console-producer.sh以确认您的Kafka服务器,并且您的SimpleProducer已在运行。在Kafka Producer Configuration Parameters

+0

我将此标记为答案,因为事实证明,确实关闭生产者确实需要将其移出循环,但我还需要更新我的主机文件,以便IP能够解决。谢谢! –

+0

@BobWakefield和'''new KafkaProducer (props,new StringSerializer(),new StringSerializer());'''将序列化器添加到您的KafkaProducer构造函数可能会更好。 –