2017-09-01 112 views
0

我编写了一个继承自FilterBase的HBase自定义过滤器,并将其打包为JAR,但使用该自定义过滤器时出现异常。该过滤器如下所示:

/**
 * HBase filter that includes cells whose value contains a configured search string.
 *
 * <p>The filter is serialized with Java-native serialization in {@link #toByteArray()} and
 * rebuilt by the static {@link #parseFrom(byte[])} factory.
 *
 * <p>NOTE(review): {@code filterRow} is overwritten by every cell passed to
 * {@link #filterKeyValue(Cell)}, so {@link #filterRow()} reflects only the last cell seen, and no
 * {@code reset()} override is present in this class. Confirm that this is the intended row logic.
 */
public class MyFilter1 extends FilterBase implements Serializable {
    /** Value returned by {@link #filterRow()}; updated on every call to filterKeyValue. */
    boolean filterRow = true;
    /** Substring looked for in each cell value. */
    String srh;

    /**
     * Creates a filter for the given search string.
     *
     * @param str substring that a cell value must contain to be included
     */
    public MyFilter1(String str) {
        this.srh = str;
    }

    /**
     * Includes the cell when its value, decoded as a string, contains the search string.
     *
     * @param c the cell under evaluation
     * @return {@code ReturnCode.INCLUDE} on a match, otherwise {@code ReturnCode.SKIP}
     * @throws IOException declared by the overridden method
     */
    @Override
    public ReturnCode filterKeyValue(Cell c) throws IOException {
        String value = Bytes.toString(c.getValue());

        // Compare against the configured search string. The previous check was
        // value.contains(value), which is always true and ignored `srh` entirely.
        if (value != null && srh != null && value.contains(srh)) {
            filterRow = false;
            return ReturnCode.INCLUDE;
        }

        filterRow = true;
        return ReturnCode.SKIP;
    }

    /**
     * @return the flag last set by {@link #filterKeyValue(Cell)}
     */
    @Override
    public boolean filterRow() {
        return filterRow;
    }

    /**
     * Serializes this filter with Java-native object serialization.
     *
     * @return the serialized bytes of this instance
     * @throws IOException if the object stream cannot be written
     */
    @Override
    public byte[] toByteArray() throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Closing the object stream flushes any buffered data before the bytes are read.
        try (ObjectOutputStream os = new ObjectOutputStream(out)) {
            os.writeObject(this);
        }
        return out.toByteArray();
    }

    /**
     * Rebuilds a filter from the bytes produced by {@link #toByteArray()}.
     *
     * <p>NOTE(review): this uses Java-native deserialization; only feed it bytes from a trusted
     * source, or switch to a protobuf-based format.
     *
     * @param data serialized form of a {@code MyFilter1}
     * @return the deserialized filter, never {@code null}
     * @throws IllegalArgumentException if {@code data} cannot be deserialized into a
     *     {@code MyFilter1}; the underlying failure is attached as the cause
     */
    public static MyFilter1 parseFrom(final byte[] data) {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        try (ObjectInputStream is = new ObjectInputStream(in)) {
            return (MyFilter1) is.readObject();
        } catch (IOException | ClassNotFoundException | ClassCastException e) {
            // Fail loudly: returning null here used to hide the real cause of the failure.
            throw new IllegalArgumentException(
                    "Cannot deserialize MyFilter1 from " + data.length + " bytes", e);
        }
    }

}

生成JAR文件(即MyFilter.jar)后,我把它放到了 /usr/local/hbase/lib/filters 目录下。然后,我在 hbase-env.sh 中做了如下设置:

export HBASE_CLASSPATH="/usr/local/hbase/lib/filters/MyFilter.jar"

并重新启动了HBase服务器。然后我在Java程序中使用该自定义过滤器:

/**
 * Scans the {@code stud} table through the custom filter and prints the
 * {@code perData:name} value of every returned row.
 *
 * @param argv command-line arguments (unused)
 * @throws IOException if connecting, scanning, or closing a resource fails
 */
public static void main(String argv[]) throws IOException {
    Configuration conf = HBaseConfiguration.create();

    // Connection and Table hold client-side resources; try-with-resources
    // closes them even when the scan throws.
    try (Connection con = ConnectionFactory.createConnection(conf);
         Table table = con.getTable(TableName.valueOf("stud"))) {

        // NOTE(review): the filter class shown with this program is named MyFilter1,
        // not MyFilter; confirm which class is actually deployed in the JAR on the servers.
        Filter fl = new MyFilter("uc");

        Scan sc = new Scan();
        sc.setFilter(fl);

        // Closed separately so the scanner is released before the table and connection.
        try (ResultScanner rs = table.getScanner(sc)) {
            for (Result r : rs) {
                System.out.println(Bytes.toString(r.getValue(Bytes.toBytes("perData"), Bytes.toBytes("name"))));
            }
        }
    }
}

