2017-03-07 87 views
0

我正在尝试运行连续异步查询。在我的Windows中,我曾与ignite.bat双击文件,并试图运行下面的代码开始的Apache点燃 -Apache Ignite:连续异步查询不连续工作

数据发布者客户端 包点燃;

import org.apache.ignite.Ignite; 
import org.apache.ignite.IgniteCache; 
import org.apache.ignite.IgniteDataStreamer; 
import org.apache.ignite.Ignition; 
import org.apache.ignite.cache.CacheMode; 
import org.apache.ignite.configuration.CacheConfiguration; 

public class IgniteStreamPublisher { 
    public static void main(String[] args) throws Exception { 
     System.out.println("Run Spring example!!"); 
     Ignition.setClientMode(true); 

     Ignite ignite = Ignition.start(); 

     CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<Integer, Person>("myStreamCache"); 
     cacheConfiguration.setIndexedTypes(Integer.class, Person.class); 
     cacheConfiguration.setCacheMode(CacheMode.PARTITIONED); 

     IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(cacheConfiguration); 

     IgniteDataStreamer<Integer, Person> stmr = ignite.dataStreamer("myStreamCache"); 
     stmr.allowOverwrite(true); 
     try { 
      for (int i = 0; i < 100; i++) { 
       Person person = new Person(i, i, "name_" + i, (i * 100) % 3000); 
       System.out.println("putting--" + person); 
       stmr.addData(i, person);  
       Thread.sleep(1*1000); 
       stmr.flush(); 
      } 
     }finally{ 
      stmr.close(); 
     } 
    } 

} 

数据接收客户端

package ignite; 

import javax.cache.Cache; 
import javax.cache.configuration.Factory; 
import javax.cache.event.CacheEntryEvent; 
import javax.cache.event.CacheEntryEventFilter; 
import javax.cache.event.CacheEntryUpdatedListener; 

import org.apache.ignite.Ignite; 
import org.apache.ignite.IgniteCache; 
import org.apache.ignite.Ignition; 
import org.apache.ignite.cache.CacheMode; 
import org.apache.ignite.cache.query.ContinuousQuery; 
import org.apache.ignite.cache.query.Query; 
import org.apache.ignite.cache.query.QueryCursor; 
import org.apache.ignite.cache.query.ScanQuery; 
import org.apache.ignite.configuration.CacheConfiguration; 
import org.apache.ignite.lang.IgniteBiPredicate; 

public class IgniteAsyncStreamReceiver { 
    public static void main(String[] args) throws Exception { 
     System.out.println("Run Spring example!!"); 
     Ignition.setClientMode(true); 
     Ignite ignite = Ignition.start(); 

     CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<Integer, Person>("myStreamCache"); 
     cacheConfiguration.setIndexedTypes(Integer.class, Person.class); 
     cacheConfiguration.setCacheMode(CacheMode.PARTITIONED); 


     IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(cacheConfiguration); 

     System.out.println(); 
     System.out.println(">>> Cache continuous query example started."); 



     // Create new continuous query. 
     ContinuousQuery<Integer, Person> qry = new ContinuousQuery<>(); 
     IgniteBiPredicate<Integer, Person> filter = new MyIgniteBiPredicate(); 
     Query<Cache.Entry<Integer, Person>> scanQuery = new ScanQuery<>(filter); 
     qry.setInitialQuery(scanQuery); 

     // Callback that is called locally when update notifications are received. 
     qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Person>() { 

      @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Person>> evts) { 
       for (CacheEntryEvent<? extends Integer, ? extends Person> e : evts) 
        System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); 
      } 
     }); 

     // This filter will be evaluated remotely on all nodes. 
     // Entry that pass this filter will be sent to the caller. 
     Factory<CacheEntryEventFilter<Integer, Person>> rmtFilterFactory = new MyRemoteFilterFactory(); 
     qry.setRemoteFilterFactory(rmtFilterFactory); 

     // Execute query. 
     try (QueryCursor<Cache.Entry<Integer, Person>> cur = cache.query(qry)) { 
      // Iterate through existing data. 
      for (Cache.Entry<Integer, Person> e : cur) 
       System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); 
      Thread.sleep(2000); 
     } 

    } 


} 

* RemoteFileFilterFactory实施*

package ignite; 

