2017-06-19 61 views
2

工作,如何使无功卡夫卡(https://github.com/akka/reactive-kafka)与合流模式注册的Avro模式工作?下面是我的示例代码:使反应卡夫卡与合流模式注册的Avro模式

def create(groupId: String)(implicit system: ActorSystem): Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = { 
 
    
 
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer) 
 
     .withBootstrapServers(bootstrap) 
 
     .withGroupId(groupId) 
 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
 
     .withProperty("schema.registry.url", schemaRegistryUrl) 
 
     .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[String].getName) 
 
     .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName) 
 
     .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") 
 
    
 
    Consumer.committableSource(consumerSettings, Subscriptions.topics(createDataSetJobRequestTopic)) 
 
}

回答

1

你只需要配置消费者使用汇合解串器:io.confluent.kafka.serializers.KafkaAvroDeserializer.class

设置以下属性:

  • ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG
  • ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG

此外,你必须以至少指定一个地址指向您的架构注册表实例添加属性“schema.registry.url”。

最后,您有以下dependecy添加到您的项目:

  <dependency> 
      <groupId>io.confluent</groupId> 
      <artifactId>kafka-avro-serializer</artifactId> 
      <version>${io.confluent.version}</version> 
      <exclusions> 
       <exclusion> 
        <groupId>org.slf4j</groupId> 
        <artifactId>slf4j-log4j12</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 

Confluent Documentation

+0

谢谢,我知道如何消费与架构注册表卡夫卡Avro的消息时,我问的问题是如何使用它与反应性卡夫卡。它需要先构建consumerSettings,就像我上面粘贴的代码一样。我不知道是否有可能得到源[ConsumerMessage.CommittableMessage [字符串,AvroClass]直接因为KafkaAvroDeseriablizer是不通用的。否则,我得到CommittableMessage [String,Object],然后将其转换为特定的? –

+0

嗨,你能弄清楚这一点。我有通过akka流kafka反序列化kafka消息的确切问题。它返回一个我无法投射到我的类型的对象。 – armulator

+0

看来这个问题并没有被修正,并被其他人报告。 https://github.com/akka/reactive-kafka/issues/342。 –