2017-01-22 329 views
1

I'm trying to use Gatling with Kafka, but every so often I get this error: SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.common.Cluster]

01:32:53.933 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=12,client_id=producer-1}, body={topics=[test]})) to node 1011 
SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.common.Cluster] 
java.lang.NullPointerException 
    at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:72)
    at java.lang.String.valueOf(String.java:2994) 
    at java.lang.StringBuilder.append(StringBuilder.java:131) 
    at java.util.AbstractCollection.toString(AbstractCollection.java:462) 
    at java.lang.String.valueOf(String.java:2994) 
    at java.lang.StringBuilder.append(StringBuilder.java:131) 
    at org.apache.kafka.common.Cluster.toString(Cluster.java:151) 
    at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:305) 
    at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:277) 
    at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:231) 
    at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:298) 
    at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:208) 
    at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:212) 
    at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:103) 
    at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:88) 
    at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:48) 
    at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:273) 
    at ch.qos.logback.classic.Logger.callAppenders(Logger.java:260) 
    at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:442) 
    at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:433) 
    at ch.qos.logback.classic.Logger.debug(Logger.java:511) 
    at org.apache.kafka.clients.producer.internals.Metadata.update(Metadata.java:133) 
    at org.apache.kafka.clients.NetworkClient.handleMetadataResponse(NetworkClient.java:313) 
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:298) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:199) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) 
    at java.lang.Thread.run(Thread.java:745) 
01:32:53.937 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.producer.internals.Metadata - Updated cluster metadata version 14 to [FAILED toString()] 

I don't know whether the error is related to my code, but here is my BasicSimulation.scala:

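import io.gatling.core.Predef._
import org.apache.kafka.clients.producer.ProducerConfig
import scala.concurrent.duration._
// Assumption: the kafka DSL used below comes from the mnogu/gatling-kafka plugin.
import com.github.mnogu.gatling.kafka.Predef._

class BasicSimulation extends Simulation {

  // Producer settings handed to the Kafka client
  val kafkaConf = kafka
    .topic("test")
    .properties(
      Map(
        ProducerConfig.ACKS_CONFIG -> "1",
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
          "org.apache.kafka.common.serialization.ByteArraySerializer",
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
          "org.apache.kafka.common.serialization.ByteArraySerializer"))

  // Each virtual user sends one record to the topic
  val scn = scenario("Kafka Test")
    .feed(csv("data.csv").circular)
    .exec(kafka("request")
      .send("${data}".getBytes: Array[Byte]))

  // 10 new users per second for 10 seconds
  setUp(
    scn
      .inject(constantUsersPerSec(10) during (10 seconds)))
    .protocols(kafkaConf)
}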

Here is the Kafka-related part of my docker-compose.yml:

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"

kafka:
  image: wurstmeister/kafka
  ports:
    - "9092:9092"
  depends_on:
    - zookeeper
  environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka
    KAFKA_ADVERTISED_PORT: 9092
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_CREATE_TOPICS: "test:1:1"
    KAFKA_LOG_CLEANER_ENABLE: 'true'
  volumes:
    - /tmp/docker.sock:/var/run/docker.sock

Answers

0

Stopping and removing my Docker containers seems to have fixed it.

docker stop $(docker ps -a -q) 
docker rm $(docker ps -a -q) 
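
The two commands above stop and remove every container on the host. If other containers should be left alone, a narrower variant is possible. This is only a sketch, assuming the services are named `kafka` and `zookeeper` as in the compose file from the question and that `docker-compose` is run from the directory containing that file. The names `run`, `reset_kafka`, and `DRY_RUN` are hypothetical helpers, not part of Docker. By default the script only prints the commands.

```shell
#!/bin/sh
# Service names taken from the compose file in the question.
SERVICES="kafka zookeeper"

# With DRY_RUN=1 (the default here) a command is printed instead of executed.
run() {
    if [ "${DRY_RUN:-1}" = "1" ]; then
        echo "$*"
    else
        "$@"
    fi
}

# Stop and remove only the listed services' containers
# (-s stops them first, -f skips the confirmation prompt), then recreate them.
reset_kafka() {
    # $SERVICES is left unquoted on purpose so it splits into separate arguments.
    run docker-compose rm -s -f $SERVICES
    run docker-compose up -d $SERVICES
}

reset_kafka
```

Running it with `DRY_RUN=0` executes the commands instead of printing them.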
1

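In my case it was a version problem. I was using a Kafka build and a kafka-clients library that were incompatible with each other.

When I switched to kafka_2.11-0.10.2.1 and kafka-clients 0.10.2.0, the problem was resolved.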
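
The two versions can be compared mechanically. The following is a minimal sketch, assuming the broker distribution name follows the `kafka_<scala version>-<kafka version>` pattern seen in `kafka_2.11-0.10.2.1`. The helper names `kafka_version_of` and `release_line_of` are hypothetical, and treating an identical first-three-components prefix as a match is a rough heuristic, not an official compatibility rule.

```shell
#!/bin/sh
# Extract the Kafka version from a distribution name such as kafka_2.11-0.10.2.1.
kafka_version_of() {
    # Strip everything up to and including the first hyphen.
    printf '%s\n' "${1#*-}"
}

# Keep only the first three dot-separated components (for example 0.10.2).
release_line_of() {
    printf '%s\n' "$1" | cut -d. -f1-3
}

broker_version=$(kafka_version_of "kafka_2.11-0.10.2.1")
client_version="0.10.2.0"

if [ "$(release_line_of "$broker_version")" = "$(release_line_of "$client_version")" ]; then
    echo "same release line: $(release_line_of "$broker_version")"
else
    echo "mismatch: broker $broker_version vs client $client_version"
fi
```

With the versions from this answer, both sides reduce to `0.10.2`, so the first branch is taken.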

0

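This is a Kafka dependency version problem: the installed Kafka version and the kafka-clients dependency version did not match. First verify the installed Kafka version, then update the kafka-clients version in the POM to match it. I faced the same problem and solved it by correcting the kafka-clients version.

Installed Kafka version: 0.11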

Maven dependency:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>

Now it works fine for me.
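
To confirm which kafka-clients version actually ends up next to the application, the jar file names can be inspected. This is a small sketch, assuming the jars keep the standard Maven name `kafka-clients-<version>.jar`. The function name `client_versions_in` and the temporary directory standing in for a real `lib/` folder are hypothetical.

```shell
#!/bin/sh
# Print the version of every kafka-clients jar found in the given directory.
client_versions_in() {
    for jar in "$1"/kafka-clients-*.jar; do
        [ -e "$jar" ] || continue        # the glob matched nothing
        name=${jar##*/}                  # drop the directory part
        name=${name#kafka-clients-}      # drop the artifact prefix
        printf '%s\n' "${name%.jar}"     # drop the extension
    done
}

# Example with a throwaway directory in place of a real lib/ folder.
demo_dir=$(mktemp -d)
touch "$demo_dir/kafka-clients-0.11.0.0.jar"
client_versions_in "$demo_dir"
rm -rf "$demo_dir"
```

More than one line of output would mean several client versions are present side by side. In a Maven project, `mvn dependency:tree -Dincludes=org.apache.kafka:kafka-clients` lists the version that Maven resolves.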
