使用JSON生成/使用来自Kafka的卡。使用下面的属性保存到HDFS在JSON:使用JsonConverter的Kafka Connect HDS Sink for JSON格式
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
监制:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
--data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json"
消费者:
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties
问题1:
key.converter.schemas.enable=true
value.converter.schemas.enable=true
获取异常:
org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
问题2:
启用上述两个属性不抛出任何问题,但没有数据写入了HDFS。
任何建议将高度赞赏。
感谢
感谢您的建议! –
@dawsaw你知道这样的扩展是否可以使用本地kafka connect api实现? –
有一个已经与Kafka一起出货的JsonConverter。我认为这里的问题是特定于HDFS连接器的输出格式,这必然意味着扩展连接器,如果我已经正确理解了您的问题,则本身不会在Connect本身做任何事情。 – dawsaw