
I want to access HBase via Spark using Java. Apart from this one, I have not found any examples of reading data from HBase using Spark and Java. In that answer it is written:

You can also write this in Java

I copied the code from How to read from hbase using spark:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} 
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor } 
import org.apache.hadoop.hbase.mapreduce.TableInputFormat 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable 

import org.apache.spark._ 

object HBaseRead { 
    def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]") 
    val sc = new SparkContext(sparkConf) 
    val conf = HBaseConfiguration.create() 
    val tableName = "table1" 

    System.setProperty("user.name", "hdfs") 
    System.setProperty("HADOOP_USER_NAME", "hdfs") 
    conf.set("hbase.master", "localhost:60000") 
    conf.setInt("timeout", 120000) 
    conf.set("hbase.zookeeper.quorum", "localhost") 
    conf.set("zookeeper.znode.parent", "/hbase-unsecure") 
    conf.set(TableInputFormat.INPUT_TABLE, tableName) 

    val admin = new HBaseAdmin(conf) 
    if (!admin.isTableAvailable(tableName)) { 
     val tableDesc = new HTableDescriptor(tableName) 
     admin.createTable(tableDesc) 
    } 

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) 
    println("Number of Records found : " + hBaseRDD.count()) 
    sc.stop() 
    } 
} 

Can anyone give me some hints on how to find the right dependencies, objects and so on?

It seems that HBaseConfiguration is in hbase-client, but I am actually stuck on TableInputFormat.INPUT_TABLE. Shouldn't that be in the same dependency?

Is there a better way to access HBase from Spark?

Answers


The TableInputFormat class is in hbase-server.jar, so you need to add that dependency to your pom.xml. Please check HBase and non-existent TableInputFormat on the Spark user list.

<dependency> 
    <groupId>org.apache.hbase</groupId> 
    <artifactId>hbase-server</artifactId> 
    <version>1.3.0</version> 
</dependency> 

Below is sample code for reading from HBase using Spark:

public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // Point the HBase TableInputFormat at the table to read
    Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table");
    // Each record is a (row key, Result) pair
    JavaPairRDD<ImmutableBytesWritable, Result> javaPairRdd =
        jsc.newAPIHadoopRDD(hbaseConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
    jsc.stop();
}
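
If you need the actual cell values, you can map over javaPairRdd before calling jsc.stop(). A rough sketch; the column family "cf1" and qualifier "col1" are just placeholder names, and it needs the org.apache.spark.api.java.JavaRDD and org.apache.hadoop.hbase.util.Bytes imports:

// Pull one column out of every row; "cf1" and "col1" are placeholder names
JavaRDD<String> values = javaPairRdd.map(tuple -> {
    Result result = tuple._2();
    byte[] raw = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("col1"));
    return raw == null ? null : Bytes.toString(raw);
});
System.out.println("Number of Records found : " + javaPairRdd.count());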

Thanks for the sample code, I am going to test this – monti


Yes, there is. Use Cloudera's SparkOnHBase:

<dependency> 
    <groupId>org.apache.hbase</groupId> 
    <artifactId>hbase-spark</artifactId> 
    <version>1.2.0-cdh5.7.0</version> 
</dependency> 
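
Note that a -cdh version like this one is published in Cloudera's repository rather than Maven Central, so you will probably also need a repository entry along these lines in your pom.xml (the id is arbitrary):

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>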

Then use an HBase Scan to read the data from your HBase table (or a bulk get if you know the keys of the rows you want to retrieve).

// Build the HBase configuration and wrap the JavaSparkContext (jsc) in a JavaHBaseContext
Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

// Full table scan; setCaching controls how many rows each RPC fetches
Scan scan = new Scan();
scan.setCaching(100);

JavaRDD<Tuple2<byte[], List<Tuple3<byte[], byte[], byte[]>>>> hbaseRdd = hbaseContext.hbaseRDD(tableName, scan);

System.out.println("Number of Records found : " + hbaseRdd.count());
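
If you only need part of the table, the Scan can be narrowed before it is passed to hbaseRDD. A minimal sketch, where the column family "cf1" and the row-key bounds are placeholder values and Bytes is org.apache.hadoop.hbase.util.Bytes:

// Narrow the scan; "cf1" and the row keys below are placeholder values
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("cf1"));        // read only one column family
scan.setStartRow(Bytes.toBytes("row-000"));  // inclusive start of the row-key range
scan.setStopRow(Bytes.toBytes("row-999"));   // exclusive end of the row-key range
scan.setCaching(100);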

Sorry for the late reply. I just tried to add 'hbase-spark' and realized that the artifact-id is not in Maven Central, so I added 'https://repository.cloudera.com/artifactory/cloudera-repos/' as a repository in the pom. It still says 'The POM for org.apache.hbase:hbase-spark:jar:1.2.0-cdh5.7.0 is missing, no dependency information available'. Any suggestions? – monti