2015-09-20 58 views
3

我有一个用例,我正在使用kafka streaming来听一个主题并计算所有单词及其出现次数。我想保存我从DSTREAM如何使用kafka streaming的RDD执行hbase上的bulkIncrement

这里创建RDD在HBase的每次词语的计数是我在使用阅读的主题,它工作得很好,给我一个String RDD代码,龙

val broker = "localhost:9092" 
val zk ="localhost:2181" 
val topic = "sparktest" 

val sparkConf = new SparkConf().setAppName("KafkaHBaseWordCount").setMaster("local[2]") 
sparkConf.set("spark.driver.allowMultipleContexts" , "true") 
val sc = new SparkContext(sparkConf) 
val ssc = new StreamingContext(sparkConf, Seconds(10)) 

val kafkaConf = Map("metadata.broker.list" -> broker, "zookeeper.connect" -> zk,"group.id" -> "kafka-spark-streaming-example", "zookeeper.connection.timeout.ms" -> "1000") 

val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
ssc, kafkaConf, Map(topic -> 1), 
StorageLevel.MEMORY_ONLY_SER).map(_._2) 

val words = lines.flatMap(_.split(" ")) 
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) 

我现在想更新这个词在HBase的统计,让我们说,如果我的HBase的表已经有几个条目

ROW  COLUMN+CELL                                    
hi  column=word:count, timestamp=1442685920109, value=\x00\x00\x00\x00\x00\x00\x00\x04                  
hello  column=word:count, timestamp=1442685220641, value=\x00\x00\x00\x00\x00\x00\x00\x01                  
where  column=word:count, timestamp=1442685920261, value=\x00\x00\x00\x00\x00\x00\x00\x01 

,我已经收到了流新词,而RDD现在持有额外的

喜,2
你好,5

这将让导致HBase的, '喜' 新罪名 - > 6和 '你好' - > 5

我已经得到这个与合作以下代码,

wordCounts.foreachRDD (rdd => { 

     val conf = HBaseConfiguration.create() 
     conf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count") 
     conf.set("hbase.zookeeper.quorum", "localhost:2181") 
     conf.set("hbase.master", "localhost:60000"); 
     conf.set("hbase.rootdir", "file:///tmp/hbase") 

     val hConf = HBaseConfiguration.create() 
     val hTable = new HTable(hConf, "stream_count") 
     rdd.collect().foreach(record => { 
      val increment = new Increment(Bytes.toBytes(record._1)) 
      increment.addColumn(Bytes.toBytes("word"), Bytes.toBytes("count"), record._2) 
      hTable.increment(increment) 
     }) 
    }) 

有没有更好的方法来做到这一点?我试着看着cloudera的sparkOnHbase,它有一个bulkIncrement,但我无法做到这一点。我对大数据/火花相当陌生,任何指针将不胜感激。

回答

1

我有一个类似的用例,并尝试类似的逻辑作为你的。但是这个表现不好。如果我不设置spark.default.parallelism,它使用默认的2个执行程序来执行它。即使我设置它,其速度也不如“saveAsNewHadoopAPIDataset”快,但是这个api(使用TableOutputFormat时)不支持Increment(仅支持Put和Delete)。

您对此的绩效指标如何,以及您是如何在此方面实现并行性的。

相关问题