2016-08-02 45 views
1

我想阅读Spark流代码中的HBase数据,以便查找和进一步增强流式数据。我正在使用spark-hbase-connector_2.10-1.0.3.jar阅读Scala中的HBase - it.nerdammer

在我下面的代码行是成功的

val docRdd = 
    sc.hbaseTable[(Option[String], Option[String])]("hbase_customer_profile") 
    .select("id","gender").inColumnFamily("data") 

docRdd.count返回正确的计数。

docRdd的类型是

HBaseReaderBuilder(org.apache.spark.SparkContext @ 3a49e5,hbase_customer_profile,一些(数据),WrappedArray(ID, 性别),无,无,列表())

如何读取id, gender列中的所有行。另外我怎样才能将docRdd转换成数据帧,以便SparkSQL可以使用。

回答

1

可以使用

docRdd.collect().foreach(println) 

要将RDD转换为DataFrame读取来自RDD所有行,你可以定义一个案例类:

case class Customer(rowKey: String, id: Option[String], gender: Option[String]) 

我已经添加了行键到案例分类;这不是绝对必要的,所以如果你不需要它,你可以省略它。

然后mapRDD

// Row key, id, gender 
type Record = (String, Option[String], Option[String]) 

val rdd = 
    sc.hbaseTable[Record]("customers") 
    .select("id","gender") 
    .inColumnFamily("data") 
    .map(r => Customer(r._1, r._2, r._3)) 

,然后 - 根据案件类型 - 转换RDDDataFrame

import sqlContext.implicits._ 
val df = rdd.toDF() 
df.show() 
df.printSchema() 

spark-shell输出看起来是这样的:

scala> df.show() 
+---------+----+------+ 
| rowKey| id|gender| 
+---------+----+------+ 
|customer1| 1| null| 
|customer2|null|  f| 
|customer3| 3|  m| 
+---------+----+------+ 

scala> df.printSchema() 
root 
|-- rowKey: string (nullable = true) 
|-- id: string (nullable = true) 
|-- gender: string (nullable = true) 
+0

谢谢@Beryllium。我会试试这个。我想在SparkStream中使用RDD。我希望它也是可序列化的。再次感谢您的帮助 –

+0

您是否需要关于此问题的更多帮助? – Beryllium

+0

我都在这一套。谢谢.. –