2016-08-05 110 views
0

我正在将Play v 2.3.4应用程序迁移到Play v 2.5.4。一路上,我不得不升级到Scala 2.11.8和kafka 9.0+以支持更新的Play版本。由于无效的ZKStringSerializer引用而无法Instanciate ZkClient参考

我已经解决了大部分问题,但是我找不到一些通过AdminUtils管理Kafka主题的代码的卡夫卡问题。麻烦都集中在kafka.utils.ZkStringSerialzier周围。

我正在使用org.I0Itec.zkclient包实例ZkClient对象,它是在ZkUtils对象的构造中传递的,但它因为无法解析我的ZkStringSerializer而失败。

相关的代码是:在ZKStringSerializer是从他处人迹罕至的错误

import kafka.admin.AdminUtils 
import kafka.utils.ZkUtils 
import kafka.utils.ZKStringSerializer 
import org.I0Itec.zkclient.{ZkClient, ZkConnection} 
object Topic { 
    def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = { 
     var zkSerializer: ZKStringSerializer = ZKStringSerializer 
     val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer) 
     val topicConfig: Properties = new Properties() 
     val isSecureKafkaCluster: Boolean = false 

     val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster) 

     AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig) 
     zkClient.close() 
    } 
} 

上面的代码的结果。

我发现了几个相关的职位,以创建主题(主要在Java和卡夫卡9.0之前) Creating a topic for Apache Kafka 0.9 Using Java How create Kafka ZKStringSerializer in Java? How Can we create a topic in Kafka from the IDE using API 最后 Creating a Kafka topic results in no leader

基于这些我通过代码更新如下:

import kafka.admin.AdminUtils 
import kafka.utils.ZkUtils 
import kafka.utils.ZKStringSerializer$ 
import org.I0Itec.zkclient.{ZkClient, ZkConnection} 
object Topic { 
    def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = { 
     var zkSerializer: ZKStringSerializer = ZKStringSerializer$.MODULE$ 
     val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer) 
     val topicConfig: Properties = new Properties() 
     val isSecureKafkaCluster: Boolean = false 

     val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster) 

     AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig) 
     zkClient.close() 
    } 
} 

然后我只是无法解决符号ZkStringSerialzer $错误。

我尝试了org.I0Itec.zkclient.serialize.ZkSerializer对象,它没有什么区别。

所以我的问题实际上是两方面的: 1.在scala中导入和声明语句的'$'字符有什么意义。我已经在字符串插值中使用它(e/g/s“var value是$ var”)来引用变量,但是这看起来不同。 2.我的代码有什么问题。这是我导入,声明,还是别的吗?

我是新来斯卡拉和播放,但我感觉像很白痴的时刻,因此任何建议/帮助表示赞赏

〜戴夫

附: 在情况下,它可以帮助相关的位由项目文件

build.sbt:

lazy val `api` = (project in file(".")).enablePlugins(PlayScala) 
scalaVersion := "2.11.8" 

libraryDependencies ++= Seq(
    "org.apache.kafka" % "kafka_2.11" % "0.9.0.1", 
    jdbc, 
    cache, 
    ws, 
    specs2 % Test 
) 

plugins.sbt:

resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/" 

addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.4") 

addSbtPlugin("com.typesafe.sbt" % "sbt-coffeescript" % "1.0.0") 

addSbtPlugin("com.typesafe.sbt" % "sbt-less" % "1.0.0") 

addSbtPlugin("com.typesafe.sbt" % "sbt-jshint" % "1.0.1") 

addSbtPlugin("com.typesafe.sbt" % "sbt-rjs" % "1.0.1") 

addSbtPlugin("com.typesafe.sbt" % "sbt-digest" % "1.0.0") 

addSbtPlugin("com.typesafe.sbt" % "sbt-mocha" % "1.0.0") 

build.properties:

sbt.version=0.13.5 

回答

0

战斗之后我放弃了之前使用过的ZKClient软件包,并且简化了这个问题e直接使用了Kafka,这实际上比使用I0Itech ZKClient更清洁。

新的实现是这样的:

import java.util.Properties 
import kafka.admin.AdminUtils 
import kafka.utils.ZkUtils 

class Topic { 
    def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = { 
    if (ListKafkaTopics(zookeeperHosts).contains(topic)) { 
     return false 
    } 
    val zkUtils = ZkUtils.apply(zookeeperHosts, sessionTimeoutMs, connectionTimeoutMs, false) 
    AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, new Properties()) 
    zkUtils.close() 
    true 
    } 
} 

末结束取出的依赖,使清洁代码,这样一个双赢的,我想。

〜Dave

+0

您所做的与创建zkUtils一样,是用'false'替换'ZkStringSerializer'。这是如何运作的? – user2418202

+0

@ user2418202我原来的问题是试图实例化ZkSerializer传递给I0Itec ZkClient。我停止尝试使用org.I0Itec.zkclient包创建一个ZKClient,并让它直接由Kafka ZkUtils处理。 – DVS