2017-07-19 138 views

Spring Integration Java DSL - Java 7 - setting a poller in an outbound channel adapter

I am trying to migrate some of my code from XML to the Java DSL style (pre-Java 8).

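This is the Java configuration I have created so far, but I cannot figure out how to set the poller. The examples only cover a global poller, whereas I need to set the poller on the adapter itself.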

@Bean
public MessageHandler kafkaMessageHandler() {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setMessageKeyExpression(new LiteralExpression("kafka-integration"));
    handler.setTopicExpression(new LiteralExpression("headers.kafka_topic"));
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(producerConfigs()));
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // introduce a delay on the send to allow more messages to accumulate
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    return properties;
}

The XML equivalent I have is as follows:

<int-kafka:outbound-channel-adapter 
    id="kafkaOutboundChannelAdapter" 
    kafka-producer-context-ref="kafkaProducerContext" 
    channel="kafkaChannel" > 
    <int:poller fixed-rate="1000" max-messages-per-poll="10000"/>
</int-kafka:outbound-channel-adapter> 

Answer


the documentation ......

@Bean 
@ServiceActivator(inputChannel = "kafkaChannel", poller = @Poller(fixedDelay = "1000", ...))
public MessageHandler kafkaMessageHandler() { 
    ... 
} 
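
For reference, a fuller sketch of the same idea, mirroring the `fixed-rate="1000"` and `max-messages-per-poll="10000"` values from the XML in the question. It rests on a few assumptions that are not in the original post: the class name `KafkaOutboundConfig` is made up, `kafkaChannel` is declared here as a `QueueChannel` (a poller only takes effect when the input channel is pollable), and a `KafkaTemplate<String, String>` bean such as the one in the question is already defined elsewhere. Treat it as a starting point, not a verified configuration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;

// Hypothetical configuration class; the name is an assumption.
@Configuration
@EnableIntegration
public class KafkaOutboundConfig {

    // Assumption: the channel is a QueueChannel, so the consumer endpoint
    // must poll it and the @Poller below is actually used.
    @Bean
    public PollableChannel kafkaChannel() {
        return new QueueChannel();
    }

    // Poll kafkaChannel every 1000 ms and hand at most 10000 messages
    // per poll to the Kafka handler (same values as the XML poller).
    @Bean
    @ServiceActivator(inputChannel = "kafkaChannel",
            poller = @Poller(fixedRate = "1000", maxMessagesPerPoll = "10000"))
    public MessageHandler kafkaMessageHandler(KafkaTemplate<String, String> kafkaTemplate) {
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate);
        // Configure the key and topic expressions here as in the question's handler.
        return handler;
    }
}
```

The `@Poller` attributes are strings, so they can also hold property placeholders. `fixedRate` matches the XML `fixed-rate`, while `fixedDelay` instead measures the interval from the end of the previous poll.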

Thanks, that is exactly what I was looking for. – Vidhya
