2016-09-21 64 views

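Kafka 0.9 Spring Integration DSL configuration

I have been playing with the latest Spring Integration DSL and have gotten stuck on the consumer configuration.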

Here is my consumer configuration:

@Configuration
public static class ConsumerConfiguration {

    @Autowired
    private KafkaConfig kafkaConfig;

    private Log log = LogFactory.getLog(getClass());

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBrokerAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    IntegrationFlow consumer() {
        log.info("starting consumer..");

        KafkaMessageDrivenChannelAdapterListenerContainerSpec<String, String> kafkaMDCAListenerContainerSpec =
                Kafka09.messageDriverChannelAdapter(consumerFactory(), kafkaConfig.getTopic());

        return IntegrationFlows
                .from(kafkaMDCAListenerContainerSpec)
                .<Map<String, List<String>>>handle((payload, headers) -> {
                    payload.entrySet().forEach(e -> log.info(e.getKey() + '=' + e.getValue()));
                    return null;
                })
                .get();
    }
}

On application startup, I get the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'consumer' defined in class path resource [demo/DemoApplication$ConsumerConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'consumer' threw exception; nested exception is java.lang.NoSuchMethodError: org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.<init>(Ljava/lang/reflect/Method;)V 
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:599) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1128) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1023) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:510) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:751) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:861) ~[spring-context-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:541) ~[spring-context-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) [spring-boot-1.4.1.RELEASE.jar:1.4.1.RELEASE] 
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:371) [spring-boot-1.4.1.RELEASE.jar:1.4.1.RELEASE] 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-1.4.1.RELEASE.jar:1.4.1.RELEASE] 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1186) [spring-boot-1.4.1.RELEASE.jar:1.4.1.RELEASE] 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1175) [spring-boot-1.4.1.RELEASE.jar:1.4.1.RELEASE] 
at demo.DemoApplication.main(DemoApplication.java:162) [classes/:na] 

Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'consumer' threw exception; nested exception is java.lang.NoSuchMethodError: org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.<init>(Ljava/lang/reflect/Method;)V 
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:189) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:588) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
... 17 common frames omitted 

Caused by: java.lang.NoSuchMethodError: org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.<init>(Ljava/lang/reflect/Method;)V 
at org.springframework.integration.dsl.kafka.Kafka09MessageDrivenChannelAdapter$IntegrationMessageListener.<init>(Kafka09MessageDrivenChannelAdapter.java:152) ~[spring-integration-java-dsl-1.2.0.M2.jar:na] 
at org.springframework.integration.dsl.kafka.Kafka09MessageDrivenChannelAdapter.<init>(Kafka09MessageDrivenChannelAdapter.java:50) ~[spring-integration-java-dsl-1.2.0.M2.jar:na] 
at org.springframework.integration.dsl.kafka.Kafka09MessageDrivenChannelAdapterSpec.<init>(Kafka09MessageDrivenChannelAdapterSpec.java:54) ~[spring-integration-java-dsl-1.2.0.M2.jar:na] 
at org.springframework.integration.dsl.kafka.Kafka09MessageDrivenChannelAdapterSpec$KafkaMessageDrivenChannelAdapterListenerContainerSpec.<init>(Kafka09MessageDrivenChannelAdapterSpec.java:71) ~[spring-integration-java-dsl-1.2.0.M2.jar:na] 
at org.springframework.integration.dsl.kafka.Kafka09.messageDriverChannelAdapter(Kafka09.java:148) ~[spring-integration-java-dsl-1.2.0.M2.jar:na] 
at org.springframework.integration.dsl.kafka.Kafka09.messageDriverChannelAdapter(Kafka09.java:123) ~[spring-integration-java-dsl-1.2.0.M2.jar:na] 
at demo.DemoApplication$ConsumerConfiguration.consumer(DemoApplication.java:149) ~[classes/:na] 
at demo.DemoApplication$ConsumerConfiguration$$EnhancerBySpringCGLIB$$845f73e4.CGLIB$consumer$0(<generated>) ~[classes/:na] 
at demo.DemoApplication$ConsumerConfiguration$$EnhancerBySpringCGLIB$$845f73e4$$FastClassBySpringCGLIB$$64cb05f3.invoke(<generated>) ~[classes/:na] 
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:228) ~[spring-core-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:356) ~[spring-context-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
at demo.DemoApplication$ConsumerConfiguration$$EnhancerBySpringCGLIB$$845f73e4.consumer(<generated>) ~[classes/:na] 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60] 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60] 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60] 
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60] 
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:162) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
... 18 common frames omitted 

Any help is highly appreciated. Thanks in advance.

SOLUTION:
The updated, working code can be found here: https://github.com/magiccrafter/spring-kafka09

Answer

Look, you have this as a dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
    <scope>compile</scope>
</dependency>

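However, that version already targets Apache Kafka 0.10, and the Spring Integration Java DSL is not compatible with it. That is where the NoSuchMethodError comes from: the Kafka09 adapter in spring-integration-java-dsl-1.2.0.M2 calls a MessagingMessageListenerAdapter constructor taking a java.lang.reflect.Method, and that constructor is not found in the spring-kafka version on your classpath.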
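
A mismatch like this can be confirmed before the application context starts. The following is a minimal sketch using only JDK reflection, not part of either library: the class name `ConstructorCheck` and the helper `hasConstructor` are hypothetical, while the adapter class name and its `(Method)` constructor signature are taken from the stack trace in the question.

```java
import java.lang.reflect.Method;

/**
 * Sketch: checks whether a class on the classpath declares a given constructor.
 * ConstructorCheck and hasConstructor are illustrative names, not library API.
 */
final class ConstructorCheck {

    // Class name copied from the NoSuchMethodError in the question.
    static final String ADAPTER_CLASS =
            "org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter";

    private ConstructorCheck() {
    }

    /**
     * Returns true only if the named class can be loaded and declares a
     * constructor with exactly the given parameter types.
     */
    static boolean hasConstructor(String className, Class<?>... parameterTypes) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> type = Class.forName(className, false, ConstructorCheck.class.getClassLoader());
            type.getDeclaredConstructor(parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            // Class missing, constructor missing, or class not linkable.
            return false;
        }
    }

    public static void main(String[] args) {
        // The Kafka09 adapter in the stack trace expects this constructor.
        boolean present = hasConstructor(ADAPTER_CLASS, Method.class);
        System.out.println("MessagingMessageListenerAdapter(Method) present: " + present);
    }
}
```

Run with the application's classpath, a `false` result would suggest that the spring-kafka jar that was resolved is not the one the DSL was compiled against. Running `mvn dependency:tree` shows which version was actually pulled in.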

We plan to migrate to it after the Java DSL 1.2 RELEASE.

So you have to either switch to spring-kafka-1.0.x, or use spring-integration-kafka-2.1.0.RELEASE directly, bypassing the Kafka09 factory!
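
For the first option, the declaration might look like the sketch below. The `spring-kafka.version` property is a hypothetical placeholder that does not appear in the question; it is assumed to be defined under the POM's `<properties>` and set to an actual 1.0.x release.

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- Assumption: spring-kafka.version is defined under <properties> and points at a 1.0.x release -->
    <version>${spring-kafka.version}</version>
</dependency>
```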

I didn't take that into account when adding the dependencies. Thank you. – magiccrafter