2
我可以使用下面的代码读卡夫卡消息:星火RDD写入HBase的
val ssc = new StreamingContext(sc, Seconds(50))
val topicmap = Map("test" -> 1)
val lines = KafkaUtils.createStream(ssc,"127.0.0.1:2181", "test-consumer-group",topicmap)
能否请你帮我,我怎么能写卡夫卡读取到HBase的每封邮件?这是我想写的,但没有成功。
lines.foreachRDD(rdd => {
rdd.foreach(record => {
val i = +1
val hConf = new HBaseConfiguration()
val hTable = new HTable(hConf, "test")
val thePut = new Put(Bytes.toBytes(i))
thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(record))
})
})
我的错误。我错过了,尽管我把它作为我的代码的一部分。这里的问题是:如何将每条消息从Kafka传递给Hbase?我能够读取来自Kafka的消息,但是在foreachRDD中无法将该消息移植到Hbase。打电话给foreachRDD我做错了吗? – 2014-12-04 11:53:26