可以使用--list (List all available topics)
选项kafka-topics.sh
,看看是否存在topics
阵列中self.topic
,如下图所示。
根据您使用这种方法的主题数量可能会有点沉重。如果是这种情况,您可以使用--describe (List details for the given topics)
,如果该主题不存在,可能将返回空白。我还没有彻底地测试过这个,所以我不能肯定地说这个解决方案有多坚实(--describe
),但它可能值得您进一步研究。
wanted_topics = ['host_updates_queue', 'foo_bar']
topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
'--list',
'--zookeeper', '{}:2181'.format(KAFKAHOST)])
for wanted in wanted_topics:
if wanted in topics:
print '\'{}\' topic exists!'.format(wanted)
else:
print '\'{}\' topic does NOT exist!'.format(wanted)
topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
'--describe',
'--topic', wanted,
'--zookeeper', '{}:2181'.format(KAFKAHOST)])
if not topic_desc:
print 'No description found for the topic \'{}\''.format(wanted)
OUTPUT:
[email protected]:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'
还有可用Broker Configuration所以您不必采取任何这些步骤:
auto.create.topics.enable | true |在服务器上启用自动创建主题。如果设置为true,则尝试为不存在的主题生成数据或获取元数据将自动使用默认复制因子和分区数创建它。
如果可能,我会采取这种方法。
请注意,您应该在您的代理上设置num.partitions
和default.replication.factor
的主题配置(server.properties
)以匹配您的代码段中的设置。
是的,一个直接的解决方案。谢谢。 – ThS
@ThS意外地引用了错误的Broker配置。我修复了帖子引用'auto.create.topics.enable',正确的配置来完成这个。请注意,默认值为'true',所以你可能不需要做任何事情:) – chrsblck