0

我想在春季云数据流量配置DLQ。这里是流认定中,我如何部署它春季云数据流DLQ配置不工作

stream create --definition ":someTestTopic > custom-transform 
    --spring.cloud.stream.bindings.input.consumer.headerMode=raw | log --spring.cloud.stream.bindings.input.consumer.headerMode=raw" --name ticktran 


    stream deploy ticktran --properties 
    "apps.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,apps.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.output.destination=test-tran,app.log.spring.cloud.stream.bindings.input.destination=test-tran,app.custom-transform.spring.cloud.stream.kafka.bindings.test-tran.consumer.enableDlq=true" 

在定制变换 - 处理器的代码,我已经提到

if(out.contains("ERROR")) { 
      throw new RuntimeException("Error "); 
     } 

这意味着,如果消息包含错误,那么RuntimeException的,我要捕捉那些DLQ中的消息。但是,当我运行代码时,似乎没有得到名称为test-tran的任何Kafka DL队列。

我是否需要设置更多的属性,以使DLQ或者我需要改变代码的东西正确使用DLQ的。

自定义转换代码

TransformationServiceApplication.java

import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.hateoas.config.EnableEntityLinks; 

@SpringBootApplication 
@EnableEntityLinks 
public class TransformationServiceApplication { 

    public static void main(String[] args) { 
     SpringApplication.run(TransformationServiceApplication.class, args); 
    } 
} 

TransformationMessageEndPoint.java

@EnableBinding(Processor.class) 
@MessageEndpoint 
public class TransformationMessageEndpoint { 

    private static final String NS = "http://openrisk.com/ingestion/"; 

    AtomicInteger index = new AtomicInteger(1); 
    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) 
    public Object process(Message<?> message) { 
     String out = new String((byte[])message.getPayload()); 

     System.out.println("*****" + out); 

     if(out.contains("ERROR")) { 
      throw new RuntimeException("Error "); 
     } 

     return message; 

    } 
} 

的pom.xml

<parent> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-parent</artifactId> 
     <version>1.3.6.RELEASE</version> 
     <relativePath /> <!-- lookup parent from repository --> 
    </parent> 

    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
     <java.version>1.8</java.version> 
    </properties> 

    <dependencies> 
     <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-starter-dataflow-server-local</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-starter-stream-kafka</artifactId> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka --> 
     <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-stream-binder-kafka</artifactId> 
      <version>1.0.0.RELEASE</version> 
     </dependency> 
     <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-stream-test-support</artifactId> 
      <version>1.0.0.BUILD-SNAPSHOT</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.springframework.cloud.stream.module</groupId> 
      <artifactId>spring-cloud-stream-modules-test-support</artifactId> 
      <version>1.0.0.BUILD-SNAPSHOT</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.jena</groupId> 
      <artifactId>jena-core</artifactId> 
      <version>3.1.0</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core --> 
     <dependency> 
      <groupId>com.fasterxml.jackson.core</groupId> 
      <artifactId>jackson-core</artifactId> 
      <version>2.8.0</version> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 

添加国防部ULE

app register --name custom-transform --type processor --uri maven://com.openrisk.openmargin:TransformationService:0.0.1-SNAPSHOT 

添加流

stream create --definition ":someTesstTopic > custom-transform | log " --name ticktran 

部署流

stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq" 
+0

您正在使用什么版本的新加坡民防部队的工作? 'module register'命令超旧;至少6-7个月大。我们早已远离这些条款。请尝试最新的[1.1 M1版本](http://docs.spring.io/spring-cloud-dataflow/docs/1.1.0.M1/reference/htmlsingle/)。 –

+0

随着新版本的工作。谢谢。 –

+0

很高兴听到这个消息。请分享您的最终发现和/或评论,也许还考虑将问题标记为已解决。 –

回答

0

有你流定义的几个问题。

  • 部署属性以app.<app-name>.开头,但您在少数地方已有apps.<app-name>.
  • 目的地是在SCDF中自动创建的,因此不推荐使用覆盖默认值。但是,您可以在运行独立应用程序spring-cloud-stream时执行此操作。
  • 而不是使用自定义的目的地,您可以通过直接默认通道相互作用使DLQ - 参见下面的例子。

流创建--definition “:someTesssstTopic>变换|日志” --name ticktran

流部署ticktran --properties“app.log.spring.cloud.stream.bindings.input。 consumer.headerMode = raw,app.transform.spring.cloud.stream.bindings.input.consumer.headerMode = raw,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq“

  • 目标test-tran不在accep表格格式在涉及app.transform.spring.cloud.stream.kafka.bindings.<channelName>.consumer.enableDlq属性中的引用。
  • 最后,当有一个错误,才会创建了error.<destination>.<group>话题。

我们将通过添加几个DSL样本到参考指南:#885

编辑: 我更新了流定义以反映正确的部署属性前缀。

+0

谢谢破旧。我尝试过上面的例子,但仍未创建DLQ。在转换代码中,我抛出了RunTimeException,并且由于异常,我得到了处理3次的消息,但是在完成所有处理后,我期待在DLQ中转储的消息和异常不会发生。我仍然缺少一些属性? –

+0

我们可以通过编程的方式定义错误通道吗,我看过一些在处理器代码中定义错误通道的例子。什么是正确的方法来做到这一点,为什么需要以编程方式定义错误通道。 –

+0

对不起,我在流定义中有错字 - 我现在纠正了。你可以请重试吗?此外,如果您可以共享您的自定义处理器代码(_gh repo?_),我们可以尝试复制此行为。 –

0

我改变了数据流的版本1.1 M1 release与下面提及的命令来创建和部署特性,现在

stream create --definition ":someTesstTopic > transform | log " --name ticktran 


stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq" 

感谢Sabby阿南丹