我正在将Play v 2.3.4应用程序迁移到Play v 2.5.4。一路上,我不得不升级到Scala 2.11.8和kafka 9.0+以支持更新的Play版本。由于无效的ZKStringSerializer引用而无法Instanciate ZkClient参考
我已经解决了大部分问题,但是我找不到一些通过AdminUtils管理Kafka主题的代码的卡夫卡问题。麻烦都集中在kafka.utils.ZkStringSerialzier周围。
我正在使用org.I0Itec.zkclient包实例ZkClient对象,它是在ZkUtils对象的构造中传递的,但它因为无法解析我的ZkStringSerializer而失败。
相关的代码是:在ZKStringSerializer是从他处人迹罕至的错误
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
object Topic {
def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
var zkSerializer: ZKStringSerializer = ZKStringSerializer
val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer)
val topicConfig: Properties = new Properties()
val isSecureKafkaCluster: Boolean = false
val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster)
AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig)
zkClient.close()
}
}
上面的代码的结果。
我发现了几个相关的职位,以创建主题(主要在Java和卡夫卡9.0之前) Creating a topic for Apache Kafka 0.9 Using Java How create Kafka ZKStringSerializer in Java? How Can we create a topic in Kafka from the IDE using API 最后 Creating a Kafka topic results in no leader
基于这些我通过代码更新如下:
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer$
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
object Topic {
def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
var zkSerializer: ZKStringSerializer = ZKStringSerializer$.MODULE$
val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer)
val topicConfig: Properties = new Properties()
val isSecureKafkaCluster: Boolean = false
val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster)
AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig)
zkClient.close()
}
}
然后我只是无法解决符号ZkStringSerialzer $错误。
我尝试了org.I0Itec.zkclient.serialize.ZkSerializer对象,它没有什么区别。
所以我的问题实际上是两方面的: 1.在scala中导入和声明语句的'$'字符有什么意义。我已经在字符串插值中使用它(e/g/s“var value是$ var”)来引用变量,但是这看起来不同。 2.我的代码有什么问题。这是我导入,声明,还是别的吗?
我是新来斯卡拉和播放,但我感觉像很白痴的时刻,因此任何建议/帮助表示赞赏
〜戴夫
附: 在情况下,它可以帮助相关的位由项目文件
build.sbt:
lazy val `api` = (project in file(".")).enablePlugins(PlayScala)
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.11" % "0.9.0.1",
jdbc,
cache,
ws,
specs2 % Test
)
plugins.sbt:
resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.4")
addSbtPlugin("com.typesafe.sbt" % "sbt-coffeescript" % "1.0.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-less" % "1.0.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-jshint" % "1.0.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-rjs" % "1.0.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-digest" % "1.0.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-mocha" % "1.0.0")
build.properties:
sbt.version=0.13.5
您所做的与创建zkUtils一样,是用'false'替换'ZkStringSerializer'。这是如何运作的? – user2418202
@ user2418202我原来的问题是试图实例化ZkSerializer传递给I0Itec ZkClient。我停止尝试使用org.I0Itec.zkclient包创建一个ZKClient,并让它直接由Kafka ZkUtils处理。 – DVS