import javax.cache.configuration.Factory; 
import javax.cache.event.CacheEntryEvent; 
import javax.cache.event.CacheEntryEventFilter; 
import javax.cache.event.CacheEntryListenerException; 

import org.apache.ignite.Ignite; 
import org.apache.ignite.lang.IgniteAsyncCallback; 
import org.apache.ignite.resources.IgniteInstanceResource; 

public class MyRemoteFilterFactory implements Factory<CacheEntryEventFilter<Integer, Person>> { 
    private static final long serialVersionUID = 1L; 

    @Override 
    public CacheEntryEventFilter<Integer, Person> create() { 
     return new CacheEntryFilter(); 
    } 

    @IgniteAsyncCallback 
    private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, Person> { 
     /** Ignite instance. */ 
     @IgniteInstanceResource 
     private Ignite ignite; 

     /** {@inheritDoc} */ 
     @Override 
     public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Person> cache) throws CacheEntryListenerException {     
      System.out.println("Event : "+ (cache.getValue())); 
      return true; 
     } 
    } 
} 

IgniteBiPredicate实施

package ignite; 

import org.apache.ignite.lang.IgniteBiPredicate; 

public class MyIgniteBiPredicate implements IgniteBiPredicate<Integer, Person> { 

    /** 
    * 
    */ 
    private static final long serialVersionUID = 1L; 

    @Override 
    public boolean apply(Integer key, Person person) { 
     return person.getSal() < 1000; 
    } 

} 

人是一个POJO -

package ignite; 

public class Person { 
    int id; 
    int age; 
    String name; 
    int sal; 
    public Person(int id, int age, String name, int sal) { 
     super(); 
     this.id = id; 
     this.age = age; 
     this.name = name; 
     this.sal = sal; 
    } 
    public int getId() { 
     return id; 
    } 
    public void setId(int id) { 
     this.id = id; 
    } 
    public int getAge() { 
     return age; 
    } 
    public void setAge(int age) { 
     this.age = age; 
    } 
    public String getName() { 
     return name; 
    } 
    public void setName(String name) { 
     this.name = name; 
    } 
    public int getSal() { 
     return sal; 
    } 
    public void setSal(int sal) { 
     this.sal = sal; 
    } 
    @Override 
    public String toString() { 
     StringBuilder builder = new StringBuilder(); 
     builder.append("Person [id="); 
     builder.append(id); 
     builder.append(", age="); 
     builder.append(age); 
     builder.append(", name="); 
     builder.append(name); 
     builder.append(", sal="); 
     builder.append(sal); 
     builder.append("]"); 
     return builder.toString(); 
    } 



} 

我没有得到Ignite服务器控制台或出版商/接收器客户端上的任何错误。但是我的接收器在初始高速缓存捕捉之后只收到1或2条记录。我指的是https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java -

Run Spring example!! 
[16:45:24] (wrn) Default Spring XML file not found (is IGNITE_HOME set?): config/default-config.xml 
Mar 07, 2017 4:45:24 PM java.util.logging.LogManager$RootLogger log 
SEVERE: Failed to resolve default logging config file: config/java.util.logging.properties 
[16:45:24] __________ ________________ 
[16:45:24] /_/ ___/ |//_/_ __/ __/ 
[16:45:24] _/ // (7 7 ////// _/ 
[16:45:24] /___/\___/_/|_/___/ /_/ /___/ 
[16:45:24] 
[16:45:24] ver. 1.9.0#20170302-sha1:a8169d0a 
[16:45:24] 2017 Copyright(C) Apache Software Foundation 
[16:45:24] 
[16:45:24] Ignite documentation: http://ignite.apache.org 
[16:45:24] 
[16:45:24] Quiet mode. 
[16:45:24] ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or "-v" to ignite.{sh|bat} 
[16:45:24] 
[16:45:24] OS: Windows 7 6.1 x86 
[16:45:24] VM information: Java(TM) SE Runtime Environment 1.8.0_60-b27 Oracle Corporation Java HotSpot(TM) Client VM 25.60-b23 
[16:45:24] Initial heap size is 16MB (should be no less than 512MB, use -Xms512m -Xmx512m). 
[16:45:24] Configured plugins: 
[16:45:24] ^-- None 
[16:45:24] 
[16:45:25] Message queue limit is set to 0 which may lead to potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes due to message queues growth on sender and receiver sides. 
[16:45:25] Security status [authentication=off, tls/ssl=off] 
[16:45:26] REST protocols do not start on client node. To start the protocols on client node set '-DIGNITE_REST_START_ON_CLIENT=true' system property. 
[16:45:27] Performance suggestions for grid (fix if possible) 
[16:45:27] To disable, set -DIGNITE_PERFORMANCE_SUGGESTIONS_DISABLED=true 
[16:45:27] ^-- Enable server mode for JVM (add '-server' to JVM options) 
[16:45:27] ^-- Enable G1 Garbage Collector (add '-XX:+UseG1GC' to JVM options) 
[16:45:27] ^-- Specify JVM heap max size (add '-Xmx<size>[g|G|m|M|k|K]' to JVM options) 
[16:45:27] ^-- Set max direct memory size if getting 'OOME: Direct buffer memory' (add '-XX:MaxDirectMemorySize=<size>[g|G|m|M|k|K]' to JVM options) 
[16:45:27] ^-- Disable processing of calls to System.gc() (add '-XX:+DisableExplicitGC' to JVM options) 
[16:45:27] Refer to this page for more performance suggestions: https://apacheignite.readme.io/docs/jvm-and-system-tuning 
[16:45:27] 
[16:45:27] To start Console Management & Monitoring run ignitevisorcmd.{sh|bat} 
[16:45:27] 
[16:45:27] Ignite node started OK (id=98218fc2) 
[16:45:27] Topology snapshot [ver=9, servers=1, clients=2, CPUs=4, heap=1.5GB] 

