2016-02-02 41 views

回答

0

Flume的Kafka频道不支持将事件标头映射到像KafkaSink那样开箱即用的分区键。

但是,修改它并不太复杂。由于我不知道我可以共享代码,我只是发出指示:

  1. 添加配置密钥将被映射到分区内部类KafkaTransaction关键
  2. 头的名称,替换byte[]在构件serializedEvents的东西的类型也可以容纳每一个的String键和每一个事件(或者一个内部类,或甚至是卡夫卡KeyedMessage<String, byte[]>
  3. 在方法KafkaTransaction.doPut(Event event),用序列检索serializedEvents从标题密钥和存储在一起消息
  4. 在方法KafkaTransaction.doCommit()中,使用存储有序列化事件的密钥而不是batchUUID

注意,在一个交易事项将不再保证由单个KafkaChannel实例在通道的消费端部处理,所以你必须检查它与您的使用案例兼容(关于交易规模等)。

1

通道不必担心分区。因为频道是写它的频道,而频道正在消费讯息,所以不需要分割讯息。这就是如何通过flume-kafka通道创建消息来写作的。

new KeyedMessage<String, byte[]>(topic.get(), null, 
       batchUUID, event) 

但是,如果您的主题有多个分区,那么缺少密钥会导致消息被喷入可用分区。

如果您想要更多地控制消息如何在分区中分布,那么您可能需要查看Kafka的自定义分区程序的概念,以便您可以创建一个实现org.apache.kafka.clients.producer.Partitioner接口的类,并将partitioner.class属性的值设置为等于您的类的名称并确保您的自定义分区程序在您的类路径中可用。这样,您可以在发布前获得对每条消息的控制权,并且可以决定消息应该发送到哪个分区。您可以在您的flume频道配置中设置属性kafka.partitioner.class,以便获取

+0

那么,我**有其他消费者群体,除了Flume之外,对于所讨论的话题,所以我关心分区。我希望做的是将一个头(源主机)映射到密钥。我对代码的理解是,KeyedMessage <...>构造函数的'key'参数始终为空,因此消息总是被喷入可用分区。是对的吗? – Shadocko

+1

是的,这是正确的信息被喷洒。您可能想要尝试的一种解决方案是Kafka具有自定义分区程序的概念,因此您可以创建类实现接口并设置分区程序。类属性的值等于你的类的名称,并确保你的自定义分区器在类路径中可用。这样,您可以在发布之前获得对每条消息的控制权,并且您可以确定该消息应该到哪个分区。你可以在你的flume频道配置中设置属性kafka.partitioner.class,这样它就会被接收到 –

+0

谢谢,我们会研究你的建议。您应该编辑您的答案以包含'kafka.partitioner.class'技巧,因为它是解决方案的一部分。 – Shadocko

相关问题