
I'm new to Kafka, so apologies if I've missed something. This is about the Spring Integration Kafka consumer.

I want to consume messages from an existing topic.

I took the code for the basic Spring Integration Kafka sample from this link.

My code currently looks like this:

import java.util.HashMap; 
import java.util.Map; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.TopicPartition; 
import org.apache.kafka.common.serialization.StringDeserializer; 

import org.springframework.beans.factory.annotation.Value; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.boot.builder.SpringApplicationBuilder; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.context.annotation.Bean; 
import org.springframework.integration.channel.QueueChannel; 
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; 
import org.springframework.kafka.core.ConsumerFactory; 
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 
import org.springframework.kafka.listener.KafkaMessageListenerContainer; 
import org.springframework.messaging.Message; 
import org.springframework.messaging.PollableChannel; 

@SpringBootApplication 
public class Application { 

    @Value("${kafka.topic}") 
    private String topic; 

    @Value("${kafka.messageKey}") 
    private String messageKey; 

    @Value("${kafka.broker.address}") 
    private String brokerAddress; 

    @Value("${kafka.zookeeper.connect}") 
    private String zookeeperConnect; 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context 
       = new SpringApplicationBuilder(Application.class) 
       .web(false) 
       .run(args); 
     /*MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class); 
     for (int i = 0; i < 1; i++) { 
      toKafka.send(new GenericMessage<String>("foo" + i)); 
     }*/ 
     PollableChannel fromKafka = context.getBean("received", PollableChannel.class); 
     Message<?> received = fromKafka.receive(); 
     while (received != null) { 
      System.out.println(received); 
      received = fromKafka.receive(); 
     } 
     context.close(); 
     System.exit(0); 
    } 

    /*@ServiceActivator(inputChannel = "toKafka") 
    @Bean 
    public MessageHandler handler() throws Exception { 
     KafkaProducerMessageHandler<String, String> handler = 
       new KafkaProducerMessageHandler<>(kafkaTemplate()); 
     handler.setTopicExpression(new LiteralExpression(this.topic)); 
     handler.setMessageKeyExpression(new LiteralExpression(this.messageKey)); 
     return handler; 
    }*/ 

    /*@Bean 
    public KafkaTemplate<String, String> kafkaTemplate() { 
     return new KafkaTemplate<>(producerFactory()); 
    }*/ 

    /*@Bean 
    public ProducerFactory<String, String> producerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); 
     props.put(ProducerConfig.RETRIES_CONFIG, 0); 
     //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); 
     //props.put(ProducerConfig.LINGER_MS_CONFIG, 1); 
     //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     return new DefaultKafkaProducerFactory<>(props); 
    }*/ 

    @Bean 
    public KafkaMessageListenerContainer<String, String> container() throws Exception { 
     return new KafkaMessageListenerContainer<>(consumerFactory(), new TopicPartition(this.topic, 0)); 
    } 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); 
     //props.put(ConsumerConfig.GROUP_ID_CONFIG, "siTestGroup"); 
     //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); 
     //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     return new DefaultKafkaConsumerFactory<>(props); 
    } 

    @Bean 
    public KafkaMessageDrivenChannelAdapter<String, String> 
       adapter(KafkaMessageListenerContainer<String, String> container) { 
     KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = 
       new KafkaMessageDrivenChannelAdapter<>(container); 
     kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); 
     return kafkaMessageDrivenChannelAdapter; 
    } 

    @Bean 
    public PollableChannel received() { 
     return new QueueChannel(); 
    } 

    /*@Bean 
    public TopicCreator topicCreator() { 
     return new TopicCreator(this.topic, this.zookeeperConnect); 
    }*/ 

    /*public static class TopicCreator implements SmartLifecycle { 

     private final String topic; 

     private final String zkConnect; 

     private volatile boolean running; 

     public TopicCreator(String topic, String zkConnect) { 
      this.topic = topic; 
      this.zkConnect = zkConnect; 
     } 

     @Override 
     public void start() { 
      ZkUtils zkUtils = new ZkUtils(new ZkClient(this.zkConnect, 6000, 6000, 
       ZKStringSerializer$.MODULE$), null, false); 
      try { 
       if (!AdminUtils.topicExists(zkUtils, topic)) 
        AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties()); 
      } 
      catch (TopicExistsException e) { 
       // no-op 
      } 
      this.running = true; 
     } 

     @Override 
     public void stop() { 
     } 

     @Override 
     public boolean isRunning() { 
      return this.running; 
     } 

     @Override 
     public int getPhase() { 
      return Integer.MIN_VALUE; 
     } 

     @Override 
     public boolean isAutoStartup() { 
      return true; 
     } 

     @Override 
     public void stop(Runnable callback) { 
      callback.run(); 
     } 

    }*/ 

} 
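
The @Value placeholders above are resolved from application.properties; a minimal sketch of such a file (topic, key and host values are illustrative placeholders, not taken from the sample):

kafka.topic=test-topic 
kafka.messageKey=si.key 
kafka.broker.address=localhost:9092 
kafka.zookeeper.connect=localhost:2181 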

I'm getting the following error:

16:22:59.725 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available 
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) 
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281) 
    at java.lang.Thread.run(Unknown Source) 
(the same SchemaException and stack trace repeat roughly every 500 ms)

I can't make anything of this log output.

Thanks for your help :)

EDIT:

Adding the pom.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> 
    <modelVersion>4.0.0</modelVersion> 
    <parent> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-dependencies</artifactId> 
    <version>1.4.0.BUILD-SNAPSHOT</version> 
    </parent> 
    <groupId>org.springframework.integration.samples</groupId> 
    <artifactId>kafka</artifactId> 
    <version>4.3.0.BUILD-SNAPSHOT</version> 
    <name>Apache Kafka Sample</name> 
    <description>Apache Kafka Sample</description> 
    <url>http://projects.spring.io/spring-integration</url> 
    <organization> 
    <name>SpringIO</name> 
    <url>https://spring.io</url> 
    </organization> 
    <licenses> 
    <license> 
     <name>The Apache Software License, Version 2.0</name> 
     <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> 
     <distribution>repo</distribution> 
    </license> 
    </licenses> 
    <developers> 
    <developer> 
     <id>garyrussell</id> 
     <name>Gary Russell</name> 
     <email>[email protected]</email> 
     <roles> 
     <role>project lead</role> 
     </roles> 
    </developer> 
    <developer> 
     <id>markfisher</id> 
     <name>Mark Fisher</name> 
     <email>[email protected]</email> 
     <roles> 
     <role>project founder and lead emeritus</role> 
     </roles> 
    </developer> 
    <developer> 
     <id>ghillert</id> 
     <name>Gunnar Hillert</name> 
     <email>[email protected]</email> 
    </developer> 
    <developer> 
     <id>abilan</id> 
     <name>Artem Bilan</name> 
     <email>[email protected]</email> 
    </developer> 
    </developers> 
    <scm> 
    <connection>scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git</connection> 
    <developerConnection>scm:git:scm:git:ssh://[email protected]:spring-projects/spring-integration-samples.git</developerConnection> 
    <url>https://github.com/spring-projects/spring-integration-samples</url> 
    </scm> 
    <dependencies> 
    <dependency> 
     <groupId>org.springframework.integration</groupId> 
     <artifactId>spring-integration-core</artifactId> 
     <version>4.3.0.M1</version> 
     <scope>compile</scope> 
    </dependency> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>4.11</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.hamcrest</groupId> 
     <artifactId>hamcrest-all</artifactId> 
     <version>1.3</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-test</artifactId> 
     <version>4.2.5.RELEASE</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-test</artifactId> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-integration</artifactId> 
     <scope>compile</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.integration</groupId> 
     <artifactId>spring-integration-kafka</artifactId> 
     <version>2.0.0.M1</version> 
     <scope>compile</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.kafka</groupId> 
     <artifactId>spring-kafka-test</artifactId> 
     <version>1.0.0.M2</version> 
     <scope>compile</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.mockito</groupId> 
     <artifactId>mockito-core</artifactId> 
     <version>1.9.5</version> 
     <scope>test</scope> 
    </dependency> 
    </dependencies> 
    <repositories> 
    <repository> 
     <id>repo.spring.io.milestone</id> 
     <name>Spring Framework Maven Milestone Repository</name> 
     <url>https://repo.spring.io/libs-milestone</url> 
    </repository> 
    </repositories> 
    <build> 
    <plugins> 
     <plugin> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-maven-plugin</artifactId> 
     </plugin> 
    </plugins> 
    </build> 
</project> 

What version is the Kafka broker?


@GaryRussell: I've added the POM. I had to make a few changes because it was giving errors on the versions, so I just took whatever worked and used the latest versions from Maven.


The pom doesn't help; we need to know which version of Kafka you are talking to. See my answer for more information.

Answer


That version of the sample app requires a 0.9 broker - see this question and the Kafka docs it links to.

An earlier version of the sample app works with 0.8 brokers.

You need the version before this commit. I don't think we have a tag for it in GitHub, but this is the previous version that works with 0.8.

EDIT

With the 0.8 client version, you need to change this code...

BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(
      BrokerAddress.fromAddress(this.brokerAddress)); 

...to...

BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(
      BrokerAddress.fromAddress(this.firstBrokerAddress), 
      BrokerAddress.fromAddress(this.secondBrokerAddress)); 

i.e. provide an array of BrokerAddress objects.
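
If the broker list comes in as a single comma-delimited property, you can split it and map each entry to a BrokerAddress before building the configuration. A minimal sketch, assuming a property named kafka.broker.addresses (the property name and helper method are illustrative, not part of the sample):

@Value("${kafka.broker.addresses}") // e.g. broker1:9092,broker2:9092 
private String brokerAddresses; 

private BrokerAddressListConfiguration brokerConfiguration() { 
    String[] parts = this.brokerAddresses.split(","); 
    BrokerAddress[] addresses = new BrokerAddress[parts.length]; 
    for (int i = 0; i < parts.length; i++) { 
        addresses[i] = BrokerAddress.fromAddress(parts[i].trim()); 
    } 
    // the snippet above passes BrokerAddress objects positionally; handing over 
    // an array like this assumes the constructor is the varargs one 
    return new BrokerAddressListConfiguration(addresses); 
} 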

With the 0.9 client you can use a simple comma-delimited list of host:port pairs:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); 
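
So with the 0.9 client there is nothing version-specific to do for multiple brokers; the property value itself is just the comma-delimited list, along these lines (host names are placeholders):

// assumed property: kafka.broker.addresses=broker1:9092,broker2:9092 
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092"); 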

Works like a charm; I had to add a few exclusions to the Kafka dependency, but it works as expected. One more small question: how do I specify multiple brokers? It works with one broker, but how do I provide multiple values? I've tried ',' and ';' but got errors. Thanks again :)


That depends on the version; see my edit.