2017-06-03 43 views

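Querying an Ignite cache based on Spark RDD elements

I am trying to retrieve the cached value for each element of a JavaPairRDD. I use the LOCAL cache mode because I want to minimize shuffling of the cached data. The Ignite nodes are started in embedded mode inside the Spark job. The code below works fine when I run it on a single node. However, when I run it on a cluster of 5 machines, I get zero results.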

My first attempt used the IgniteRDD sql method:

dataRDD.sql("SELECT v.id,v.sub,v.obj FROM VPRow v JOIN table(id bigint = ?) i ON v.id = i.id",new Object[] {objKeyEntries.toArray()}); 

where objKeyEntries is the collection of entries collected from an RDD. My second attempt used affinityRun:

JavaPairRDD<Long, VPRow> objEntries = objKeyEntries.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Long, Boolean>>, Long, VPRow>() { 
    @Override 
    public Iterator<Tuple2<Long, VPRow>> call(Iterator<Tuple2<Long, Boolean>> tuple2Iterator) throws Exception { 
     ApplicationContext ctx = new ClassPathXmlApplicationContext("ignite-rdd.xml"); 
     IgniteConfiguration igniteConfiguration = (IgniteConfiguration) ctx.getBean("ignite.cfg"); 
     Ignite ignite = Ignition.getOrStart(igniteConfiguration); 
     IgniteCache<Long, VPRow> cache = ignite.getOrCreateCache("dataRDD"); 

     ArrayList<Tuple2<Long,VPRow>> lst = new ArrayList<>(); 
     while(tuple2Iterator.hasNext()) { 
      Tuple2<Long, Boolean> val = tuple2Iterator.next(); 
      ignite.compute().affinityRun("dataRDD", val._1(),()->{ 
       lst.add(new Tuple2<>(val._1(),cache.get(val._1()))); 
      }); 
     } 
     return lst.iterator(); 
    } 
}); 

Here is the ignite-rdd.xml configuration file:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation=" 
     http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> 
     <property name="memoryConfiguration"> 
      <bean class="org.apache.ignite.configuration.MemoryConfiguration"> 
       <property name="systemCacheInitialSize" value="#{100 * 1024 * 1024}"/> 
       <property name="defaultMemoryPolicyName" value="default_mem_plc"/> 
       <property name="memoryPolicies"> 
        <list> 
         <bean class="org.apache.ignite.configuration.MemoryPolicyConfiguration"> 
          <property name="name" value="default_mem_plc"/> 
          <property name="initialSize" value="#{5 * 1024 * 1024 * 1024}"/> 
         </bean> 
        </list> 
       </property> 
      </bean> 
     </property> 
     <property name="cacheConfiguration"> 
      <list> 
       <bean class="org.apache.ignite.configuration.CacheConfiguration"> 
        <!-- Set a cache name. --> 
        <property name="name" value="dataRDD"/> 
        <!-- Set a cache mode. --> 
        <property name="cacheMode" value="LOCAL"/> 
        <!-- Index Long keys and VPRow values. --> 
        <property name="indexedTypes"> 
         <list> 
          <value>java.lang.Long</value> 
          <value>edu.code.VPRow</value> 
         </list> 
        </property> 
        <property name="affinity"> 
         <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"> 
          <property name="partitions" value="50"/> 
         </bean> 
        </property> 
       </bean> 
      </list> 
     </property> 
     <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> 
     <property name="discoverySpi"> 
      <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> 
       <property name="ipFinder"> 
        <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> 
         <property name="addresses"> 
          <list> 
           <value>[IP5]</value> 
           <value>[IP4]</value> 
           <value>[IP3]</value> 
           <value>[IP2]</value> 
           <value>[IP1]</value> 
          </list> 
         </property> 
        </bean> 
       </property> 
      </bean> 
     </property> 
    </bean> 
</beans> 

Check whether your cache contains any data: dataRDD.query(new ScanQuery()) –

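@Evgenii: When I run the code on the cluster, the cache appears to be empty. With the same code, however, the cache does contain elements in the single-node setup. – alexandria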

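As stated here: https://apacheignite-fs.readme.io/docs/ignitecontext-igniterdd#section-igniterdd IgniteRDD takes advantage of the partitioned nature of Ignite caches and provides partition information to the Spark executor. So you should use the PARTITIONED cache mode. –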

Answer

Are you sure you need to use the LOCAL cache mode?

Most likely you populate the cache on only one node, while the local caches on the other nodes remain empty.
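To make this concrete, here is a toy model in plain Java. It is not the Ignite API: the class `LocalCacheModel` and its methods are hypothetical names chosen for illustration. It assumes only that a LOCAL cache behaves like an independent map on each node, so an entry loaded on one node is invisible to lookups that run on another.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (NOT the Ignite API): one independent map per node, like a LOCAL cache. */
public class LocalCacheModel {
    private final List<Map<Long, String>> nodes = new ArrayList<>();

    public LocalCacheModel(int nodeCount) {
        for (int i = 0; i < nodeCount; i++) {
            nodes.add(new HashMap<>());
        }
    }

    /** A put is stored only in the map of the node it runs on. */
    public void put(int node, long key, String value) {
        nodes.get(node).put(key, value);
    }

    /** A get reads only the map of the node it runs on. */
    public String get(int node, long key) {
        return nodes.get(node).get(key);
    }

    public static void main(String[] args) {
        LocalCacheModel cluster = new LocalCacheModel(5);
        cluster.put(0, 42L, "row-42");            // data loaded on node 0 only
        System.out.println(cluster.get(0, 42L));  // prints "row-42"
        System.out.println(cluster.get(3, 42L));  // prints "null"
    }
}
```

With a single node, every put and get hits the same map, which would match the single-node run returning results while the 5-machine run returns none.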

affinityRun does not work because your cache is LOCAL rather than PARTITIONED, so the AffinityFunction cannot be used to determine the owner node of a key.
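For contrast, the sketch below models what a partitioned cache adds: a fixed mapping from key to partition to owner node, which is what lets a job be routed to the node that holds a key. This is again plain Java, not the Ignite API. `PartitionedCacheModel` and `ownerOf` are hypothetical names, and the modulo mapping is a simplified stand-in, not the algorithm that RendezvousAffinityFunction actually uses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (NOT the Ignite API) of a partitioned cache with a key-to-owner mapping. */
public class PartitionedCacheModel {
    private final int partitions;
    private final List<Map<Long, String>> nodes = new ArrayList<>();

    public PartitionedCacheModel(int nodeCount, int partitions) {
        this.partitions = partitions;
        for (int i = 0; i < nodeCount; i++) {
            nodes.add(new HashMap<>());
        }
    }

    /** Simplified affinity stand-in (assumption): key -> partition -> node index. */
    public int ownerOf(long key) {
        int partition = Math.floorMod(Long.hashCode(key), partitions);
        return partition % nodes.size();
    }

    /** Writes are routed to the owner node, regardless of where the caller runs. */
    public void put(long key, String value) {
        nodes.get(ownerOf(key)).put(key, value);
    }

    /** Reads are routed to the same owner node, so they find the entry. */
    public String get(long key) {
        return nodes.get(ownerOf(key)).get(key);
    }

    public static void main(String[] args) {
        PartitionedCacheModel cluster = new PartitionedCacheModel(5, 50);
        cluster.put(42L, "row-42");
        System.out.println(cluster.ownerOf(42L)); // prints "2"
        System.out.println(cluster.get(42L));     // prints "row-42"
    }
}
```

In a LOCAL cache there is no such shared mapping, so no node can be singled out as the owner of a given key.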

I've edited the question to describe another attempt at the same problem (i.e., fetching values in LOCAL cache mode). – alexandria