2017-09-15 51 views
0

检查点时打开一个简单的CEP环形图案CEP问题,而检查点。“找不到ID为条目”

private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn) 
     .followedBy("middle").where(checkStatusOn).times(2) 
     .next("end").where(checkStatusOn).within(Time.minutes(5)) 

我看到故障。

SimpleBinaryEvent是

public class SimpleBinaryEvent implements Serializable { 

private int id; 
private int sequence; 
private boolean status; 
private long time; 

public SimpleBinaryEvent(int id, int sequence, boolean status , long time) { 
    this.id = id; 
    this.sequence = sequence; 
    this.status = status; 
    this.time = time; 
} 
public int getId() { 
    return id; 
} 
public int getSequence() { 
    return sequence; 
} 
public boolean isStatus() { 
    return status; 
} 
public long getTime() { 
    return time; 
} 
@Override 
public boolean equals(Object o) { 
    if (this == o) return true; 
    if (o == null || getClass() != o.getClass()) return false; 

    SimpleBinaryEvent that = (SimpleBinaryEvent) o; 

    if (getId() != that.getId()) return false; 
    if (isStatus() != that.isStatus()) return false; 
    if (getSequence() != that.getSequence()) return false; 
    return getTime() == that.getTime(); 
} 

@Override 
public int hashCode() { 
    //return Objects.hash(getId(),isStatus(), getSequence(),getTime()); 
    int result = getId(); 
    result = 31 * result + (isStatus() ? 1 : 0); 
    result = 31 * result + getSequence(); 
    result = 31 * result + (int) (getTime()^(getTime() >>> 32)); 
    return result; 
} 

@Override 
public String toString() { 
    return "SimpleBinaryEvent{" + 
      "id='" + id + '\'' + 
      ", status=" + status + 
      ", sequence=" + sequence + 
      ", time=" + time + 
      '}'; 
} 

}

故障原因:

Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1). 
... 6 more 
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),.... 

我相信我有equals()和hashCode()方法实现的事情应该是这样。我也尝试了Objects.hashCode。在其他情况下,我在SharedBuffer.toString()上使用了CircularReference(因此stackOverflow),它再次指向引用问题(相等和不相等)。没有打开检查点,它可以按预期工作。我在本地群集上运行。 CEP生产准备好了吗?

我正在使用1.3.2 Flink

+0

它看起来非常相似,http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Keyed-CEP-checkpoint-fails- td14795.html – VishalSan

+0

如果需要,我可以从一个自包含的junit中重现,这是我如何创建它的开始。 – VishalSan

+0

如果您可以发布带独立Junit的JIRA,那将会很棒。那么我会看看它。 –

回答

0

非常感谢您尝试图书馆并报告此问题!

随着越来越多的功能被添加到库中,图书馆正在积极开发中。 1.3是这个丰富的语义库的第一个版本,所以我们期望看到1)人们如何使用它,2)如果有任何错误。所以我想说这不是100%的生产准备,但它并不遥远。

现在为了解决这个问题,我想你正在使用RocksDB作点检查,对不对?我假设的原因是,在RocksDB中,在每个水印(在事件时间内),您将需要的状态反序列化(例如NFA),处理一些事件,然后在将其重新序列化回RocksDB之前重新序列化。

对于文件系统状态后端,情况并非如此,您只能在检查点时序列化状态,并且仅在恢复时才读取并反序列化它。所以在这种情况下,考虑到你说没有检查点你的工作正常工作,你只会在从故障中恢复后才会看到这个问题。

问题的根源可能是equals()/hashcode()是错误的(似乎不是这种情况),或者我们序列化/反序列化CEP状态的方式存在问题。

您还可以提供一个最小的输入序列的事件,产生这种情况发生?这对于重现问题将非常有帮助。

非常感谢, 科斯塔斯

+0

我在LocalFlinkMiniCluster(config,false)中以junit身份运行。将准备好测试并提交。被其他东西猛击了。 – VishalSan