2017-10-13 179 views

I read the following doc and tried to run the code below, but the Spring Kafka example does not work:

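import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class Application implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        // Start the application, let run(...) finish, then shut the context down.
        SpringApplication.run(Application.class, args).close();
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    // One count per record that the listener is expected to receive.
    private final CountDownLatch latch = new CountDownLatch(3);

    @Override
    public void run(String... args) throws Exception {
        this.template.send("spring_kafka_topic", "foo1");
        this.template.send("spring_kafka_topic", "foo2");
        this.template.send("spring_kafka_topic", "foo3");
        // Wait up to 60 seconds for the listener to count the latch down.
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All received");
    }

    @KafkaListener(topics = "spring_kafka_topic")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }
}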

The listen method is never invoked.

Why?

P.S. I checked in the console and confirmed that the topic exists:

D:\work\kafka\kafka_2.11-0.11.0.1>bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 
__consumer_offsets 
myTopic 
my_topic 
new_topic 
part_2_example_1 
spring_kafka_topic 
test 

but the topic is empty:

D:\work\kafka\kafka_2.11-0.11.0.1>bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic spring_kafka_topic --from-beginning 

D:\work\kafka\kafka_2.11-0.11.0.1> 

application.properties

What do the logs say? – randominstanceOfLivingThing

Have you added '@EnableKafka' to one of your configuration classes? Have you defined a 'KafkaListenerContainerFactory' bean somewhere? – Paulius

Answer

I just copied your code and it works fine for me with Boot 1.5.7...

2017-10-13 12:13:13.379 INFO 3537 --- [ntainer#0-0-C-1] com.example.So46732065Application  : ConsumerRecord(topic = spring_kafka_topic, partition = 0, offset = 0, CreateTime = 1507909008962, checksum = 4047989513, serialized key size = -1, serialized value size = 4, key = null, value = foo1) 
2017-10-13 12:13:13.381 INFO 3537 --- [ntainer#0-0-C-1] com.example.So46732065Application  : ConsumerRecord(topic = spring_kafka_topic, partition = 0, offset = 1, CreateTime = 1507909008989, checksum = 4214835548, serialized key size = -1, serialized value size = 4, key = null, value = foo2) 
2017-10-13 12:13:13.381 INFO 3537 --- [ntainer#0-0-C-1] com.example.So46732065Application  : ConsumerRecord(topic = spring_kafka_topic, partition = 0, offset = 2, CreateTime = 1507909008989, checksum = 2352904650, serialized key size = -1, serialized value size = 4, key = null, value = foo3) 

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic spring_kafka_topic --from-beginning 
foo1 
foo2 
foo3 

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group foo 
GROUP  TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
foo    spring_kafka_topic  0          3               3               0    consumer-1_/127.0.0.1

I suggest you turn on DEBUG logging.
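With Spring Boot, DEBUG logging can typically be enabled with a property such as `logging.level.org.springframework.kafka=DEBUG` in application.properties. Separately, the code in the question logs "All received" even when nothing arrives, because it ignores the boolean returned by `latch.await(...)`. Below is a minimal sketch, using only the JDK, of checking that return value so a timeout is visible in the log. The class name `LatchCheck` and the method `awaitAndReport` are hypothetical and not part of the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of the question's code.
final class LatchCheck {

    // Waits on the latch and reports whether every expected record arrived.
    static String awaitAndReport(CountDownLatch latch, long timeout, TimeUnit unit)
            throws InterruptedException {
        // await returns true if the count reached zero, false if the timeout elapsed first.
        boolean allReceived = latch.await(timeout, unit);
        return allReceived
                ? "All received"
                : "Timed out with " + latch.getCount() + " record(s) still missing";
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        latch.countDown(); // simulate a single listener invocation
        System.out.println(awaitAndReport(latch, 100, TimeUnit.MILLISECONDS));
    }
}
```

In the question's `run` method, the same idea would mean logging the result of `latch.await(60, TimeUnit.SECONDS)` rather than logging "All received" unconditionally.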

I restarted the IDE and now it works, but the console consumer still doesn't show anything – gstackoverflow

D:\work\kafka\kafka_2.11-0.11.0.1>bin\windows\kafka-console-consumer.bat --zookepeer localhost:2181 -topic spring_kafka_topic --from-begining D:\work\kafka\kafka_2.11-0.11.0.1> – gstackoverflow

http://joxi.ru/12M1GQzfMbyea2.jpg – gstackoverflow