2017-04-01 43 views
2

我试图使用GeoMesa Native API插入并从accumulo存储中读取数据。我创建了一个类文件,以便本机使用geomesa accumulo存储。这是我的Java代码:使用GeoMesa Native API在Accumulo中插入数据

package org.locationtech.geomesa.api; 

import com.google.common.base.Function; 
import com.google.common.collect.ImmutableMap; 
import com.google.common.collect.Iterables; 
import com.google.common.collect.Lists; 
import com.google.gson.Gson; 
import com.vividsolutions.jts.geom.Coordinate; 
import com.vividsolutions.jts.geom.Geometry; 
import com.vividsolutions.jts.geom.GeometryFactory; 
import org.apache.accumulo.core.client.Connector; 
import org.apache.accumulo.core.client.mock.MockInstance; 
import org.apache.accumulo.core.client.security.tokens.PasswordToken; 
import org.apache.accumulo.core.security.Authorizations; 
import org.geotools.factory.CommonFactoryFinder; 
import org.geotools.feature.AttributeTypeBuilder; 
import org.geotools.geometry.jts.JTSFactoryFinder; 
import org.junit.Assert; 
import org.junit.Test; 
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore; 
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex; 
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$; 
import org.locationtech.geomesa.utils.index.IndexMode$; 
import org.opengis.feature.simple.SimpleFeature; 
import org.opengis.feature.type.AttributeDescriptor; 
import org.opengis.filter.FilterFactory2; 

import javax.annotation.Nullable; 
import java.time.ZonedDateTime; 
import java.util.ArrayList; 
import java.util.Date; 
import java.util.List; 
import java.util.Map; 
import java.util.SortedSet; 
import java.util.TreeSet; 

public class WorkerBeta { 
    public static void main(String[] args){ 
     try { 
      DomainObjectValueSerializer dovs = new DomainObjectValueSerializer(); 
      final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
      "aj_v14", 
      "localhost:2181", 
      "hps", 
      "root", "9869547580", 
      false, 
      dovs, 
      new SimpleFeatureView<DomainObject>() { 
       AttributeTypeBuilder atb = new AttributeTypeBuilder(); 
       private List<AttributeDescriptor> attributeDescriptors = 
       Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId") 
        , atb.binding(String.class).buildDescriptor("dId") 
        , atb.binding(Integer.class).buildDescriptor("s") 
        , atb.binding(Integer.class).buildDescriptor("a") 
        , atb.binding(Integer.class).buildDescriptor("e") 
       ); 
       @Override 
       public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) { 
       f.setAttribute("rId", domainObject.rideId); 
       f.setAttribute("dId", domainObject.deviceId); 
       f.setAttribute("s", domainObject.speed); 
       f.setAttribute("a", domainObject.angle); 
       f.setAttribute("e", domainObject.error); 
       } 

       @Override 
       public List<AttributeDescriptor> getExtraAttributes() { 
       return attributeDescriptors; 
       } 
      } 
     ); 

     //Inserting 
     final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1); 
     final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory(); 
     System.out.println(index.insert(
       one, 
       gf.createPoint(new Coordinate(-74.0, 34.0)), 
       date("2017-03-31T01:15:00.000Z") 
      )); 

      //Read 
      GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder() 
       .within(-90.0, -180, 90, 180) 
       .during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z")) 
       .build(); 
      Iterable<DomainObject> results = index.query(q); 
      int counter = 0; 
      for(DomainObject dm : results){ 
       counter += 1; 
       System.out.println("result counter: " + counter); 
       dovs.toBytes(dm); 
      } 
     } 
     catch (Exception ex){ 
     ex.printStackTrace(); 
     } 
    } 
    public static class DomainObject { 
     public final int rideId; 
     public final String deviceId; 
     public final int angle; 
     public final int speed; 
     public final int error; 

     public DomainObject(int rideId, String deviceId, int angle, int speed, int error) { 
      this.rideId = rideId; 
      this.deviceId = deviceId; 
      this.angle = angle; 
      this.speed = speed; 
      this.error = error; 
     } 
    } 
    public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> { 
     public static final Gson gson = new Gson(); 
     @Override 
     public byte[] toBytes(DomainObject o) { 
      return gson.toJson(o).getBytes(); 
     } 
     @Override 
     public DomainObject fromBytes(byte[] bytes) { 
      return gson.fromJson(new String(bytes), DomainObject.class); 
     } 
    } 
    public static Date date(String s) { 
     return Date.from(ZonedDateTime.parse(s).toInstant()); 
    } 
} 

日志中的命令:

[email protected]:~/GeomesaAccumuloNativeClient $ java -cp target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar org.locationtech.geomesa.api.WorkerBeta 
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files. 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:host.name=192.168.1.103 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_121 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.class.path=target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/Users/suresh/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/var/folders/yk/h858t8h57nz42t6t4nqmwhcc0000gp/T/ 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA> 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.name=Mac OS X 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.arch=x86_64 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.version=10.12.3 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.name=suresh 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.home=/Users/suresh 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.dir=/Users/suresh/GeomesaAccumuloNativeClient 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 w[email protected]73eb439a 
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) 
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session 
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x15aea0c41f601a1, negotiated timeout = 30000 
17/04/01 15:11:52 WARN data.AccumuloDataStore: Configured server-side iterators do not match client version - client version: 1.3.2-SNAPSHOT, server version: 1.3.0 
50fa12fb-11f8-4776-bb35-95b32da9225d 
[] 

但是当我尝试验证插入的记录,我无法找到创建相关的任何具体条目插入数据accumulo网络界面表。这里是accumulo表enter image description here的屏幕截图。如果我缺少任何东西,请纠正我。提前感谢。

回答

1

可能您的插入没有被刷新到磁盘。 Accumulo使用批处理写入程序进行性能测试 - 一旦内部缓冲区填满,它就会定期写入磁盘。由于您只插入一条记录,因此没有发生。 要修复,你可以在你的GeoMesaIndex实例上调用close。这将刷新任何现有的记录到磁盘。然后你需要实例化一个新的实例来完成你的查询。

+0

太棒了!它正在工作。你能否给我这个信息的参考或你是如何得出这个答案的。 –

+0

我可以使用flush()而不关闭索引吗?我正在侦听插入请求的队列通道,并且不希望为每个请求创建索引对象 –

+0

我试过调用flush(),但它不起作用。根据[故事](https://geomesa.atlassian.net/browse/GEOMESA-885?focusedCommentId=16309&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16309),我们可以发送在不关闭作者的情况下积累突变。 –

1

两个快速笔记:

  1. 您的类型没有不场名为“DTG”和GeoMesaQuery假设之一。要轻松解决这个问题,可以使用'GeoMesaQuery.GeoMesaQueryBuilder.builder()。include()。build()'。长期来看,本机api可以使用一些改进措施,以便轻松做到您想要的流畅。

  2. 要查看记录是否写入Accumulo,可以使用Accumulo shell并扫描各个表。如果表中没有任何内容,则可能需要调试此code