2014-09-03 120 views
2

I am trying to test the integration of Kafka with Camel, as explained here: Apache Camel and Kafka integration.

Here is my code:

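import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class KafkaTest {

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();

        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                // Consume from the "test" topic and print each message body.
                from("kafka:test?zkConnect=localhost:2181&metadataBrokerList=localhost:9092")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println(exchange.getIn().getBody());
                        }
                    })
                    .end();
            }
        });

        context.start();
        // Keep the JVM alive without spinning the CPU.
        while (true) {
            Thread.sleep(1000);
        }
    }
}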

However, I get the following error:

Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route1: Route(route1)[[From[kafka:test?zkConnect=localhost:2181&... because of Failed to resolve endpoint: kafka://test?amp%3BmetadataBrokerList=localhost%3A9092&zkConnect=localhost%3A2181 due to: Failed to resolve endpoint: kafka://test?amp%3BmetadataBrokerList=localhost%3A9092&zkConnect=localhost%3A2181 due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. 

Unknown parameters=[{metadataBrokerList=localhost:9092, zkConnect=localhost:2181}] 

Please suggest what could be missing.

Answers

1

You should use the correct parameter names, as given in the official documentation.

from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181") 

The version you are referring to, the one described in the GitHub wiki, was contributed to Apache and has changed since then.
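To make the renaming concrete, here is a minimal sketch in plain Java (no Camel dependency) that rewrites the URI from the question into the form shown above. The class and method names (KafkaUriMigration, toNewStyle, parseQuery) are made up for illustration and are not part of Camel. It assumes zkConnect holds a single host:port pair and handles only the two parameters that appear in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Camel: maps the old wiki-style URI
// onto the parameter names used in the answer above.
final class KafkaUriMigration {

    private KafkaUriMigration() {
    }

    // Splits "k1=v1&k2=v2" into an ordered map; values are kept verbatim.
    static Map<String, String> parseQuery(String query) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    // Old form: kafka:<topic>?zkConnect=<zkHost>:<zkPort>&metadataBrokerList=<broker>
    // New form: kafka:<broker>?topic=<topic>&zookeeperHost=<zkHost>&zookeeperPort=<zkPort>
    static String toNewStyle(String oldUri) {
        int q = oldUri.indexOf('?');
        if (!oldUri.startsWith("kafka:") || q < 0) {
            throw new IllegalArgumentException("Not an old-style kafka URI: " + oldUri);
        }
        String topic = oldUri.substring("kafka:".length(), q);
        Map<String, String> params = parseQuery(oldUri.substring(q + 1));

        String broker = params.get("metadataBrokerList");
        String zkConnect = params.get("zkConnect");
        if (broker == null || zkConnect == null) {
            throw new IllegalArgumentException("Expected zkConnect and metadataBrokerList in: " + oldUri);
        }

        // Assumption: zkConnect is a single "host:port" pair.
        int colon = zkConnect.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("zkConnect must look like host:port, got: " + zkConnect);
        }
        String zkHost = zkConnect.substring(0, colon);
        String zkPort = zkConnect.substring(colon + 1);

        return "kafka:" + broker
                + "?topic=" + topic
                + "&zookeeperHost=" + zkHost
                + "&zookeeperPort=" + zkPort;
    }
}
```

Calling KafkaUriMigration.toNewStyle with the URI from the question returns the URI given in this answer. It only shows how the old names map to the new ones; check the documentation of the camel-kafka version you actually use before relying on these names.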

+0

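There are 2 camel-kafka components floating around out there. https://github.com/Giwi/camel-kafka => 3 years old; uses the format the OP is using. https://github.com/apache/camel/tree/master/components/camel-kafka => CURRENT; uses the parameters shown in this answer. – Jeff 2015-09-11 14:36:34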

0

Use the endpoint class?

Something like:

// Resolve the endpoint from the CamelContext that will run the route (rather than
// from a throwaway context), then set the consumer options on its configuration.
public static KafkaEndpoint endpoint(CamelContext context, String host, String port, String topic, String offset, String groupId) {
    String endpointUri = "kafka://" + host + ":" + port;
    KafkaEndpoint endpoint = context.getEndpoint(endpointUri, KafkaEndpoint.class);
    endpoint.getConfiguration().setTopic(topic);
    endpoint.getConfiguration().setKeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
    endpoint.getConfiguration().setValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
    endpoint.getConfiguration().setAutoOffsetReset(offset);
    endpoint.getConfiguration().setGroupId(groupId);
    return endpoint;
}

// Here "endpoint" is the KafkaEndpoint returned by the helper above.
// Either poll it directly ...
PollingConsumer consumer = endpoint.createPollingConsumer();

// ... or consume from it in a route.
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {
        from(endpoint)
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    System.out.println(exchange.getIn().getBody());
                }
            })
            .end();
    }
});