2015-12-15 72 views
1

我正在使用Apache Camel从kafka主题使用消息,然后处理消息,在处理发生异常的情况下,我将该消息重定向到另一个kafka主题,并在单独的路线。所以我有一个类似于下面的路线。从kafka到错误的另一个kafka主题的Apache骆驼路由

from(“kafka1”)。process(“someProcessor”)。end(); (交换 - > {exchange.getIn()。setBody(“Message with error details”)})。(“kafka2”);

上面的代码实际上是在相同的kafka(kafka1)中发送错误消息。

我通过在onException进程中设置exchange.getIn()。setHeader(KafkaConstants.TOPIC,“kafka2”))来解决这个问题。这是预期的行为?为什么它会忽略kafka2并改用kafka1?骆驼的

1)版本使用 - 2.14.0

2)卡夫卡端点网址 -

消费者为

from("kafka:" + ("kafka.broker") + "?topic=" 
      + ("offer.kafka.topic") 
      + "&zookeeperHost=" + ("kafka.zookeeper.host") 
      + "&zookeeperPort=" + ("kafka.zookeeper.port") 
      + "&groupId=" + ("offer.kafka.group.id") 
      + "&consumerStreams=" + ("kafka.streams") 
      + "&autoCommitIntervalMs=" + ("product.kafka.consumer.auto.commit.intervals") 
      + "&zookeeperConnectionTimeoutMs=" + ("zookeeper.connection.timeout") 
      + "&rebalanceMaxRetries=" + ("kafka.rebalance.max.retries") 
      + "&rebalanceBackoffMs=" + ("kafka.rebalance.backoffs.ms") 
      + "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout") 
      + "&autoOffsetReset=" + ("kafka.auto.offset.reset") 
      + "&fetchMessageMaxBytes=" + ("kafka.fetch.message.max.bytes") 
      + "&socketReceiveBufferBytes=" + ("receive.buffer.bytes")) 
      .routeId("offerEventRoute").to("direct:offerEventRoute"); 

生产者 -

to("kafka:" + ("error.kafka.broker") + "?topic=" 
         + ("error.kafka.topic") 
         + "&zookeeperHost=" + ("error.kafka.zookeeper.host") 
         + "&zookeeperPort=" + ("error.kafka.zookeeper.port") 
         + "&groupId=" + ("error.kafka.group.id") 
         + "&zookeeperConnectionTimeoutMs=" + ("error.zookeeper.connection.timeout") 
         + "&rebalanceMaxRetries=" + ("rebalance.max.retries") 
         + "&rebalanceBackoffMs=" + ("rebalance.backoffs.ms") 
         + "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout") 
         + "&autoOffsetReset=" + ("auto.offset.reset") 
         + "&messageSendMaxRetries=" + ("error.max.retries") 
         + "&serializerClass=kafka.serializer.StringEncoder" 
     ); 

回答

0

能否请您提供有关代码的更多细节,如

1)使用的骆驼版本

2)您的Kafka端点URL。

以任何机会,你在你的端点URL使用“bridgeEndpoint”属性..

+0

谢谢Himanshu,我更新了描述中的所有细节 – Amit

1

您需要设置bridgeEndPoint到真正在你的生产卡夫卡终点。否则,它会在交换标题中查找主题名称,并将其用作生产者的主题名称。

默认情况下它是假的。