2016-11-28 123 views
0

我正在尝试骆驼卡夫卡集成。
我有两个队列:
queue1queue2骆驼卡夫卡集成问题

有三种途径:

  1. ROUTE1放入queue1两个消息的列表(它应该这样做只能一次)。
  2. Route2到读取queue1列表,将其分解,并提出个人信息在queue2
  3. 路径3读取queue2消息,只是打印了。

的代码如下:

import java.util.ArrayList; 
import java.util.List; 

import org.apache.camel.CamelContext; 
import org.apache.camel.Exchange; 
import org.apache.camel.Processor; 
import org.apache.camel.builder.RouteBuilder; 
import org.apache.camel.impl.DefaultCamelContext; 

public class CamelListTest { 
    public static void main(String[] args) throws Exception { 
     CamelContext context = new DefaultCamelContext(); 
     context.addRoutes(new CamelListRoute()); 
     context.start(); 
     Thread.sleep(30000); 
     context.stop(); 
    } 
} 

class CamelListRoute extends RouteBuilder { 
    @Override 
    public void configure() throws Exception { 


     //Route1, expected to run once 
     from("timer://timerName?repeatCount=1").process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       List<String> inOrderList = new ArrayList<String>(); 
       inOrderList.add("1"); 
       inOrderList.add("2"); 
       exchange.getIn().setBody(inOrderList, ArrayList.class); 
      } 
     }) 
     .to("kafka:<ip>:9092?topic=queue1"); 


     //Route2 
     from("kafka:<ip>:9092?topic=queue1&groupId=testing&autoOffsetReset=latest&consumersCount=1") 
     .split() 
     .body().process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       System.out.println("2nd Route : " + (exchange.getIn().getBody().toString())); 
      } 
     }) 
     .to("kafka:<ip>:9092?topic=queue2"); 


     //Route3 
     from("kafka:<ip>:9092?topic=queue2&groupId=testing&autoOffsetReset=latest&consumersCount=1") 
     .process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       System.out.println("3rd Route : " + (exchange.getIn().getBody().toString())); 
      } 
     }); 
    } 
} 

它不按预期工作,并有观察到几个问题:

  1. 第一条路线,预计将只运行一次(repeatCount = 1),连续运行,一次又一次地将相同的消息放入queue1
  2. 第二条路线读取queue1消息,将其分解,但不把它放在queue2
  3. 由于第二路由没有放任何东西queue2,这条路线没有得到任何消息。

谁能帮我弄清楚这里有什么问题吗?

回答

0

我看到几件事情:

  1. 我希望你给卡夫卡网址是这样的: “卡夫卡://本地主机:9092主题=队列1”

注:卡夫卡: //

  1. 为消费者提供zookeeper网址例如:kafka://?话题=队列1 & zookeeperConnect = & consumerStreams = 1 &的groupId =测试& autoOffsetReset = 最大

  2. 注意在以前的点autoOffsetReset值将最大最小代替最新

+0

是的,我正确地使用了网址。使用exchange.getOut()。setBody()将数据传递到下一个路由后,问题得到解决。 – rvd

+0

Ohk,我想你可以发表答案。这对其他人会有帮助。谢谢。 –

0

我想你应该交换消息。

在处理器

做类似:

exchng.getOut()的setHeader( “类型”, “队列”);。 exchng.getOut()。setBody(exchng.getIn()。getBody());

然后可以在第二条路线上添加一个选项,不需要第三条路线。