2
假设我有一个具有多个节点的点火群集和一个名为“TEST_CACHE”的分区非空IgniteCache。然后我运行中的一个节点下面的代码:StreamTransformer并发安全吗?
ignite.compute().run(new IgniteRunnable(){
@IgniteInstanceResource
private Ignite ignite;
@Override
public void run() {
IgniteDataStreamer<String,Long> ds = ignite.dataStreamer("TEST_CACHE");
ds.receiver(new StreamTransformer<String,Long>(){
@Override
public Object process(MutableEntry<String, Long> entry, Object... arguments)
throws EntryProcessorException {
Long value = entry.getValue();
entry.setValue(value==null?1L:(value.longValue()+1L));
return null;
}
});
//loop for adding lots of String data
while(...)
ds.addData(...);
}
});
这类似于官方StreamTransformerExample代码,但是有什么不同的是每个节点都将获得相同的缓存的DataStreamer实例,并发调用addData方法。换句话说,对于不同节点中的相同字符串数据,可能有一个节点刚刚通过“Long value = entry.getValue()”得到值,但不执行下一行代码来设置值并更新到缓存中,则另一个节点是执行“entry.getValue()”。那么是否有可能在这个并发的StreamTransformer用例中更新错误的值?
谢谢你的回答。我没有启用allowOverwrite。 allowOverwrite是否影响并发安全? – CrazyRen
它并不是,但是:“默认情况下,数据流不会覆盖现有数据,这意味着如果它将遇到已经在缓存中的条目,它将跳过它。 – Konstantin
查看https://apacheignite.readme.io/docs/data-streamers#section-allow-overwrite – Konstantin