2016-08-23 2531 views
4

我每天都会生成数千个文件,我想用Kafka进行流式处理。 当我尝试读取文件时,每行都被视为单独的消息。是否可以使用Kafka传输文件?

我想知道如何将每个文件的内容作为卡夫卡主题中的单个消息并与消费者如何在单独的文件中编写来自卡夫卡主题的每条消息。

+0

你有看过Kafka Connect吗? http://docs.confluent.io/3.0.0/connect/index.html –

+0

是的,我意识到这一点。我如何在这里使用它?该场景是当我读取文件时,每一行都被视为一个单独的消息,但我希望每个文件都是一个长单个消息。 (文件可能有30-40行) – Nahush

+0

您使用Java客户端,控制台制作者,其他? –

回答

3

您可以编写自己的串行器/解串器来处理文件。 例如:

制片道具:

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, YOUR_FILE_SERIALIZER_URI); 

消费者道具:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); 
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, YOUR_FILE_DESERIALIZER_URI); 

串行

public class FileMapSerializer implements Serializer<Map<?,?>> { 

@Override 
public void close() { 

} 

@Override 
public void configure(Map configs, boolean isKey) { 
} 

@Override 
public byte[] serialize(String topic, Map data) { 
    ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
    ObjectOutput out = null; 
    byte[] bytes = null; 
    try { 
     out = new ObjectOutputStream(bos); 
     out.writeObject(data); 
     bytes = bos.toByteArray(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } finally { 
     try { 
      if (out != null) { 
       out.close(); 
      } 
     } catch (IOException ex) { 
      // ignore close exception 
     } 
     try { 
      bos.close(); 
     } catch (IOException ex) { 
      // ignore close exception 
     } 
    } 
    return bytes; 
} 
} 

解串器

public class MapDeserializer implements Deserializer<Map> { 

@Override 
public void close() { 

} 

@Override 
public void configure(Map config, boolean isKey) { 

} 

@Override 
public Map deserialize(String topic, byte[] message) { 
    ByteArrayInputStream bis = new ByteArrayInputStream(message); 
    ObjectInput in = null; 
    try { 
     in = new ObjectInputStream(bis); 
     Object o = in.readObject(); 
     if (o instanceof Map) { 
      return (Map) o; 
     } else 
      return new HashMap<String, String>(); 
    } catch (ClassNotFoundException e) { 
     e.printStackTrace(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } finally { 
     try { 
      bis.close(); 
     } catch (IOException ex) { 
     } 
     try { 
      if (in != null) { 
       in.close(); 
      } 
     } catch (IOException ex) { 
      // ignore close exception 
     } 
    } 
    return new HashMap<String, String>(); 
} 
} 

在以下形式

final Object kafkaMessage = new ProducerRecord<String, Map>((String) <TOPIC>,Integer.toString(messageId++), messageMap); 

messageMap撰写消息将包含文件名作为密钥和文件内容的值。 值可以是可序列化的对象。 因此,每条消息都将包含一个具有File_Name与FileContent映射的映射。可以是单个值或多个值。

相关问题