2017-09-15 56 views
2

假设我有一个具有多个节点的点火群集和一个名为“TEST_CACHE”的分区非空IgniteCache。然后我运行中的一个节点下面的代码:StreamTransformer并发安全吗?

ignite.compute().run(new IgniteRunnable(){ 
    @IgniteInstanceResource 
    private Ignite ignite; 

    @Override 
    public void run() { 
     IgniteDataStreamer<String,Long> ds = ignite.dataStreamer("TEST_CACHE"); 
     ds.receiver(new StreamTransformer<String,Long>(){ 
      @Override 
      public Object process(MutableEntry<String, Long> entry, Object... arguments) 
        throws EntryProcessorException { 
       Long value = entry.getValue(); 
       entry.setValue(value==null?1L:(value.longValue()+1L)); 
       return null; 
      } 
     }); 

     //loop for adding lots of String data 
     while(...) 
      ds.addData(...); 
    } 

}); 

这类似于官方StreamTransformerExample代码,但是有什么不同的是每个节点都将获得相同的缓存的DataStreamer实例,并发调用addData方法。换句话说,对于不同节点中的相同字符串数据,可能有一个节点刚刚通过“Long value = entry.getValue()”得到值,但不执行下一行代码来设置值并更新到缓存中,则另一个节点是执行“entry.getValue()”。那么是否有可能在这个并发的StreamTransformer用例中更新错误的值?

回答

0

StreamReceiver.receive调用cache.invoke与您的入口处理器,所以入口被锁定在此操作中。所以是的,它是并行安全的。

顺便说一句,你在你的DataStreamer中启用了allowOverwrite吗?

+0

谢谢你的回答。我没有启用allowOverwrite。 allowOverwrite是否影响并发安全? – CrazyRen

+1

它并不是,但是:“默认情况下,数据流不会覆盖现有数据,这意味着如果它将遇到已经在缓存中的条目,它将跳过它。 – Konstantin

+0

查看https://apacheignite.readme.io/docs/data-streamers#section-allow-overwrite – Konstantin