2016-05-17

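I am building a sample implementation that routes an order to an order management system. The inbound message is an exchange carrying two identifiers, one for the customer ID and one for the catalog item ID. I then transform the inbound exchange into my Order domain object. My goal is then to apply the Content Enricher pattern twice: once to aggregate data from the customer service and once to aggregate data from the catalog item service. How do I get the Camel Content Enricher to process an Exchange from a REST call?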

My route is:

    @Override
    public void configure() { 
     // Start by building an instance of RestConfigurationDefinition. Need to 
     // specify the component we are going to use for enabling REST endpoints, 
     // specifically CamelServlet in this case. Set the binding mode to JSON. 
     restConfiguration(). 
      // Leverage the CamelServlet component for the REST DSL 
      component("servlet"). 
      // Bind using JSON 
      bindingMode(RestBindingMode.json). 
      // I like pretty things... 
      dataFormatProperty("prettyPrint", "true"). 
      // This is the context path to be used for Swagger API documentation 
      apiContextPath("api-doc"). 
      // Properties for Swagger 
      // Title of the API 
     apiProperty("api.title", "Order Management API"). 
      // Version of the API 
      apiProperty("api.version", "1.0.0"). 
      // CORS (resource sharing) enablement 
      apiProperty("cors", "true"). 
      // Use localhost for calls 
      apiProperty("host", "localhost:8083"). 
      // Set base path 
      apiProperty("base.path", "nvisia-order-router-camel-service/api"); 

     // Definition of the post order endpoint 
     rest("/orderRouter"). 
      // This is a POST method call for routing an order using the 
      // order form 
     post(). 
      // Description of what the method does 
      description("Routes a new order to the order management service"). 
      // Define the type used for input 
      type(OrderForm.class). 
      // Define the type used for output, in this case the order 
      outType(String.class). 
      // Next, define where the message is routed to, first transformation 
      to("bean:orderRouterService?method=transformOrderFormToOrder(${body})") 
      .to("direct:enrichOrder"); 

     // Definition of the enrich order endpoint 
     from("direct:enrichOrder"). 
      // Use the Content Enricher EIP to aggregate customer info in the 
      // order. 
     enrich(
      "http4://localhost:8081/nvisia-customer-camel-service/api/customer/${body.customerId}", 
      new AggregationStrategy() { 
       @Override 
       public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
        Order originalBody = (Order) oldExchange.getIn().getBody(); 
        Customer resourceResponse = (Customer) newExchange.getIn().getBody(); 
        originalBody.setCustomer(resourceResponse); 
        if (oldExchange.getPattern().isOutCapable()) { 
        oldExchange.getOut().setBody(originalBody); 
        } else { 
        oldExchange.getIn().setBody(originalBody); 
        } 
        return oldExchange; 
       } 
      }). 
      // Use the Content Enricher EIP to aggregate catalog info in the 
      // order. 
     enrich(
      "http4://localhost:8080/nvisia-catalog-camel-service/api/customer/${body.catalogItemId}", 
      new AggregationStrategy() { 
       @Override 
       public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
        Order originalBody = (Order) oldExchange.getIn().getBody(); 
        CatalogItem resourceResponse = (CatalogItem) newExchange.getIn() 
         .getBody(); 
        originalBody.setCatalogItem(resourceResponse); 
        if (oldExchange.getPattern().isOutCapable()) { 
        oldExchange.getOut().setBody(originalBody); 
        } else { 
        oldExchange.getIn().setBody(originalBody); 
        } 
        return oldExchange; 
       } 
      }).to("direct:sendOrder"); 

     // Definition of the send order endpoint 
     from("direct:sendOrder"). 
      // Need to define the content type on the header 
      setHeader(org.apache.camel.Exchange.CONTENT_TYPE, 
        constant("application/json")) 
      . 
      // Be safe and define this as a post 
      setHeader(Exchange.HTTP_METHOD, 
        constant(org.apache.camel.component.http4.HttpMethods.POST)) 
      . 
      // Finally, send the order to be managed and get back the order ID 
      to("http4://localhost:8082/nvisia-order-management-camel-service/api/order"); 
    } 

The exception I get is:

org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: OrderForm [customerId=1, catalogItemId=1] of type: com.nvisia.examples.camel.orderrouter.OrderForm on: Message[]. Caused by: No type converter available to convert from type: com.nvisia.examples.camel.orderrouter.OrderForm to the required type: java.io.InputStream with value OrderForm [customerId=1, catalogItemId=1]. Exchange[ID-nvisia-mhoffman-50981-1463522552963-0-8]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: com.nvisia.examples.camel.orderrouter.OrderForm to the required type: java.io.InputStream with value OrderForm [customerId=1, catalogItemId=1]] 
    at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:107) 
    at org.apache.camel.component.http4.HttpProducer.createRequestEntity(HttpProducer.java:523) 
    at org.apache.camel.component.http4.HttpProducer.createMethod(HttpProducer.java:422) 
    at org.apache.camel.component.http4.HttpProducer.process(HttpProducer.java:110) 
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) 
    at org.apache.camel.processor.Enricher.process(Enricher.java:187) 
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) 
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:468) 
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190) 
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:121) 
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) 
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190) 
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:62) 
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145) 
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) 
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:468) 
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190) 
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:121) 
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) 
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87) 
    at org.apache.camel.http.common.CamelServlet.service(CamelServlet.java:143) 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:291) 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 
    at org.springframework.boot.actuate.autoconfigure.EndpointWebMvcAutoConfiguration$ApplicationContextHeaderFilter.doFilterInternal(EndpointWebMvcAutoConfiguration.java:261) 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 
    at org.springframework.boot.actuate.trace.WebRequestTraceFilter.doFilterInternal(WebRequestTraceFilter.java:115) 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:87) 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77) 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:121) 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 
    at org.springframework.boot.actuate.autoconfigure.MetricsFilter.doFilterInternal(MetricsFilter.java:103) 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 
    at org.springframework.boot.context.web.ErrorPageFilter.doFilter(ErrorPageFilter.java:120) 
    at org.springframework.boot.context.web.ErrorPageFilter.access$000(ErrorPageFilter.java:61) 
    at org.springframework.boot.context.web.ErrorPageFilter$1.doFilterInternal(ErrorPageFilter.java:95) 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) 
    at org.springframework.boot.context.web.ErrorPageFilter.doFilter(ErrorPageFilter.java:113) 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:217) 
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) 
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) 
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:142) 
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) 
    at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616) 
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) 
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:518) 
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1091) 
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:673) 
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500) 
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: com.nvisia.examples.camel.orderrouter.OrderForm to the required type: java.io.InputStream with value OrderForm [customerId=1, catalogItemId=1] 
    at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:198) 
    at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:105) 
    ... 79 more 

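Note that I am using the latest Camel, 2.17.1. My question is: if I receive an exchange whose body is in JSON format, how do I make the two Content Enricher calls populate my Order bean? Since I do not need to send an input stream to either enricher call, I considered putting a processor before each enrich call, but I think it would then be difficult for the second aggregation to retain the information from the first one. If this is not the recommended way to use the Content Enricher, please let me know.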

Answer

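I was able to solve this by dropping the Content Enricher and instead using a multicast with two direct endpoints. I also needed to unmarshal and marshal explicitly, which caused some additional problems. Here is the final route I used, in case it is useful to anyone else.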

    // Definition of the post order endpoint
    rest("/orderRouter") 
     // This is a POST method call for routing an order using the order 
     // form 
     .post() 
     // Description of what the method does 
     .description("Routes a new order to the order management service") 
     // Define the type used for input 
     .type(OrderForm.class) 
     // Define the type used for output, in this case the order 
     .outType(OrderNumber.class) 
     // Now, process the order 
     .to("direct:processOrder"); 

    // This is the route that processes the order sent. First, we need to take 
    // the data from the order form passed, put it in the header and empty out 
    // the body of our incoming exchange. 
    from("direct:processOrder") 
     // Set header for customer ID 
     .setHeader("customerId", simple("${body.customerId}")) 
     // Set header for catalog item ID 
     .setHeader("catalogItemId", simple("${body.catalogItemId}")) 
     // Empty the body 
     .setBody(constant("")) 
     // Now, aggregate the data to an order type 
     .end() 
     // Use multicasting to call the customer and catalog item services 
     // in parallel. Then, use a strategy that groups the exchanges 
     // returned from the service calls into a single list for 
     // processing. 
     .multicast(new GroupedExchangeAggregationStrategy()) 
     // Use parallel processing 
     .parallelProcessing() 
     // Send to two direct components to get the data 
     .to("direct:getCustomerData", "direct:getCatalogItemData") 
     // End the multicast call 
     .end() 
     // Now process the exchange 
     .process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       List<Exchange> exchanges = exchange.getIn().getBody(List.class); 
       Order order = new Order(); 
       for (Exchange exchangeToProcess : exchanges) { 
       if (exchangeToProcess.getIn().getBody() instanceof Customer) { 
        order.setCustomer(
          exchangeToProcess.getIn().getBody(Customer.class)); 
       } else if (exchangeToProcess.getIn() 
         .getBody() instanceof CatalogItem) { 
        order.setCatalogItem(
          exchangeToProcess.getIn().getBody(CatalogItem.class)); 
       } else { 
        // Ignore it for now. 
       } 
       } 
       order.setOrderDate(new Date(System.currentTimeMillis())); 
       exchange.getIn().setBody(order); 
      } 
     }) 
     // End this processor definition 
     .end() 
     // Need to marshal the body to JSON 
     .marshal() 
     // Need to use JSON for marshalling 
     .json(JsonLibrary.Jackson) 
     // Then convert it to a string 
     .convertBodyTo(String.class) 
     // We can now send the order to order management. Need to define the 
     // content type on the header 
     .setHeader(org.apache.camel.Exchange.CONTENT_TYPE, 
       constant("application/json")) 
     // Be safe and define this as a post 
     .setHeader(Exchange.HTTP_METHOD, 
       constant(org.apache.camel.component.http4.HttpMethods.POST)) 
     // Set the HTTP uri to be used. 
     .setHeader("CamelHttpUri", simple(
       "http://localhost:8082/nvisia-order-management-camel-service/api/order")) 
     // Finally, send the order to be managed and get back the order ID 
     .to("http4://localhost:8082/nvisia-order-management-camel-service/api/order") 
     // Next, convert the input stream returned to a string 
     .convertBodyTo(String.class) 
     // Finally, unmarshal the string to an object 
     .unmarshal().json(JsonLibrary.Jackson, OrderNumber.class); 

    // Retrieves the customer data from the REST service for customer. 
    from("direct:getCustomerData") 
     // Set the http method as GET 
     .setHeader("CamelHttpMethod", constant("GET")) 
     // Set the HTTP uri to be used. 
     .setHeader("CamelHttpUri", simple(
       "http://localhost:8081/nvisia-customer-camel-service/api/customer/${header.customerId}")) 
     // Define the endpoint; though, url will be ignored in favor of 
     // header 
     .to("http4://localhost:8081/nvisia-customer-camel-service/api/customer/${header.customerId}") 
     // Next, convert the input stream returned to a string 
     .convertBodyTo(String.class) 
     // Finally, unmarshal the string to an object 
     .unmarshal().json(JsonLibrary.Jackson, Customer.class); 

    // Retrieves the catalog item data from the REST service for catalog 
    // items. 
    from("direct:getCatalogItemData") 
     // Set the http method as GET 
     .setHeader("CamelHttpMethod", constant("GET")) 
     // Set the HTTP uri to be used. 
     .setHeader("CamelHttpUri", simple(
       "http://localhost:8080/nvisia-catalog-camel-service/api/catalogItem/${header.catalogItemId}")) 
     // Define the endpoint; though, url will be ignored in favor of 
     // header 
     .to("http4://localhost:8080/nvisia-catalog-camel-service/api/catalogItem/${header.catalogItemId}") 
     // Next, convert the input stream returned to a string 
     .convertBodyTo(String.class) 
     // Finally, unmarshal the string to an object 
     .unmarshal().json(JsonLibrary.Jackson, CatalogItem.class);
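
The merging step inside the Processor above can be tried out without Camel. The following is only a minimal sketch of that type-dispatch loop: the Customer, CatalogItem and Order classes here are hypothetical stand-ins (the real domain classes are not shown in this post), and the method takes the reply bodies directly instead of a list of Exchange objects.

```java
import java.util.Arrays;
import java.util.Date;
import java.util.List;

// Hypothetical stand-ins for the domain classes used in the route above;
// the real Customer, CatalogItem and Order are not shown in the post.
final class OrderAssemblySketch {

    static final class Customer {
        final int id;
        Customer(int id) { this.id = id; }
    }

    static final class CatalogItem {
        final int id;
        CatalogItem(int id) { this.id = id; }
    }

    static final class Order {
        Customer customer;
        CatalogItem catalogItem;
        Date orderDate;
    }

    // Mirrors the loop in the Processor: dispatch on the runtime type of each
    // reply body, so the order in which the parallel replies arrive does not matter.
    static Order assemble(List<?> replyBodies) {
        Order order = new Order();
        for (Object body : replyBodies) {
            if (body instanceof Customer) {
                order.customer = (Customer) body;
            } else if (body instanceof CatalogItem) {
                order.catalogItem = (CatalogItem) body;
            }
            // Any other body type is ignored, as in the route.
        }
        order.orderDate = new Date();
        return order;
    }

    public static void main(String[] args) {
        // Replies are deliberately listed catalog-first to show ordering is irrelevant.
        Order order = assemble(Arrays.asList(new CatalogItem(1), new Customer(1)));
        System.out.println(order.customer.id + " " + order.catalogItem.id);
    }
}
```

Dispatching on the body type is what lets the route use parallelProcessing() safely here: the grouped list may arrive in either order, and a reply that is neither type simply leaves the corresponding Order field unset.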