2016-10-04 76 views
2

我正在关注sample of mapWithState function on Databricks website在Spark Streaming中使用mapWithState指定超时

为trackstatefunction这些码是如下:

def trackStateFunc(batchTime: Time, key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    state.update(sum) 
    Some(output) 
} 

我的情况下的问题,当状态是时序出(state.isTimingout()==true)那么函数再次更新其可能会导致异常的SATE。样本是否属实?

回答

3

在状态为超时的情况下(state.isTimingout() == true),则函数再次更新可能导致异常的状态。

是的,这是正确的。如果您在mapWithState上设置了明确的超时时间,并且在状态处于最后一次超时迭代时调用state.update,则会导致抛出异常,因为一旦发生超时就无法更新状态。这是明确表示in the documentation

国家不能被更新,如果它已经被删除(即 remove()方法已经被调用),或者将被删除,由于 超时(即,isTimingOut()是真的)。


在你的榜样,一个额外的检查是为了:

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    if (!state.isTimingOut) state.update(sum) 
    Some(output) 
} 

或者,由于value只应None一旦发生超时,您可以使用模式匹配,以及:

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[Long]): Option[(String, Long)] = { 
    value match { 
    case Some(v) => 
     val sum = v.toLong + state.getOption.getOrElse(0L) 
     state.update(sum) 
     Some((key, sum)) 
    case _ if state.isTimingOut() => (key, state.getOption.getOrElse(0L)) 
    } 
} 

有关有状态数据流的审查,请参阅this blog post(免责声明:我是作者)。

+0

hi @Yuval,所以如果一个特定的键超时,所有的状态都消失了?你需要从头开始? – marios

+1

@marios是的,超时后,键被标记为删除。 –

+0

我想如果你需要在超时后坚持你的状态,你需要自己动手吗?感谢Yuval! – marios