2
工作,如何使无功卡夫卡(https://github.com/akka/reactive-kafka)与合流模式注册的Avro模式工作?下面是我的示例代码:使反应卡夫卡与合流模式注册的Avro模式
def create(groupId: String)(implicit system: ActorSystem): Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = {
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(bootstrap)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty("schema.registry.url", schemaRegistryUrl)
.withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[String].getName)
.withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName)
.withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
Consumer.committableSource(consumerSettings, Subscriptions.topics(createDataSetJobRequestTopic))
}
谢谢,我知道如何消费与架构注册表卡夫卡Avro的消息时,我问的问题是如何使用它与反应性卡夫卡。它需要先构建consumerSettings,就像我上面粘贴的代码一样。我不知道是否有可能得到源[ConsumerMessage.CommittableMessage [字符串,AvroClass]直接因为KafkaAvroDeseriablizer是不通用的。否则,我得到CommittableMessage [String,Object],然后将其转换为特定的? –
嗨,你能弄清楚这一点。我有通过akka流kafka反序列化kafka消息的确切问题。它返回一个我无法投射到我的类型的对象。 – armulator
看来这个问题并没有被修正,并被其他人报告。 https://github.com/akka/reactive-kafka/issues/342。 –