2016-11-21 102 views
0

使用JSON生成/使用来自Kafka的卡。使用下面的属性保存到HDFS在JSON:使用JsonConverter的Kafka Connect HDS Sink for JSON格式

key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=false 

监制:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \ 
     --data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json" 

消费者:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties 

问题1:

key.converter.schemas.enable=true 

value.converter.schemas.enable=true 

获取异常:

org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields 
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332) 

问题2:

启用上述两个属性不抛出任何问题,但没有数据写入了HDFS。

任何建议将高度赞赏。

感谢

回答

2

转换器指的是如何将数据从卡夫卡的话题被翻译成由连接器进行解释,并写入到HDFS。 HDFS连接器仅支持将HDFS写入HDFS或实木复合地板。您可以找到有关如何将格式扩展为JSON here的信息。如果你做了这样的扩展,我鼓励你将它贡献给连接器的开源项目。

+0

感谢您的建议! –

+0

@dawsaw你知道这样的扩展是否可以使用本地kafka connect api实现? –

+0

有一个已经与Kafka一起出货的JsonConverter。我认为这里的问题是特定于HDFS连接器的输出格式,这必然意味着扩展连接器,如果我已经正确理解了您的问题,则本身不会在Connect本身做任何事情。 – dawsaw

0

对于输入JSON格式的消息被写入到HDFS,请设置以下属性

key.converter=org.apache.kafka.connect.storage.StringConverter 
value.converter=org.apache.kafka.connect.storage.StringConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=false 
+0

将检查Akshat。谢谢你的评论 –

相关问题