2017-10-04 1310 views
1

上述应从单个主题消费的卡夫卡消费者。因为我整合卡夫卡消费者API与弹簧芯web应用程序我不能使用弹簧启动..Kafka Listener方法未被调用。消费者不消费。

Spring的XML配置如下

<bean id="kafkaConsumerProperties" class="com.azuga.kafka.listeners.KafkaConsumerProperties"> 
    <constructor-arg type="java.lang.String" value="127.0.0.1:9092" /> 
    <constructor-arg type="java.lang.String" value="tdm-group" /> 
    <constructor-arg type="java.lang.String" value="dbStreamer.azuga.tripDriverMapping" /> 
</bean> 
<bean id="kafkaListenerConfig" class="com.azuga.kafka.listeners.KafkaListenerConfig"> 
    <property name="kafkaConsumerProperties" ref="kafkaConsumerProperties" /> 
</bean> 
<bean id="kafkaContainerFactory" class="com.azuga.kafka.listeners.KafkaListenerContainerFactory" 
    factory-method="kafkaContainerFactory"> 
</bean> 

这是创建ListenerContainerFactory

@EnableKafka 
public class KafkaListenerContainerFactory { 

public static ConcurrentKafkaListenerContainerFactory<String, String> kafkaContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(1); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.getContainerProperties().setPollTimeout(3000); 
    return factory; 
} 

@SuppressWarnings("unchecked") 
public static ConsumerFactory<String, String> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(KafkaListenerConfig.consumerProps(), 
      KafkaListenerConfig.stringKeyDeserializer(), KafkaListenerConfig.stringKeyDeserializer()); 
} 

} 

这是我的监听器类标注有@KafkaListener

package com.azuga.kafka.listeners; 

import org.springframework.kafka.annotation.KafkaListener; 
public class Listener { 

@KafkaListener(topics = "dbStreamer.azuga.tripDriverMapping") 
public void onMessage(String message) { 
    System.out.println(message.toString()); 
} 
} 

这是KafkaListenerConfig类这需要在引导服务器,主题名称等

@EnableKafka 
public class KafkaListenerConfig { 

private static KafkaConsumerProperties kafkaConsumerProperties; 

public void setKafkaConsumerProperties(KafkaConsumerProperties kafkaConsumerProperties) { 
    this.kafkaConsumerProperties = kafkaConsumerProperties; 
} 

public static Map<String, Object> consumerProps() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap()); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup()); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
    return props; 
} 

public static Deserializer stringKeyDeserializer() { 
    return new StringDeserializer(); 
} 

} 

回答

1

你必须为你的应用有点不寻常配置。

但是我想你错过了@EnableKafka约为@Configuration类的事实。因此,根据Spring框架的文件,你必须使用AnnotationConfigWebApplicationContext类:

* {@link org.springframework.web.context.WebApplicationContext WebApplicationContext} 
* implementation which accepts annotated classes as input - in particular 
* {@link org.springframework.context.annotation.Configuration @Configuration}-annotated 
* classes, but also plain {@link org.springframework.stereotype.Component @Component} 
* classes and JSR-330 compliant classes using {@code javax.inject} annotations. Allows 
* for registering classes one by one (specifying class names as config location) as well 
* as for classpath scanning (specifying base packages as config location). 

不幸的是不会有只是简单的XML配置工作。

Spring Kafka没有为XML定义提供任何钩子。

+0

感谢您的快速回复。我并不知道它不适用于xml配置。不过,我确实使用了注释,它像一个魅力 – Sabya