但是,得到了以下异常:

Exception in thread "main" org.apache.hadoop.hbase.DoNotRetryIOException: org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.reflect.InvocationTargetException 
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1478) 
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:993) 
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2396) 
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:33648) 
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2180) 
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112) 
at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:133) 
at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:108) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1474) 
... 8 more 
Caused by: org.apache.hadoop.hbase.exceptions.DeserializationException: parseFrom called on base Filter, but should be called on derived type 
at org.apache.hadoop.hbase.filter.Filter.parseFrom(Filter.java:270) 
... 13 more 

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
at java.lang.reflect.Constructor.newInstance(Constructor.java:422) 
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) 
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95) 
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:329) 
at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:408) 
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:204) 
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:65) 
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:210) 
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:364) 
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:338) 
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:136) 
at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:65) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.DoNotRetryIOException): org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.reflect.InvocationTargetException 
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1478) 
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:993) 
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2396) 
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:33648) 
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2180) 
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112) 
at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:133) 
at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:108) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1474) 
... 8 more 
Caused by: org.apache.hadoop.hbase.exceptions.DeserializationException: parseFrom called on base Filter, but should be called on derived type 
at org.apache.hadoop.hbase.filter.Filter.parseFrom(Filter.java:270) 
... 13 more 

at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1267) 
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:227) 
at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:336) 
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:34094) 
at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:400) 
... 10 more 

有人能帮我解决这个问题吗……

回答

0

正如你在异常信息中看到的,这是由“parseFrom called on base Filter, but should be called on derived type”(在基类Filter上调用了parseFrom,但应在派生类型上调用)引起的。

这意味着您必须在自定义过滤器类中实现parseFrom。

此外,您可能还需要实现toByteArray,因为这两个方法需要配合使用。

+0

谢谢您的回答。我是hbase的新手。有没有关于如何实现这些方法的教程或指南? – Subhankar

+1

还有什么例子比HBase库中那些继承FilterBase的过滤器更好呢?例如: https://github.com/apache/hbase/blob/master/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java –

+0

我已经实现了parseFrom和toByteArray(参见编辑后的帖子)……但仍然得到同样的错误 – Subhankar

1

只是想对上面Abhishek Kumar的回答补充一个(也许很重要的)细节:序列化方法toByteArray()和反序列化方法parseFrom(byte[] rawBytes)似乎必须通过Google Protocol Buffers 2来实现。下面是一个示例实现。

AFilter.java

// Example state for AFilter; replace with whatever fields your filter needs.
// Both values are written out by toByteArray() and read back by parseFrom().
long fieldA; 
long fieldB; 

/**
 * Serializes this {@code AFilter} as a {@code FilterProtos.AFilter} protobuf message.
 *
 * @return the encoded message carrying {@code fieldA} and {@code fieldB}
 */
@Override
public byte[] toByteArray() {
    // Populate the generated builder fluently and encode the resulting message.
    return FilterProtos.AFilter.newBuilder()
            .setFieldA(fieldA)
            .setFieldB(fieldB)
            .build()
            .toByteArray();
}

/**
 * Rebuilds an {@code AFilter} from a serialized {@code FilterProtos.AFilter} message.
 *
 * @param rawBytes the encoded protobuf message
 * @return a new {@code AFilter} constructed from the message's {@code fieldA} and {@code fieldB}
 * @throws DeserializationException if protobuf parsing fails; wraps the
 *     {@code InvalidProtocolBufferException} as its cause
 */
public static AFilter parseFrom(final byte[] rawBytes)
        throws DeserializationException {
    try {
        final FilterProtos.AFilter message = FilterProtos.AFilter.parseFrom(rawBytes);
        final long a = message.getFieldA();
        final long b = message.getFieldB();
        return new AFilter(a, b);
    } catch (InvalidProtocolBufferException parseFailure) {
        // Translate the protobuf error into the exception type declared by this method.
        throw new DeserializationException(parseFailure);
    }
}

Filters.proto

// Schema for the serialized form of AFilter; the Java snippet above refers to the
// generated class as FilterProtos.AFilter.

// Java package for the generated code (placeholder; use your own package name).
option java_package = "my.java.package"; 
// Name of the generated outer class that wraps the message classes.
option java_outer_classname = "FilterProtos"; 
option java_generic_services = true; 
option java_generate_equals_and_hash = true; 
option optimize_for = SPEED; 

// One field per value the filter carries; accessed in Java through
// setFieldA/getFieldA and setFieldB/getFieldB.
message AFilter{ 
    required uint64 fieldA = 1; 
    required uint64 fieldB = 2; 
}