2011-11-28 122 views
21

我试图用Avro来读取/写入Kafka的消息。有没有人有过使用Avro二进制编码器编码/解码将放在消息队列中的数据的例子?如何使用Avro二进制编码器对Kafka消息进行编码/解码?

我需要比卡夫卡部分更多的Avro部分。或者,也许我应该看看不同的解决方案?基本上,我试图找到一个更有效的JSON空间解决方案。刚刚提到Avro,因为它比JSON更紧凑。

回答

11

我终于想起要问卡夫卡邮件列表,并得到以下答案,完美的工作。

是的,你可以发送消息作为字节数组。如果你看一下Message类的构造函数 ,你会看到 -

高清这个(字节:数组[字节])

现在,看着生产者的send()API -

高清发送(producerData:ProducerData [K,V] *)

您可以将V设置为Message和K的类型,使其成为您想要的密钥。 如果您不关心使用密钥进行分区,请将其设置为消息 类型。

感谢, NEHA

2

相反的Avro的,你也可以简单地考虑压缩的数据;要么使用gzip(好的压缩,更高的cpu),要么使用LZF或Snappy(速度更快,压缩比较慢)。

或替代地也有Smile binary JSON,杰克逊(与this extension)在Java的支持:它是紧凑的二进制格式,并且更容易使用比Avro的使用方法:

ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
byte[] serialized = mapper.writeValueAsBytes(pojo); 
// or back 
SomeType pojo = mapper.readValue(serialized, SomeType.class); 

基本相同的代码与JSON,除了用于传递不同格式的工厂。 从数据大小的角度来看,Smile或Avro是否更紧凑取决于用例的细节;但两者都比JSON更紧凑。

好处是,这与JSON和微笑,相同的代码,只使用POJO快速工作。与需要代码生成的Avro相比,或者需要大量手动代码来打包和解压缩GenericRecord s。

7

如果你想从Avro的消息(卡夫卡部分已经回答)一个字节数组,使用二进制编码器:

GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
    ByteArrayOutputStream os = new ByteArrayOutputStream(); 
    try { 
     Encoder e = EncoderFactory.get().binaryEncoder(os, null); 
     writer.write(record, e); 
     e.flush(); 
     byte[] byteData = os.toByteArray(); 
    } finally { 
     os.close(); 
    } 
+0

你可以发送这个byteData到KafkaBroker并从控制台用户读取它吗?生产者关键序列化程序应该是什么? – user2441441

+0

正如在响应中提到的,kafka部分记录在其他响应中 - http://stackoverflow.com/a/8348264/5266和http://stackoverflow.com/a/32341917/5266 –

12

这是一个基本的例子。我没有尝试过多个分区/主题。

//样品生产者代码

import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.io.*; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.avro.specific.SpecificDatumWriter; 
import org.apache.commons.codec.DecoderException; 
import org.apache.commons.codec.binary.Hex; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
import java.io.ByteArrayOutputStream; 
import java.io.File; 
import java.io.IOException; 
import java.nio.charset.Charset; 
import java.util.Properties; 


public class ProducerTest { 

    void producer(Schema schema) throws IOException { 

     Properties props = new Properties(); 
     props.put("metadata.broker.list", "0:9092"); 
     props.put("serializer.class", "kafka.serializer.DefaultEncoder"); 
     props.put("request.required.acks", "1"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, byte[]> producer = new Producer<String, byte[]>(config); 
     GenericRecord payload1 = new GenericData.Record(schema); 
     //Step2 : Put data in that genericrecord object 
     payload1.put("desc", "'testdata'"); 
     //payload1.put("name", "अasa"); 
     payload1.put("name", "dbevent1"); 
     payload1.put("id", 111); 
     System.out.println("Original Message : "+ payload1); 
     //Step3 : Serialize the object to a bytearray 
     DatumWriter<GenericRecord>writer = new SpecificDatumWriter<GenericRecord>(schema); 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
     writer.write(payload1, encoder); 
     encoder.flush(); 
     out.close(); 

     byte[] serializedBytes = out.toByteArray(); 
     System.out.println("Sending message in bytes : " + serializedBytes); 
     //String serializedHex = Hex.encodeHexString(serializedBytes); 
     //System.out.println("Serialized Hex String : " + serializedHex); 
     KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("page_views", serializedBytes); 
     producer.send(message); 
     producer.close(); 

    } 


    public static void main(String[] args) throws IOException, DecoderException { 
     ProducerTest test = new ProducerTest(); 
     Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc")); 
     test.producer(schema); 
    } 
} 

//示例代码消费

第1部分:消费群代码:你可以比多消费者的多个分区/主题的更多。

import kafka.consumer.ConsumerConfig; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.Executor; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* Created by on 9/1/15. 
*/ 
public class ConsumerGroupExample { 
    private final ConsumerConnector consumer; 
    private final String topic; 
    private ExecutorService executor; 

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic){ 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
       createConsumerConfig(a_zookeeper, a_groupId)); 
     this.topic = a_topic; 
    } 

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId){ 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", a_zookeeper); 
     props.put("group.id", a_groupId); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 

     return new ConsumerConfig(props); 
    } 

    public void shutdown(){ 
     if (consumer!=null) consumer.shutdown(); 
     if (executor!=null) executor.shutdown(); 
     System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     try{ 
      if(!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)){ 

      } 
     }catch(InterruptedException e){ 
      System.out.println("Interrupted"); 
     } 

    } 


    public void run(int a_numThreads){ 
     //Make a map of topic as key and no. of threads for that topic 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, new Integer(a_numThreads)); 
     //Create message streams for each topic 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

     //initialize thread pool 
     executor = Executors.newFixedThreadPool(a_numThreads); 
     //start consuming from thread 
     int threadNumber = 0; 
     for (final KafkaStream stream : streams) { 
      executor.submit(new ConsumerTest(stream, threadNumber)); 
      threadNumber++; 
     } 
    } 
    public static void main(String[] args) { 
     String zooKeeper = args[0]; 
     String groupId = args[1]; 
     String topic = args[2]; 
     int threads = Integer.parseInt(args[3]); 

     ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); 
     example.run(threads); 

     try { 
      Thread.sleep(10000); 
     } catch (InterruptedException ie) { 

     } 
     example.shutdown(); 
    } 


} 

