2017-08-25 291 views
0

我正在尝试编写java api来创建kafka主题。我有Kafka版本0.11.0.0。我搜索堆栈溢出并尝试了相同的方式。但无论话题是否存在,它总是让我异常。使用java创建主题 - kafka版本> 0.10.0.0

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/kafka/common/errors/TopicExistsException 
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala) 
at kafkaStream.Processor.CreateTopic.main(CreateTopic.java:65) 
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.errors.TopicExistsException 
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 

我尝试下面的代码:

  String topicName = "t5"; 
      String zookeeperHosts = "XXXX:2181,XXXX:2181"; 
      int sessionTimeOutInMs = 15 * 1000; 
      int connectionTimeOutInMs = 50 * 1000; 
      Properties topicConfig = new Properties(); 
      zkClient = new ZkClient("XXXX:2181,XXXX:2181", 
        sessionTimeOutInMs, 
        connectionTimeOutInMs, 
        ZKStringSerializer$.MODULE$); 
      zkUtils = new ZkUtils(zkClient, new 
      ZkConnection(zookeeperHosts), false); 
      ZkUtils.apply(
        "XXXX:2181,XXXX:2181", 
        sessionTimeOutInMs, 
        connectionTimeOutInMs, 
        false); 
     //  AdminUtils.createTopic(zkUtils, topicName, numPartitions, 1, 
      topicConfig, RackAwareMode.Enforced$.MODULE$); 
      AdminUtils.createTopic(zkUtils, topicName, 2, 1, new 
      Properties(), RackAwareMode.Enforced$.MODULE$); 

Maven依赖 - >

 <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.11.0.0</version> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.11.0.0</version> 
     </dependency> 
+0

你是否添加了依赖api(kafka-特定版本的客户端)jar到类路径? –

+0

是的,我已经添加 – Megha

回答

1

由于您使用的是新版本0.11.0我建议使用新的管理客户端API(以下链接为一些文件:https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html)。

使用Zookeeper进行此类操作在将来会推迟使用新的Admin Client API。

+0

感谢您的建议,我试着用AdminClient。我无法做到。你可以分享任何例子 – Megha

+0

你可以在这里找到一个非常简单的例子(你只需要kafka-clients依赖,请参阅相关的pom.xml):https://github.com/ppatierno/kafka-playground/blob/ master/src/main/java/org/apache/kafka/playground/CreateTopic.java – ppatierno

+0

非常感谢@ppatierno,我遇到了导入错误软件包的问题。非常感谢 – Megha