2017-08-30 96 views
0

我有以下情况处理在弗林克CEP多个图案并联

enter image description here

有其发送流卡夫卡正在被由CEP引擎接收其中警告时产生的特定条件2的虚拟机对个人Stream满意。

目前,CEP是检查两个流上相同条件下(当心脏率> 65和呼吸率> 68)患者和如下图所示

// detecting pattern 
     Pattern<joinEvent, ? > pattern = Pattern.<joinEvent>begin("start") 
       .subtype(joinEvent.class).where(new FilterFunction<joinEvent>() { 
        @Override 
        public boolean filter(joinEvent joinEvent) throws Exception { 
         return joinEvent.getHeartRate() > 65 ; 
        } 
       }) 
       .subtype(joinEvent.class) 
       .where(new FilterFunction<joinEvent>() { 
        @Override 
        public boolean filter(joinEvent joinEvent) throws Exception { 
         return joinEvent.getRespirationRate() > 68; 
        } 
       }).within(Time.milliseconds(100)); 

但我想用提高并行报警Streams的不同条件。例如,我想报警如果

For patient A : if heart rate > 65 and Respiration Rate > 68 
For patient B : if heart rate > 75 and Respiration Rate > 78 

我该如何做到这一点?是否需要在同一环境中创建多个流环境或多个模式?

+0

嘿,我想知道你是否找到了你的问题的解决方案? –

+0

是的,不同的病人写了不同的主题,flink有许多并行工作的工作人员,每个人都在听一个话题并执行cep –

+0

感谢您的回复,我认为不同的病人写了同一个源/ DataStream,并且您想要应用不同的根据不同事件/患者TT的CEP模式 –

回答

1

根据您的要求,您可以创建2种不同的图案,以便在需要时清晰分离。

如果你想用相同的模式执行此操作,那么它也是可能的。要做到这一点,阅读一个卡夫卡源所有的卡夫卡主题:

FlinkKafkaConsumer010<JoinEvent> kafkaSource = new FlinkKafkaConsumer010<>(
     Arrays.asList("topic1", "topic2"), 
     new StringSerializerToEvent(), 
     props); 

在这里,我假设从两个主题的活动的结构都是一样的,你有患者姓名以及部分事件被传送。

一旦你做到了,它变得容易,因为你只需要创建一个模式与“或”,类似如下:

Pattern.<JoinEvent>begin("first") 
     .where(new SimpleCondition<JoinEvent>() { 

      @Override 
      public boolean filter(JoinEvent event) throws Exception { 
      return event.getPatientName().equals("A") && event.getHeartRate() > 65 && joinEvent.getRespirationRate() > 68; 
      } 
     }) 
     .or(new SimpleCondition<JoinEvent>() { 

      @Override 
      public boolean filter(JoinEvent event) throws Exception { 
      return event.getPatientName().equals("B") && event.getHeartRate() > 75 && joinEvent.getRespirationRate() > 78; 
      } 
     }); 

这将产生一个匹配,只要你的条件相匹配。虽然,我不确定在你的例子中“.within(Time.milliseconds(100))”是什么。