>>> Cache continuous query example started. 
Queried existing entry [key=0, val=Person [id=0, age=0, name=name_0, sal=0]] 
Queried existing entry [key=1, val=Person [id=1, age=1, name=name_1, sal=100]] 
Queried existing entry [key=2, val=Person [id=2, age=2, name=name_2, sal=200]] 
Queried existing entry [key=3, val=Person [id=3, age=3, name=name_3, sal=300]] 
Queried existing entry [key=4, val=Person [id=4, age=4, name=name_4, sal=400]] 
Queried existing entry [key=5, val=Person [id=5, age=5, name=name_5, sal=500]] 
Queried existing entry [key=6, val=Person [id=6, age=6, name=name_6, sal=600]] 
Queried existing entry [key=7, val=Person [id=7, age=7, name=name_7, sal=700]] 
Queried existing entry [key=8, val=Person [id=8, age=8, name=name_8, sal=800]] 
Queried existing entry [key=9, val=Person [id=9, age=9, name=name_9, sal=900]] 
Queried existing entry [key=30, val=Person [id=30, age=30, name=name_30, sal=0]] 
Queried existing entry [key=31, val=Person [id=31, age=31, name=name_31, sal=100]] 
Queried existing entry [key=32, val=Person [id=32, age=32, name=name_32, sal=200]] 
Queried existing entry [key=33, val=Person [id=33, age=33, name=name_33, sal=300]] 
Queried existing entry [key=34, val=Person [id=34, age=34, name=name_34, sal=400]] 
Queried existing entry [key=35, val=Person [id=35, age=35, name=name_35, sal=500]] 
Queried existing entry [key=36, val=Person [id=36, age=36, name=name_36, sal=600]] 
Queried existing entry [key=37, val=Person [id=37, age=37, name=name_37, sal=700]] 
Updated entry [key=10, val=Person [id=10, age=10, name=name_10, sal=1000]] 
Updated entry [key=11, val=Person [id=11, age=11, name=name_11, sal=1100]] 
<<NO RECORD after key 11. SOme time it publish 3-4 *Updated Entry* and some time only 1-2>> 

回答

1

QueryCursor被关闭,连续查询被取消。如果你摆脱了尝试与资源块,它会像你期望的那样工作。即这个代码

try (QueryCursor<Cache.Entry<Integer, Person>> cur = cache.query(qry)) { 
    // Iterate through existing data. 
    for (Cache.Entry<Integer, Person> e : cur) 
     System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); 
    Thread.sleep(2000); 
} 

应与眼前这个被替换:

QueryCursor<Cache.Entry<Integer, Person>> cur = cache.query(qry); 

for (Cache.Entry<Integer, Person> e : cur) 
    System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); 
+0

感谢。它在完成这个改变之后起作用。 – Sushil