检查点时打开一个简单的CEP环形图案CEP问题,而检查点。“找不到ID为条目”
private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
.followedBy("middle").where(checkStatusOn).times(2)
.next("end").where(checkStatusOn).within(Time.minutes(5))
我看到故障。
SimpleBinaryEvent是
public class SimpleBinaryEvent implements Serializable {
private int id;
private int sequence;
private boolean status;
private long time;
public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
this.id = id;
this.sequence = sequence;
this.status = status;
this.time = time;
}
public int getId() {
return id;
}
public int getSequence() {
return sequence;
}
public boolean isStatus() {
return status;
}
public long getTime() {
return time;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SimpleBinaryEvent that = (SimpleBinaryEvent) o;
if (getId() != that.getId()) return false;
if (isStatus() != that.isStatus()) return false;
if (getSequence() != that.getSequence()) return false;
return getTime() == that.getTime();
}
@Override
public int hashCode() {
//return Objects.hash(getId(),isStatus(), getSequence(),getTime());
int result = getId();
result = 31 * result + (isStatus() ? 1 : 0);
result = 31 * result + getSequence();
result = 31 * result + (int) (getTime()^(getTime() >>> 32));
return result;
}
@Override
public String toString() {
return "SimpleBinaryEvent{" +
"id='" + id + '\'' +
", status=" + status +
", sequence=" + sequence +
", time=" + time +
'}';
}
}
故障原因:
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....
我相信我有equals()和hashCode()方法实现的事情应该是这样。我也尝试了Objects.hashCode。在其他情况下,我在SharedBuffer.toString()上使用了CircularReference(因此stackOverflow),它再次指向引用问题(相等和不相等)。没有打开检查点,它可以按预期工作。我在本地群集上运行。 CEP生产准备好了吗?
我正在使用1.3.2 Flink
它看起来非常相似,http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Keyed-CEP-checkpoint-fails- td14795.html – VishalSan
如果需要,我可以从一个自包含的junit中重现,这是我如何创建它的开始。 – VishalSan
如果您可以发布带独立Junit的JIRA,那将会很棒。那么我会看看它。 –