我正在尝试骆驼卡夫卡集成。
我有两个队列:
queue1
和queue2
。骆驼卡夫卡集成问题
有三种途径:
- ROUTE1放入
queue1
两个消息的列表(它应该这样做只能一次)。 - Route2到读取
queue1
列表,将其分解,并提出个人信息在queue2
- 路径3读取
queue2
消息,只是打印了。
的代码如下:
import java.util.ArrayList;
import java.util.List;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class CamelListTest {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new CamelListRoute());
context.start();
Thread.sleep(30000);
context.stop();
}
}
class CamelListRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
//Route1, expected to run once
from("timer://timerName?repeatCount=1").process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<String> inOrderList = new ArrayList<String>();
inOrderList.add("1");
inOrderList.add("2");
exchange.getIn().setBody(inOrderList, ArrayList.class);
}
})
.to("kafka:<ip>:9092?topic=queue1");
//Route2
from("kafka:<ip>:9092?topic=queue1&groupId=testing&autoOffsetReset=latest&consumersCount=1")
.split()
.body().process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("2nd Route : " + (exchange.getIn().getBody().toString()));
}
})
.to("kafka:<ip>:9092?topic=queue2");
//Route3
from("kafka:<ip>:9092?topic=queue2&groupId=testing&autoOffsetReset=latest&consumersCount=1")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("3rd Route : " + (exchange.getIn().getBody().toString()));
}
});
}
}
它不按预期工作,并有观察到几个问题:
- 第一条路线,预计将只运行一次(repeatCount = 1),连续运行,一次又一次地将相同的消息放入
queue1
。 - 第二条路线读取
queue1
消息,将其分解,但不把它放在queue2
- 由于第二路由没有放任何东西
queue2
,这条路线没有得到任何消息。
谁能帮我弄清楚这里有什么问题吗?
是的,我正确地使用了网址。使用exchange.getOut()。setBody()将数据传递到下一个路由后,问题得到解决。 – rvd
Ohk,我想你可以发表答案。这对其他人会有帮助。谢谢。 –