2015-06-19 128 views
4

如果主题尚不存在,我想创建一个Kafka主题。我知道如何通过bash创建主题,但我不知道如何检查它是否存在。检查Python中是否存在Kafka主题

topic_exists = ?????? 
if not topic_exists: 
    subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'), 
     '--create', 
     '--zookeeper', '{}:2181'.format(KAFKAHOST), 
     '--topic', str(self.topic), 
     '--partitions', str(self.partitions), 
     '--replication-factor', str(self.replication_factor)]) 

回答

5

可以使用--list (List all available topics)选项kafka-topics.sh,看看是否存在topics阵列中self.topic,如下图所示。

根据您使用这种方法的主题数量可能会有点沉重。如果是这种情况,您可以使用--describe (List details for the given topics),如果该主题不存在,可能将返回空白。我还没有彻底地测试过这个,所以我不能肯定地说这个解决方案有多坚实(--describe),但它可能值得您进一步研究。

wanted_topics = ['host_updates_queue', 'foo_bar'] 

topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'), 
     '--list', 
     '--zookeeper', '{}:2181'.format(KAFKAHOST)]) 

for wanted in wanted_topics: 
    if wanted in topics: 
     print '\'{}\' topic exists!'.format(wanted) 
    else: 
     print '\'{}\' topic does NOT exist!'.format(wanted) 

    topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'), 
     '--describe', 
     '--topic', wanted, 
     '--zookeeper', '{}:2181'.format(KAFKAHOST)]) 

    if not topic_desc: 
     print 'No description found for the topic \'{}\''.format(wanted) 

OUTPUT:

[email protected]:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py 
'host_updates_queue' topic exists! 
'foo_bar' topic does NOT exist! 
No description found for the topic 'foo_bar' 

还有可用Broker Configuration所以您不必采取任何这些步骤:

auto.create.topics.enable | true |在服务器上启用自动创建主题。如果设置为true,则尝试为不存在的主题生成数据或获取元数据将自动使用默认复制因子和分区数创建它。

如果可能,我会采取这种方法。

请注意,您应该在您的代理上设置num.partitionsdefault.replication.factor的主题配置(server.properties)以匹配您的代码段中的设置。

+0

是的,一个直接的解决方案。谢谢。 – ThS

+0

@ThS意外地引用了错误的Broker配置。我修复了帖子引用'auto.create.topics.enable',正确的配置来完成这个。请注意,默认值为'true',所以你可能不需要做任何事情:) – chrsblck

2

另一个好方法是用Python的卡夫卡模块:主题

kafka_client = kafka.KafkaClient(kafka_server_name) 
server_topics = kafka_client.topic_partitions 

if topic_name in server_topics: 
    your code.... 

kafka_client.topic_partitions返回列表。

+0

一直在寻找一种方式来做这个looong时间。从文档中看不出来,imo。 – sshow

+0

@sshow请在这里写下你正在寻找的东西,这样下一个有你问题的人会更快地找到它:) –

+1

这正是我一直在寻找的东西。如果主题不存在,我需要一种提前失败的方式,因为当消费者开始轮询不存在的主题时,由于当前(kafka-python 1.3.5)消费者由于无限循环而消耗100%的CPU。 – sshow