2017-08-11 167 views
2

我是Kafka和Avro的noob。所以我一直试图让生产者/消费者运行。到目前为止,我已经能够生产和消费简单的字节和字符串,使用下列内容: 配置为制片人:KafkaAvroSerializer无需schema.registry.url序列化Avro

Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 

    Schema.Parser parser = new Schema.Parser(); 
    Schema schema = parser.parse(USER_SCHEMA); 
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema); 

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props); 

    for (int i = 0; i < 1000; i++) { 
     GenericData.Record avroRecord = new GenericData.Record(schema); 
     avroRecord.put("str1", "Str 1-" + i); 
     avroRecord.put("str2", "Str 2-" + i); 
     avroRecord.put("int1", i); 

     byte[] bytes = recordInjection.apply(avroRecord); 

     ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes); 
     producer.send(record); 
     Thread.sleep(250); 
    } 
    producer.close(); 
} 

现在,这一切都很好,问题就来了,当我试图序列一个POJO。 因此,我可以使用Avro提供的实用程序从POJO获取AvroSchema。 硬编码的模式,然后试图建立一个通用的记录通过KafkaProducer 生产者现在设置为发送:

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer"); 

Schema.Parser parser = new Schema.Parser(); 
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema 
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props); 

这就是问题的所在:我用KafkaAvroSerializer,生产者没有按瞬间“T拿出因: 缺少强制参数:schema.registry.url

我为什么这是必需的阅读起来,让自己的消费者能够无论生产者发送给我破译。 但是,AvroMessage中没有已经嵌入的模式吗? 将是真正伟大的,如果有人可以共享使用KafkaProducer与KafkaAvroSerializer的工作示例,而无需指定schema.registry.url

也将真正体会到在架构注册表的效用任何见解/资源。

谢谢!

+1

你试过了吗[spring-kafka avro deserializer](https://github.com) /code-not-found/spring-kafka/blob/master/spring-kafka-avro/src/main/java/com/codenotfound/kafka/serializer/AvroDeserializer.java)? [这里](https://www.codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html)一个教程。 –

回答

6

请注意:KafkaAvroSerializer不在香草apache kafka中提供 - 它由Confluent Platform提供。 (https://www.confluent.io/),作为其开源组件的一部分(http://docs.confluent.io/current/platform.html#confluent-schema-registry

快速回答:不,如果您使用KafkaAvroSerializer,则需要架构注册表。在这里看到一些示例: http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html

模式注册表的基本思想是每个主题都会引用一个avro模式(即,您将只能发送彼此相关的数据,但模式可以有多个版本,所以你仍然需要确定每条记录的模式)

我们不想像你暗示的那样为每个数据编写模式 - 通常,模式比数据更大!这会浪费时间解析它,并且浪费资源(网络,磁盘,CPU)

相反,模式注册表实例将执行绑定avro schema <-> int schemaId,然后序列化程序将只写这个id从注册表中获取数据(并将其缓存以备后用)。

所以在kafka中,你的记录将是[<id> <bytesavro>](和技术原因的魔术字节),这是一个只有5字节的开销(与你的模式的大小相比) 而且在阅读时,你的消费者会发现与id相对应的模式,以及与其有关的解串器avro字节。你可以在融合的文档中找到更多的方法

如果你确实有一个用来为每个记录编写模式的地方,你需要一个其他的序列化器(我认为你自己写了,但它很容易,只是重用https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java并删除架构注册表部分以将其替换为架构,读取操作相同)。但是如果你使用avro,我真的会阻止这一点 - 一天之后,你需要实现类似avro注册表来管理版本管理

相关问题