2015-07-10 115 views
1

使用卡夫卡卡夫卡生产者代码FailedToSendMessageException

import java.util.Date; 
import java.util.Properties; 

import kafka.common.FailedToSendMessageException; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 





public class KafkaProducer { 

    private static Producer<String, String> producer; 
    public KafkaProducer() 
    { 
     Properties props = new Properties(); 
     props.put("metadata.broker.list","localhost:2181"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("request.required.acks","1"); 

     ProducerConfig config = new ProducerConfig(props); 
     producer = new Producer<String,String>(config); 

    } 

    public static void main(String[] args) 
    { 
     if(args.length<2) 
     { 
      System.err.println("Usage: KafkaProducer TopicName MessageCount"); 
      System.exit(0); 
     } 
     String topic = args[0]; 
     int messageCount = Integer.parseInt(args[1]); 

     KafkaProducer kafka = new KafkaProducer(); 
     kafka.publishMessage(topic,messageCount); 

    } 

    private void publishMessage(String topic, int messageCount) 
    { 
     for(int mcount=0;mcount<messageCount;mcount++) 
     { 
      String runtime = new Date().toString(); 
      String msg = "Message Published Time -" + runtime; 
      System.out.println(msg); 

      KeyedMessage<String,String> data = new KeyedMessage<String,String>(topic,msg); 
      producer.send(data); 

     } 
     producer.close(); 
    } 

} 

虽然使用eclipse运行此程序生产的消息,我得到以下异常:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). 
log4j:WARN Please initialize the log4j system properly. 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
Message Published Time -Fri Jul 10 13:05:20 IST 2015 
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) 
    at kafka.producer.Producer.send(Producer.scala:77) 
    at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
    at KafkaProducer.publishMessage(KafkaProducer.java:52) 
    at KafkaProducer.main(KafkaProducer.java:39) 

的动物园管理员服务启动和经纪人开始和主题创建。消费者也准备好了。

任何人都可以帮我解决这个问题吗?

回答

1

您可以尝试使用在kafka docs中简要描述的新KafkaProducer。

注意进口的org.apache.kafka.clients.producer.*代替kafka.javaapi.producer.Producer喜欢的东西:

import java.util.Properties; 
import java.util.concurrent.ExecutionException; 

import org.apache.kafka.clients.producer.*; 
import org.apache.kafka.common.serialization.StringSerializer; 

public class KafkaProducerTest { 

    public static void main(String args[]) throws InterruptedException, ExecutionException { 
     // set up Kafka producer 
     KafkaProducer<String,String> kafkaProducer; 
     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); 

     // instantiate the producer 
     kafkaProducer = new KafkaProducer<String,String>(props); 

     // add data to kafka 
     ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("topic", "test key", "test value"); 
     kafkaProducer.send(producerRecord); 

     // close producer 
     kafkaProducer.close(); 
    } 
} 
0

能否请您检查您的卡夫卡服务器是用于生产通过API同一个端口上运行?

通常Kafka集群在端口9092上运行。如果您的设置出现这种情况,请在生产者配置中使用相同的端口。您的制作人正在使用端口2181.可能这是一个错误。