第2部分:实际消费消息的个人消费者。

import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.message.MessageAndMetadata; 
import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.generic.IndexedRecord; 
import org.apache.avro.io.DatumReader; 
import org.apache.avro.io.Decoder; 
import org.apache.avro.io.DecoderFactory; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.commons.codec.binary.Hex; 

import java.io.File; 
import java.io.IOException; 

public class ConsumerTest implements Runnable{ 

    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run(){ 
     ConsumerIterator<byte[], byte[]>it = m_stream.iterator(); 
     while(it.hasNext()) 
     { 
      try { 
       //System.out.println("Encoded Message received : " + message_received); 
       //byte[] input = Hex.decodeHex(it.next().message().toString().toCharArray()); 
       //System.out.println("Deserializied Byte array : " + input); 
       byte[] received_message = it.next().message(); 
       System.out.println(received_message); 
       Schema schema = null; 
       schema = new Schema.Parser().parse(new File("src/test_schema.avsc")); 
       DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema); 
       Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null); 
       GenericRecord payload2 = null; 
       payload2 = reader.read(null, decoder); 
       System.out.println("Message received : " + payload2); 
      }catch (Exception e) { 
       e.printStackTrace(); 
       System.out.println(e); 
      } 
     } 

    } 


} 

测试AVRO模式:

{ 
    "namespace": "xyz.test", 
    "type": "record", 
    "name": "payload", 
    "fields":[ 
     { 
      "name": "name", "type": "string" 
     }, 
     { 
      "name": "id", "type": ["int", "null"] 
     }, 
     { 
      "name": "desc", "type": ["string", "null"] 
     } 
    ] 
} 

重要的事情需要注意的是:

  1. youll需要标准卡夫卡和Avro公司罐子运行这段代码的开箱。

  2. 是非常重要的props.put(“serializer.class”,“kafka.serializer.DefaultEncoder”); 唐t use stringEncoder as that won如果您发送一个字节数组作为消息t工作。

  3. 您可以将byte []转换为十六进制字符串,然后将消费者reconvert十六进制字符串发送到byte [],然后发送到原始消息。

  4. 运行动物园管理员和经纪人如上所述: - http://kafka.apache.org/documentation.html#quickstart并创建一个名为“page_views”或任何你想要的。

  5. 运行ProducerTest.java,然后运行ConsumerGroupExample.java并查看正在生成和使用的avro数据。

+0

感谢您的帮助! !我尝试过,但在消费者代码中,我的it.hasNext()函数返回false,因此控件永远不会进入while循环。有什么想法我可以做错什么? –

3

已更新的答案。

卡夫卡具有与Maven(SBT格式)的Avro的串行器/解串器坐标:

"io.confluent" % "kafka-avro-serializer" % "3.0.0" 

您传递KafkaAvroSerializer的一个实例到KafkaProducer构造。

然后,您可以创建Avro GenericRecord实例,并将这些实例用作Kafka ProducerRecord实例内的值,您可以使用这些值通过KafkaProducer发送。

在Kafka消费者方面,您使用KafkaAvroDeserializer和KafkaConsumer。

+0

你能提供一个简短但完整的例子吗? –

+1

这只适用于Confluent自己添加的Maven仓库,因为它们不会将工件发布到maven central:http://packages.confluent.io/maven –