2017-08-11

I cannot write data to HBase using the example given in the README. Below is a minimal piece of code that shows my approach and the error I run into when trying to write a Spark Scala DataFrame to HBase.

import org.apache.spark.sql.execution.datasources.hbase._ 

val input = Seq(
    ("a:1", "null", "null", "3", "4", "5", "6"),
    ("b:1", "2", "3", "null", "null", "5", "6")
)

val df = input.toDF 

val TIMELINE_TABLE = "test_timeline" 

val timelineCatalog =
    s"""
    "table":{"namespace":"default", "name":""".stripMargin + TIMELINE_TABLE + """", "tableCoder":"PrimitiveType"},
             |"rowkey":"key",
             |"columns":{
             |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
             |"col1":{"cf":"main", "col":"kx", "type":"string"},
             |"col2":{"cf":"main", "col":"ky", "type":"string"},
             |"col3":{"cf":"main", "col":"rx", "type":"string"},
             |"col4":{"cf":"main", "col":"ry", "type":"string"},
             |"col5":{"cf":"main", "col":"wx", "type":"string"},
             |"col6":{"cf":"main", "col":"wy", "type":"string"}
             |}
             |}""".stripMargin

val HBASE_CONF_FILE = "/etc/hbase/conf/hbase-site.xml"

df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> timelineCatalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

java.lang.ClassCastException: org.json4s.JsonAST$JString cannot be cast to org.json4s.JsonAST$JObject 
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257) 
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:77) 
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:59) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) 
    ... 50 elided 

My Scala version is 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131), my Spark is 2.1.0.2.6.0.10-29, and HBase is 1.1.2. I am using the hortonworks-spark/shc connector, and I cannot find anything wrong with my data; it is actually quite clean. Ideally I would want the string "null" to be an actual null, but I could not get that to work, so I figured writing it as a string would at least get things working. Any help would be appreciated. I have also opened an issue on GitHub: https://github.com/hortonworks-spark/shc/issues/172
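For reference, the catalog examples in the SHC README are a single JSON object: the string opens with "{" and the table name is interpolated inside its own quotes. The catalog above, rewritten in that documented shape (a sketch only; the column mapping is unchanged), would look like this:

val timelineCatalog = s"""{
    |"table":{"namespace":"default", "name":"$TIMELINE_TABLE", "tableCoder":"PrimitiveType"},
    |"rowkey":"key",
    |"columns":{
    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
    |"col1":{"cf":"main", "col":"kx", "type":"string"},
    |"col2":{"cf":"main", "col":"ky", "type":"string"},
    |"col3":{"cf":"main", "col":"rx", "type":"string"},
    |"col4":{"cf":"main", "col":"ry", "type":"string"},
    |"col5":{"cf":"main", "col":"wx", "type":"string"},
    |"col6":{"cf":"main", "col":"wy", "type":"string"}
    |}
    |}""".stripMargin

Since HBaseTableCatalog parses this string with json4s before anything reaches HBase (as the stack trace shows), a catalog that does not parse into a JSON object is one plausible source of the JString-to-JObject ClassCastException. The README's write example also passes HBaseTableCatalog.newTable -> "5" alongside the catalog so that SHC can create the table (with 5 regions) when it does not already exist.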

Answer


While I never actually found a way to make hortonworks-spark/shc work, here is an alternative way of doing the same thing:

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("timeline-to-hbase").getOrCreate()
import spark.implicits._

val input = Seq(
    ("a:1", "null", "null"), 
    ("b:1", "2", "3") 
) 

val df = input.toDF("col0", "col1", "col2") 

val TIMELINE_TABLE = "prod_timeline" 


val config = HBaseConfiguration.create()
config.clear()

// Point the client at the ZooKeeper quorum and tell TableOutputFormat
// which table to write to.
config.set("hbase.zookeeper.quorum", "zk0.example.net")
config.set("zookeeper.znode.parent", "/hbase")
config.set("hbase.zookeeper.property.clientPort", "2181")
config.set(TableOutputFormat.OUTPUT_TABLE, TIMELINE_TABLE)

val rdd = df.rdd.map { x =>
    // The first column becomes the row key; the remaining columns are
    // written into the "main" column family under qualifiers "a" and "b".
    val rowkey = x.get(0).toString
    val p = new Put(Bytes.toBytes(rowkey))
    p.addColumn(Bytes.toBytes("main"), Bytes.toBytes("a"), Bytes.toBytes(x.get(1).toString))
    p.addColumn(Bytes.toBytes("main"), Bytes.toBytes("b"), Bytes.toBytes(x.get(2).toString))

    (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), p)
}

val newAPIJobConfiguration = Job.getInstance(config)
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, TIMELINE_TABLE)

// TableOutputFormat only writes the Mutation values; declare its key type
// as ImmutableBytesWritable to match the keys of the RDD above.
newAPIJobConfiguration.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration())
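
To sanity-check the write, the table can be read back through the same Hadoop API. Below is a minimal sketch under the same assumptions as above (same ZooKeeper settings, and the spark session created earlier); note that getValue returns null for missing cells, which these two rows do not have:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val readConf = HBaseConfiguration.create()
readConf.set("hbase.zookeeper.quorum", "zk0.example.net")
readConf.set("zookeeper.znode.parent", "/hbase")
readConf.set("hbase.zookeeper.property.clientPort", "2181")
readConf.set(TableInputFormat.INPUT_TABLE, TIMELINE_TABLE)

val readRdd = spark.sparkContext.newAPIHadoopRDD(
    readConf,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result])

// Decode the row key and the two qualifiers written above back into strings.
readRdd.map { case (_, result) =>
    val key = Bytes.toString(result.getRow)
    val a = Bytes.toString(result.getValue(Bytes.toBytes("main"), Bytes.toBytes("a")))
    val b = Bytes.toString(result.getValue(Bytes.toBytes("main"), Bytes.toBytes("b")))
    (key, a, b)
}.collect().foreach(println)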