我读了following doc: 不工作,并尝试运行代码:不调用弹簧卡夫卡例如
@SpringBootApplication
public class Application implements CommandLineRunner {
public static Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Autowired
private KafkaTemplate<String, String> template;
private final CountDownLatch latch = new CountDownLatch(3);
@Override
public void run(String... args) throws Exception {
this.template.send("spring_kafka_topic", "foo1");
this.template.send("spring_kafka_topic", "foo2");
this.template.send("spring_kafka_topic", "foo3");
latch.await(60, TimeUnit.SECONDS);
logger.info("All received");
}
@KafkaListener(topics = "spring_kafka_topic")
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
logger.info(cr.toString());
latch.countDown();
}
}
但listen
方法。
为什么?
P.S.我检查控制台,并确保该主题存在:
D:\work\kafka\kafka_2.11-0.11.0.1>bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
__consumer_offsets
myTopic
my_topic
new_topic
part_2_example_1
spring_kafka_topic
test
,但话题是空的:
D:\work\kafka\kafka_2.11-0.11.0.1>bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic spring_kafka_topic --from-beginning
D:\work\kafka\kafka_2.11-0.11.0.1>
application.properties:
日志说什么? – randominstanceOfLivingThing
您是否已将'@ EnableKafka'添加到您的某个配置类中?你是否在某处定义了'KafkaListenerContainerFactory' bean? – Paulius