我有一个用例,我正在使用kafka streaming来听一个主题并计算所有单词及其出现次数。我想保存我从DSTREAM如何使用kafka streaming的RDD执行hbase上的bulkIncrement
这里创建RDD在HBase的每次词语的计数是我在使用阅读的主题,它工作得很好,给我一个String RDD代码,龙
val broker = "localhost:9092"
val zk ="localhost:2181"
val topic = "sparktest"
val sparkConf = new SparkConf().setAppName("KafkaHBaseWordCount").setMaster("local[2]")
sparkConf.set("spark.driver.allowMultipleContexts" , "true")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaConf = Map("metadata.broker.list" -> broker, "zookeeper.connect" -> zk,"group.id" -> "kafka-spark-streaming-example", "zookeeper.connection.timeout.ms" -> "1000")
val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
ssc, kafkaConf, Map(topic -> 1),
StorageLevel.MEMORY_ONLY_SER).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
我现在想更新这个词在HBase的统计,让我们说,如果我的HBase的表已经有几个条目
ROW COLUMN+CELL
hi column=word:count, timestamp=1442685920109, value=\x00\x00\x00\x00\x00\x00\x00\x04
hello column=word:count, timestamp=1442685220641, value=\x00\x00\x00\x00\x00\x00\x00\x01
where column=word:count, timestamp=1442685920261, value=\x00\x00\x00\x00\x00\x00\x00\x01
,我已经收到了流新词,而RDD现在持有额外的
喜,2
你好,5
这将让导致HBase的, '喜' 新罪名 - > 6和 '你好' - > 5
我已经得到这个与合作以下代码,
wordCounts.foreachRDD (rdd => {
val conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count")
conf.set("hbase.zookeeper.quorum", "localhost:2181")
conf.set("hbase.master", "localhost:60000");
conf.set("hbase.rootdir", "file:///tmp/hbase")
val hConf = HBaseConfiguration.create()
val hTable = new HTable(hConf, "stream_count")
rdd.collect().foreach(record => {
val increment = new Increment(Bytes.toBytes(record._1))
increment.addColumn(Bytes.toBytes("word"), Bytes.toBytes("count"), record._2)
hTable.increment(increment)
})
})
有没有更好的方法来做到这一点?我试着看着cloudera的sparkOnHbase,它有一个bulkIncrement,但我无法做到这一点。我对大数据/火花相当陌生,任何指针将不胜感激。