2017-01-10 79 views
0
Pattern< Tuple3< String, String, String >, ? > pattern = Pattern.<Tuple3< String, String, String > > begin("start")    
      .next("3").where(new FilterFunction< Tuple3< String, String, String > >() { 
       @Override 
       public boolean filter (Tuple3< String, String, String > value) throws Exception { 
        return value.f2.equals("3"); 
       } 
      }) 
      .next("4").subtype(Tuple.getTupleClass(2)).where(new FilterFunction< Tuple2< String, String> >() { 
       @Override 
       public boolean filter (Tuple2< String, String > value) throws Exception { 
        return value.f1.equals("3"); 
       } 
      }) 

亚型(Tuple.getTupleClass(2)),以及occoured错误 Inferred type 'capture<? extends org.apapche.flink.api.java.tuple.Tuple>' for type parameter 'S' is not within its bound;should extend 'org.apapche.flink.api.java.tuple.Tuple3<java.lang.String,java.lang.String,java.lang.String>'如何做subtype()元组在flink cep?

我应该修改呢?但如何? Pattern< Tuple3< String, String, String >, ? > pattern


由2017012

JoinedStreams< Tuple2< String, String >, Tuple3< String, String, String > >.Where<String>.EqualTo 
     joinedStreams = someStream 
     .join(otherStream) 
     .where(value -> value.f1) 
     .equalTo(value -> value.f1); 

Pattern< Tuple, ? > pattern = Pattern.<Tuple> begin("start") 
     .subtype(Tuple3.class) 
     .where(evt -> evt.f2.equals("3")) 
     .next("4") 
     .subtype(Tuple2.class) 
     .where(evt -> evt.f1.equals("3")) 
     .within(Time.seconds(10)); 

PatternStream< ...> patternStream = CEP.pattern(joinedStreams, pattern); 

更新我试过这种方法,但不是我应填写PatternStream< ...> .Thanks的人谁可以提供帮助。

回答

0
这个

什么:

Pattern<Tuple, ?> pattern = Pattern.<Tuple>begin("start") 
      .subtype(Tuple3.class) 
      .where(evt -> evt.f2.equals("3")) 
      .next("4") 
      .subtype(Tuple2.class) 
      .where(evt -> evt.f1.equals("3")) 
      .within(Time.seconds(10)); 
  1. 你不用加下一期开始后
  2. 通知亚型的字面意思,tuple3和tuple2应该扩展的元组。

如果你想连接两个不同的数据流。

DataStream<Tuple2> someStream = //... 
DataStream<Tuple3> otherStream = //... 

ConnectedStreams<Tuple2, Tuple3> connectedStreams = someStream.connect(otherStream); 

然后你可以使用COMAP,CoFlatMap得到同样的类型,例如变换Tuple2,Tuple3为String: ConnectedStreams→的数据流中

connectedStreams.flatMap(new CoFlatMapFunction<Tuple2, Tuple3, String>() { 

    @Override 
    public void flatMap1(Integer value, Collector<String> out) { 
     out.collect(Tuple2.toString()); 
    } 

    @Override 
    public void flatMap2(String value, Collector<String> out) { 
     for (String word: value.split(" ")) { 
     out.collect(Tuple3.toString); 
     } 
    } 
}); 

下面是一些有用的链接,引入良好的使用案例:

  1. Introducing Complex Event Processing (CEP) with Apache Flink
  2. Chinese version that I translate
+0

非常感谢你,那么我该如何表达这一点:Tuple2后跟Tuple3? –

+0

我更新我的答案,希望这会有所帮助。 –

+0

非常感谢。另外一个问题是我可以在CEP.pattern(partitionedEventA,pattern)中加入flink CEP中的另一个流;''''''''''''''''''''''''但我不知道该怎么做。再次感谢。 –

0

试试这个代码:

Pattern<Tuple, ?> pattern = 
    Pattern.<Tuple>begin("start") 
    .next("3") 
     .subtype(Tuple3.class) 
     .where(new FilterFunction<Tuple3>() { 

      @Override 
      public boolean filter(Tuple3 value) throws Exception { 
       return value.f2.equals("3"); 
      } 
     }) 
    .next("4") 
     .subtype(Tuple2.class) 
     .where(new FilterFunction<Tuple2>() { 

      @Override 
      public boolean filter(Tuple2 value) throws Exception { 
       return value.f1.equals("3"); 
      } 
     }); 

开始有普通型Tuple和使用的具体类型Tuple2Tuple3为子事件。而这种模式的数据流必须有Tuple类型。

+0

非常感谢,我会试试看,后面会给你答复。另一个问题是我可以加入flink CEP中的另一个流吗?cuz'CEP.pattern(partitionedEventA,pattern);'你只能传递一个DataStream到它在我questino Tuple2是从另一个DataStream.But我不知道如何做到这一点,再次感谢。 –