2017-07-28 31 views
1

我想在运行时使用BinaryObjects创建缓存。例如,我不需要编写pojo类(如Employee)并将其配置为缓存值类型,而需要能够使用特定缓存的字段名称和字段类型动态配置缓存。点击创建并查询二进制缓存

下面是一些示例代码:

public class EmployeeQuery { 

public static void main(String[] args) throws Exception { 
    Ignition.setClientMode(true); 
    try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { 
     if (!ExamplesUtils.hasServerNodes(ignite)) 
      return; 
     CacheConfiguration<Integer, BinaryObject> cfg = getbinaryCache("emplCache", 1); 
     ignite.destroyCache(cfg.getName()); 
     try (IgniteCache<Integer, BinaryObject> emplCache = ignite.getOrCreateCache(cfg)) { 

      SqlFieldsQuery top5Qry = new SqlFieldsQuery("select * from Employee where salary > 500 limit 5", true); 
      while (true) { 
       QueryCursor<List<?>> top5qryResult = emplCache.query(top5Qry); 

        System.out.println(">>> Employees "); 
        List<List<?>> all = top5qryResult.getAll(); 
        for (List<?> list : all) { 
         System.out.println("Top 5 query result : "+list.get(0) + " , "+ list.get(1) + " , " + list.get(2)); 
        } 
        System.out.println("..... "); 
       Thread.sleep(5000); 
      } 
     } 
     finally { 
      ignite.destroyCache(cfg.getName()); 
     } 
    } 
} 

private static QueryEntity createEmployeeQueryEntity() { 
    QueryEntity employeeEntity = new QueryEntity(); 
    employeeEntity.setTableName("Employee"); 
    employeeEntity.setValueType(BinaryObject.class.getName()); 
    employeeEntity.setKeyType(Integer.class.getName()); 
    LinkedHashMap<String, String> fields = new LinkedHashMap<>(); 

    fields.put("id", Integer.class.getName()); 
    fields.put("firstName", String.class.getName()); 
    fields.put("lastName", String.class.getName()); 
    fields.put("salary", Float.class.getName()); 
    fields.put("gender", String.class.getName()); 

    employeeEntity.setFields(fields); 
    employeeEntity.setIndexes(Arrays.asList(
     new QueryIndex("id"), 
     new QueryIndex("firstName"), 
     new QueryIndex("lastName"), 
     new QueryIndex("salary"), 
     new QueryIndex("gender") 
    )); 

    return employeeEntity; 
} 

public static CacheConfiguration<Integer, BinaryObject> getbinaryCache(String cacheName, int duration) { 
    CacheConfiguration<Integer, BinaryObject> cfg = new CacheConfiguration<>(cacheName); 
    cfg.setCacheMode(CacheMode.PARTITIONED); 
    cfg.setName(cacheName); 
    cfg.setStoreKeepBinary(true); 
    cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); 
    cfg.setIndexedTypes(Integer.class, BinaryObject.class); 
    cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, duration)))); 
    cfg.setQueryEntities(Arrays.asList(createEmployeeQueryEntity())); 
    return cfg; 
} 

} 

我想配置与雇员(整数)作为关键和整个员工记录(BinaryObject)的值缓存。当我运行上述课程时,出现以下例外情况:

Caused by: org.h2.jdbc.JdbcSQLException: Table "EMPLOYEE" not found; SQL statement: 
select * from "emplCache".Employee where salary > 500 limit 5 

我在做什么错在这里?还有什么比这行更多:

employeeEntity.setTableName("Employee"); 

接下来,我想流数据到缓存中。这是做到这一点的正确方法吗?

public class CsvStreamer { 

public static void main(String[] args) throws IOException { 
    Ignition.setClientMode(true); 

    try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { 
     if (!ExamplesUtils.hasServerNodes(ignite)) 
      return; 
     CacheConfiguration<Integer, BinaryObject> cfg = EmployeeQuery.getbinaryCache("emplCache", 1); 
     try (IgniteDataStreamer<Integer, BinaryObject> stmr = ignite.dataStreamer(cfg.getName())) { 
      while (true) { 
       InputStream in = new FileInputStream(new File(args[0])); 
       try (LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) { 
        int count =0; 
        for (String line = rdr.readLine(); line != null; line = rdr.readLine()) { 
         String[] words = line.split(","); 
         BinaryObject emp = getBinaryObject(words); 

         stmr.addData(new Integer(words[0]), emp); 
         System.out.println("Sent data "+count++ +" , sal : "+words[6]); 
        } 
       } 
      } 
     } 
    } 
} 

private static BinaryObject getBinaryObject(String[] rawData) { 
    BinaryObjectBuilder builder = Ignition.ignite().binary().builder("Employee"); 
    builder.setField("id", new Integer(rawData[0])); 
    builder.setField("firstName", rawData[1]); 
    builder.setField("lastName", rawData[2]); 
    builder.setField("salary", new Float(rawData[6])); 
    builder.setField("gender", rawData[4]); 
    BinaryObject binaryObj = builder.build(); 
    return binaryObj; 
} 

} 

注:我正在集群模式下运行此。 EmployeeQuery和CsvStreamer我都是从一台机器运行的,而且我已经在其他两台机器上以服务器模式启动。理想情况下,我想避免在我的应用程序中使用Pojo类,并尽可能使其具有动态性和通用性。

回答

2

您收到此异常,因为你没有配置SQL方案。在你的情况下(你不想创建pojo对象等),我推荐使用像syntacsis这样的SQL,它自2.0版本以来被添加到Apache Ignite中。我相信下面的例子可以帮助您进行配置:https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryDdlExample.java

+0

试图运行此相同的例子。最初,我得到了PUBLIC架构无法删除的错误。所以我删除了该条款。 然后我得到一个错误,说我需要设置缓存的索引。所以我把下面一行: cacheCfg.setIndexedTypes(Long.class,QueryEntity.class); //或者它是Object.class?无论哪种方式,我在org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1681) 得到一个空指针缺少什么我在这里? –

+0

看着源代码。看起来像“if(prepared.isQuery()){” (IgniteH2Indexing.java中的第1642行)总是返回false。这是预期的吗? 因此,twoStepQry是nul,我们得到一个NPE –

+1

我意识到2.1版本是在27日发布的。正在使用ignite-2.0.0。将使用新的代码库并尝试。 –