2016-03-08 146 views
2

我试图从卡夫卡的话题来读取(在Java中),但是这个例外总是推出:UnknownCodecException卡夫卡

kafka.common.UnknownCodecException: 3 is an unknown compression codec 
    at kafka.message.CompressionCodec$.getCompressionCodec(CompressionCodec.scala:26) 
    at kafka.message.Message.compressionCodec(Message.scala:213) 
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:173) 
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:191) 
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:145) 
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) 
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) 
    at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:847) 
    at scala.collection.Iterator$$anon$19.skip(Iterator.scala:612) 
    at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:615) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:210) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745)  

对于消费者的属性是:

group.id=groupTest123 
consumer.id=consumerid 
client.id=clientid 
auto.offset.reset=smallest 

,为生产商:

acks=1 
buffer.memory=67108864 
compression.type=none 
batch.size=16384 
linger.ms=0 

任何想法?

谢谢!

+1

您正在使用错误的编解码器。你可以发布你的配置(生产者/消费者)吗? – Markon

+0

请用属性更新您的问题(生产者和消费者)。 – Markon

回答

1

压缩类型3代表LZ4压缩,显然不支持您使用的客户端。请参阅compression types以供参考。如果我没有记错的话,LZ4加入了0.8.2之类的东西。

我不确定是否有任何火花集成能够处理LZ4压缩Kafka有效载荷,所以我认为你可以尝试在制作方使用GZIP或Snappy压缩,如果可能的话。