2017-09-06 176 views
2

我想用一个生产者写JSON对象到多个主题。春季Kafka单一生产者的多个主题

以下代码正在做我想做的事情,但使用setDefaultTopic()方法告知KafkaTemplate应该向其发送消息的主题感觉不对。

如果我使用send(String topic, ? payload)方法比StringJsonMessageConverter将无法​​工作。

我的制片人:

public class MyProducer { 

    @Autowired 
    private KafkaTemplate<String, ?> kafka; 

    public void send(String topic, Message<?> message) { 
     kafka.setDefaultTopic(topic); 
     kafka.send(message); 
    } 
} 

而且我的配置:

@Configuration 
public class MyProducerConfig { 

    @Bean 
    public ProducerFactory<String, String> producerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     ... 

     return new DefaultKafkaProducerFactory<>(props); 
    } 

    @Bean 
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) { 
     KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory); 
     kafkaTemplate.setMessageConverter(new StringJsonMessageConverter()); 

     return kafkaTemplate; 
    } 
} 

如何做到这一点适当的任何建议?

UPDATE

我改变了代码,这...

监制:

public void send(Message<?> message) { 
    kafka.send(message); 
} 

在我的控制器(这里我创建的消息对象);

MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName")); 
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers); 
producer.send(genericMessage); 

MessageHeaders对象仍将包含id和时间戳。

回答

2

当使用send(Message<?>)变种,消息变换预计的话题会在邮件标题...

String topic = headers.get(KafkaHeaders.TOPIC, String.class); 

如果您可以通过其他方式确定主题,您需要创建一个自定义转换器。

更改默认主题不是线程安全的。

+0

它的工作。谢谢!我不知道'KafkaHeaders'类,'MessageHeaders'类没有提供该主题的静态字段。 –

2

你必须调用模板前手动使用StringJsonMessageConverter

ProducerRecord<?, ?> producerRecord = kafka.getMessageConverter().fromMessage(message, topic); 
kafka.send(producerRecord); 
+0

这由模板内部执行;看到我的答案。 –

相关问题