Pattern< Tuple3< String, String, String >, ? > pattern = Pattern.<Tuple3< String, String, String > > begin("start")
.next("3").where(new FilterFunction< Tuple3< String, String, String > >() {
@Override
public boolean filter (Tuple3< String, String, String > value) throws Exception {
return value.f2.equals("3");
}
})
.next("4").subtype(Tuple.getTupleClass(2)).where(new FilterFunction< Tuple2< String, String> >() {
@Override
public boolean filter (Tuple2< String, String > value) throws Exception {
return value.f1.equals("3");
}
})
亚型(Tuple.getTupleClass(2)),以及occoured错误 Inferred type 'capture<? extends org.apapche.flink.api.java.tuple.Tuple>' for type parameter 'S' is not within its bound;should extend 'org.apapche.flink.api.java.tuple.Tuple3<java.lang.String,java.lang.String,java.lang.String>'
如何做subtype()元组在flink cep?
我应该修改呢?但如何? Pattern< Tuple3< String, String, String >, ? > pattern
由2017012
JoinedStreams< Tuple2< String, String >, Tuple3< String, String, String > >.Where<String>.EqualTo
joinedStreams = someStream
.join(otherStream)
.where(value -> value.f1)
.equalTo(value -> value.f1);
Pattern< Tuple, ? > pattern = Pattern.<Tuple> begin("start")
.subtype(Tuple3.class)
.where(evt -> evt.f2.equals("3"))
.next("4")
.subtype(Tuple2.class)
.where(evt -> evt.f1.equals("3"))
.within(Time.seconds(10));
PatternStream< ...> patternStream = CEP.pattern(joinedStreams, pattern);
更新我试过这种方法,但不是我应填写PatternStream< ...>
.Thanks的人谁可以提供帮助。
非常感谢你,那么我该如何表达这一点:Tuple2后跟Tuple3? –
我更新我的答案,希望这会有所帮助。 –
非常感谢。另外一个问题是我可以在CEP.pattern(partitionedEventA,pattern)中加入flink CEP中的另一个流;''''''''''''''''''''''''但我不知道该怎么做。再次感谢。 –