1
我的问题是,如果我们有两个原始事件流,即烟雾和温度,我们要找出是否复杂的事件即消防已经运用运营商的原始数据流发生的事情,我们可以做到这一点在弗林克?是否有可能在apache flink CEP中处理多个流?
我在问这个问题,因为我目前为Flink CEP看过的所有例子都只包含一个输入流。如果我错了,请纠正我。
我的问题是,如果我们有两个原始事件流,即烟雾和温度,我们要找出是否复杂的事件即消防已经运用运营商的原始数据流发生的事情,我们可以做到这一点在弗林克?是否有可能在apache flink CEP中处理多个流?
我在问这个问题,因为我目前为Flink CEP看过的所有例子都只包含一个输入流。如果我错了,请纠正我。
简答 - 是的,您可以根据来自不同流源的事件类型读取和处理多个流并激发规则。
长答案 - 我有一个类似的要求,我的回答是基于您正在阅读不同的kafka话题的不同流的假设。
阅读从单一源流不同的事件不同的主题:
FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
new StringSerializerToEvent(),
props);
kafkaSource.assignTimestampsAndWatermarks(new
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
.filter(Objects::nonNull);
串行器读取数据,并将其解析到一个有一个共同的格式 - 对于如。
@Data
public class BAMEvent {
private String keyid; //If key based partitioning is needed
private String eventName; // For different types of events
private String eventId; // Any other field you need
private long timestamp; // For event time based processing
public String toString(){
return eventName + " " + timestamp + " " + eventId + " " + correlationID;
}
}
,并在这之后,事情很简单,定义基于事件名称的规则和定义的规则比较的事件名称(您也可以定义复杂的规则如下):
Pattern.<BAMEvent>begin("first")
.where(new SimpleCondition<BAMEvent>() {
private static final long serialVersionUID = 1390448281048961616L;
@Override
public boolean filter(BAMEvent event) throws Exception {
return event.getEventName().equals("event1");
}
})
.followedBy("second")
.where(new IterativeCondition<BAMEvent>() {
private static final long serialVersionUID = -9216505110246259082L;
@Override
public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
if (!secondEvent.getEventName().equals("event2")) {
return false;
}
for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
if (secondEvent.getEventId = firstEvent.getEventId()) {
return true;
}
}
return false;
}
})
.within(withinTimeRule);
我希望这可以让您将一个或多个不同的流集成在一起。