我想用一个生产者写JSON对象到多个主题。春季Kafka单一生产者的多个主题
以下代码正在做我想做的事情,但使用setDefaultTopic()
方法告知KafkaTemplate
应该向其发送消息的主题感觉不对。
如果我使用send(String topic, ? payload)
方法比StringJsonMessageConverter
将无法工作。
我的制片人:
public class MyProducer {
@Autowired
private KafkaTemplate<String, ?> kafka;
public void send(String topic, Message<?> message) {
kafka.setDefaultTopic(topic);
kafka.send(message);
}
}
而且我的配置:
@Configuration
public class MyProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
...
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter());
return kafkaTemplate;
}
}
如何做到这一点适当的任何建议?
UPDATE
我改变了代码,这...
监制:
public void send(Message<?> message) {
kafka.send(message);
}
在我的控制器(这里我创建的消息对象);
MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName"));
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers);
producer.send(genericMessage);
MessageHeaders对象仍将包含id和时间戳。
它的工作。谢谢!我不知道'KafkaHeaders'类,'MessageHeaders'类没有提供该主题的静态字段。 –