
Error writing to HDFS with the Kafka HDFS connector

I am trying to write data in Avro format from my Java code to Kafka and on to HDFS using the Kafka HDFS connector, and I am running into a problem. With the simple schema and data provided on the Confluent Platform website I can write data to HDFS, but when I try to use a complex Avro schema I get this error in the HDFS connector log:

ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142) 
org.apache.kafka.connect.errors.DataException: Did not find matching union field for data: PROD 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:973) 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:782) 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

I am using Confluent Platform 3.0.0.

My Java code:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", <url>);
// Set any other properties
KafkaProducer<Object, Object> producer = new KafkaProducer<Object, Object>(props);

// Parse the Avro schema and set up a generic reader for it
Schema schema = new Schema.Parser().parse(new FileInputStream("avsc/schema.avsc"));
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);

// Decode the JSON file against the schema
InputStream input = new FileInputStream("json/data.json");
DataInputStream din = new DataInputStream(input);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

// Read records until EOF; only the last datum read is kept
Object datum = null;
while (true) {
    try {
        datum = reader.read(null, decoder);
    } catch (EOFException e) {
        break;
    }
}

// Send the decoded record to Kafka as the message value
ProducerRecord<Object, Object> message = new ProducerRecord<Object, Object>(topic, datum);
producer.send(message);
producer.close();
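
For reference, here is a minimal sketch (not part of the original question, and assuming the full schema parsed above) of how the env value could be built programmatically with Avro's generic API instead of being decoded from JSON; the field names follow the schema shown below, and the point is that Environment.value has to carry the EnvironmentConstants branch of the union:

// Sketch only: requires org.apache.avro.generic.GenericData, GenericRecord and GenericRecordBuilder
Schema envSchema = schema.getField("info").schema()
        .getField("source").schema()
        .getField("env").schema();
Schema enumSchema = envSchema.getField("value").schema().getTypes().get(0); // enum branch of the union

GenericRecord env = new GenericRecordBuilder(envSchema)
        .set("value", new GenericData.EnumSymbol(enumSchema, "PROD"))
        .build();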

The schema (created from an AVDL file):

{
  "type" : "record",
  "name" : "RiskMeasureEvent",
  "namespace" : "risk",
  "fields" : [ {
    "name" : "info",
    "type" : {
      "type" : "record",
      "name" : "RiskMeasureInfo",
      "fields" : [ {
        "name" : "source",
        "type" : {
          "type" : "record",
          "name" : "Source",
          "fields" : [ {
            "name" : "app",
            "type" : {
              "type" : "record",
              "name" : "Application",
              "fields" : [ {
                "name" : "csi_id",
                "type" : "string"
              }, {
                "name" : "name",
                "type" : "string"
              } ]
            }
          }, {
            "name" : "env",
            "type" : {
              "type" : "record",
              "name" : "Environment",
              "fields" : [ {
                "name" : "value",
                "type" : [ {
                  "type" : "enum",
                  "name" : "EnvironmentConstants",
                  "symbols" : [ "DEV", "UAT", "PROD" ]
                }, "string" ]
              } ]
            }
          }, ...
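
The "Did not find matching union field for data: PROD" error refers to the [EnvironmentConstants, "string"] union on Environment.value above. As an illustrative check (again not from the question), the union branches and enum symbols of the parsed schema can be printed with the Avro Schema API:

// Sketch only: reuses the schema object parsed in the producer code above
Schema valueUnion = schema.getField("info").schema()
        .getField("source").schema()
        .getField("env").schema()
        .getField("value").schema();
System.out.println(valueUnion.getTypes());                          // the EnvironmentConstants enum and "string"
System.out.println(valueUnion.getTypes().get(0).getEnumSymbols()); // [DEV, UAT, PROD]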

The JSON file:

{
  "info": {
    "source": {
      "app": {
        "csi_id": "123",
        "name": "ABC"
      },
      "env": {
        "value": {
          "risk.EnvironmentConstants": "PROD"
        }
      }, ...

It seems to be a problem with the schema, but I cannot pinpoint the issue.
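
One way to at least confirm that the producer side decoded the union branch correctly is to re-encode the datum back to Avro JSON and inspect the output; this is an illustrative sketch rather than code from the question:

// Sketch only: requires org.apache.avro.generic.GenericDatumWriter,
// org.apache.avro.io.DatumWriter, Encoder, EncoderFactory and java.io.ByteArrayOutputStream
DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().jsonEncoder(schema, out);
writer.write(datum, encoder);
encoder.flush();
System.out.println(out.toString("UTF-8")); // should show the "risk.EnvironmentConstants" union wrapper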

Answer


I am an engineer at Confluent. This is a bug in how the Avro converter handles the union schema for env. I have created issue-393 to address this problem, and I also have a pull request with a fix. It should be merged soon.



Hi Jeremy, thanks for the fix. I have pulled the latest schema registry code from the branch. Since it is not included in the Confluent package, I downloaded the Apache Kafka and kafka-hdfs-connect code and built them locally. When I try to run the HDFS connector, I get an error when it tries to load the AvroConverter (which is in the schema registry). May I know how to configure the connector so that it can find that jar? – iiSGii