
Spark and HBase on Hadoop + YARN: I want to read from and write to HBase from a Scala application and build it with SBT. How do I configure a Spark Streaming Scala application to read HBase on Hadoop + YARN?

I can't create an HBase configuration.

Scala app:

/usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala 



package com.mydomain.spark.hbasewordcount 

import org.apache.spark._ 
import org.apache.spark.streaming._ 

import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.client.Result 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable 
import org.apache.hadoop.hbase.mapred.TableOutputFormat 
import org.apache.hadoop.hbase.mapreduce.TableInputFormat 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.hadoop.mapred.JobConf 

object HBaseScalaWordCount { 
    def main(args: Array[String]) { 

     val name = "Example of read from HBase table" 

     lazy val sparkConf = new SparkConf().setAppName(name) 
     lazy val ssc = new StreamingContext(sparkConf, Seconds(1)) 
     implicit val config = HBaseConfig() // Assumes hbase-site.xml is on classpath 

     val columns = Map(
      "cf1" -> Set("col1", "col2"), 
      "cf2" -> Set("col3") 
     ) 

     ssc.hbase[String]("testtable", columns) 
     .map({ case (k, v) => 
      val cf1 = v("cf1") 
      val col1 = cf1("col1") 
      val col2 = cf1("col2") 
      val col3 = v("cf2")("col3") 

      List(k, col1, col2, col3) mkString "\t" 
     }) 
     .saveAsTextFile("file:/home/hduser/hbasetest-output") 
    } 
} 

SBT file:

/usr/local/sparkapps/HBaseWordCount/HBaseWordCount.sbt 


name := "HBaseScalaWordCount" 

version := "1.0" 

scalaVersion := "2.10.6" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.6.1" % "provided", 
    "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided", 
    "org.apache.hbase" % "hbase-common" % "1.2.1" % "provided", 
    "org.apache.hbase" % "hbase-client" % "1.2.1" % "provided", 
    "org.apache.hbase" % "hbase-server" % "1.2.1" % "provided", 
    "eu.unicredit" %% "hbase-rdd" % "0.7.1" 
) 

sbt package:

/usr/local/sparkapps/HBaseWordCount$ sbt package 


[info] Set current project to HBaseScalaWordCount (in build   file:/usr/local/sparkapps/HBaseWordCount/) 
[info] Compiling 1 Scala source to /usr/local/sparkapps/HBaseWordCount/target/scala-2.10/classes... 
[error] /usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala:29: not found: value HBaseConfig 
[error]   implicit val config = HBaseConfig() // Assumes hbase-site.xml is on classpath 
[error]        ^
[error] /usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala:36: value hbase is not a member of org.apache.spark.streaming.StreamingContext 
[error]   ssc.hbase[String]("testtable", columns) 
[error]   ^
[error] two errors found 
[error] (compile:compileIncremental) Compilation failed 
[error] Total time: 9 s, completed Apr 14, 2016 4:11:40 PM  

HBase is working correctly on Hadoop, but I can't figure out how to configure the classpath for Spark. For example, /usr/local/spark/conf/spark-defaults.conf does not actually exist; I only have spark-defaults.conf.template.

spark-env.sh:

/usr/local/spark/conf/spark-env.sh 

export SPARK_MASTER_IP=localhost 
export SPARK_WORKER_CORES=1 
export SPARK_WORKER_MEMORY=800m 
export SPARK_WORKER_INSTANCES=1 

spark-defaults.conf:

(not present; only spark-defaults.conf.template exists)

HBase path:

/usr/local/hbase/hbase-1.1.3/lib/ 
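For reference, a hedged illustration of how jars from a directory like this could be put on Spark's classpath through a spark-defaults.conf created from the template; the entries and paths below are assumptions for illustration, not part of the original post:

# create the file from the shipped template, e.g.
#   cp spark-defaults.conf.template spark-defaults.conf
# then point the driver and executors at the HBase client jars (paths assumed)
spark.driver.extraClassPath     /usr/local/hbase/hbase-1.1.3/lib/*
spark.executor.extraClassPath   /usr/local/hbase/hbase-1.1.3/lib/*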

hbase-site.xml:

/usr/local/hbase/hbase-1.1.3/conf/hbase-site.xml 


<?xml version="1.0"?> 
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> 


<configuration> 

    <property> 
    <name>hbase.rootdir</name> 
    <value>hdfs://localhost:9000/hbase</value> 
    </property> 

    <property> 
    <name>hbase.cluster.distributed</name> 
    <value>true</value> 
    </property> 

    <property> 
    <name>hbase.zookeeper.quorum</name> 
    <value>localhost</value> 
    </property> 

    <property> 
    <name>dfs.replication</name> 
    <value>1</value> 
    </property> 

    <property> 
    <name>hbase.zookeeper.property.clientPort</name> 
    <value>2181</value> 
    </property> 

    <property> 
    <name>hbase.zookeeper.property.dataDir</name> 
    <value>/home/hduser/hbase/zookeeper</value> 
    </property> 

</configuration> 

Answer


First, SBT cannot find the class HBaseConfig. That is because you have imported org.apache.hadoop.hbase.HBaseConfiguration, but the class you need is unicredit.spark.hbase.HBaseConfig.

Your second problem is

value hbase is not a member of org.apache.spark.streaming.StreamingContext

This means that SBT cannot find an hbase method on StreamingContext. I see you are using hbase-rdd to add HBase support to Spark. If you check that project's README, you have to add an import line for its implicits, so add this to the top of your class:

import unicredit.spark.hbase._ 

Implicits are a nice addition to Scala that make it possible to extend classes from other packages with new functionality. With the implicits imported, the hbase method should be available on your SparkContext instance.
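As a generic illustration of that mechanism (this is not hbase-rdd code; the names below are made up), an implicit class adds a method to an existing type once its defining import is in scope:

object StringSyntax {
  // adds a shout method to every String while the import below is in scope
  implicit class RichString(val s: String) extends AnyVal {
    def shout: String = s.toUpperCase + "!"
  }
}

object Demo extends App {
  import StringSyntax._
  println("hello".shout)  // prints HELLO!
}

hbase-rdd uses the same technique to attach the hbase method to SparkContext.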

Note that you do not have a SparkContext instance yet, only a StreamingContext, so create one first. There is also no need to make them lazy.
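Putting the pieces together, here is a minimal sketch of how the program might look after those changes, assuming the hbase-rdd 0.7.x read API from its README (the table and column names are the ones from your code):

package com.mydomain.spark.hbasewordcount

import org.apache.spark.{SparkConf, SparkContext}
import unicredit.spark.hbase._  // brings HBaseConfig and the hbase implicits into scope

object HBaseScalaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Example of read from HBase table")
    val sc = new SparkContext(sparkConf)  // a plain SparkContext, not a StreamingContext
    implicit val config = HBaseConfig()   // picks up hbase-site.xml from the classpath

    val columns = Map(
      "cf1" -> Set("col1", "col2"),
      "cf2" -> Set("col3")
    )

    sc.hbase[String]("testtable", columns)
      .map { case (k, v) =>
        val cf1 = v("cf1")
        List(k, cf1("col1"), cf1("col2"), v("cf2")("col3")).mkString("\t")
      }
      .saveAsTextFile("file:/home/hduser/hbasetest-output")

    sc.stop()
  }
}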


Thank you very much. The goal is to use HBase as the input for Spark Streaming and, after the computation, write the results to another HBase table. Is unicredit.spark.hbase needed for that? – Mike


You should be fine with it, or you can also take a look at this post: http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/ –
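For the write-back part mentioned in the comment above, a hedged sketch assuming the toHBase write API described in the hbase-rdd README (the table and column-family names here are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import unicredit.spark.hbase._

object WriteBackExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HBase write example"))
    implicit val config = HBaseConfig()  // reads hbase-site.xml from the classpath

    // an RDD of (rowKey, column -> value) pairs for a single column family
    val results = sc.parallelize(Seq(
      "row1" -> Map("col1" -> "value1"),
      "row2" -> Map("col1" -> "value2")
    ))

    results.toHBase("output_table", "cf1")  // "output_table" and "cf1" are made-up names

    sc.stop()
  }
}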
