
I know we can write to an HBase table from Spark by calling saveAsNewAPIHadoopDataset on an RDD[(ImmutableBytesWritable, Put)].
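
For reference, the single-table pattern I mean looks roughly like this (a minimal sketch; "someTable" and putsRDD are placeholders):

val job = Job.getInstance(HBaseConfiguration.create())
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "someTable")
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
// putsRDD has type RDD[(ImmutableBytesWritable, Put)]
putsRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)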

But I have an RDD of lists, i.e. an RDD[List[(ImmutableBytesWritable, Put)]], that I want to write to two different HBase tables. How can I do that?

Here is my code.

package com.scryAnalytics.FeatureExtractionController

import com.scryAnalytics.FeatureExtractionController.DAO.{DocumentEntitiesDAO, NLPEntitiesDAO, SegmentFeaturesDAO}
import com.scryAnalytics.NLPGeneric.{GateGenericNLP, NLPEntities}
import com.vocp.ner.main.GateNERImpl
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{MultiTableOutputFormat, TableInputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import com.scryAnalytics.FeatureExtraction.SegmentsFeatureExtraction
import com.scryAnalytics.FeatureExtraction.DAO.VOCPEntities

import scala.collection.JavaConversions._
import scala.util.control.Breaks.{break, breakable}
import scala.util.control.NonFatal

import gate.FeatureMap
import java.util.Map.Entry

/**
 * Created by sahil on 1/12/16.
 */

object Main {
  def main(args: Array[String]): Unit = {
    val inputTableName = "posts"
    val outputTableName = "drugSegmentNew1"
    val pluginHome = "/home/sahil/Voice-of-Cancer-Patients/VOCP Modules/bin/plugins"
    val sc = new SparkContext(new SparkConf().setAppName("HBaseRead").setMaster("local[4]"))
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost")
    conf.set(TableInputFormat.INPUT_TABLE, inputTableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(inputTableName)) {
      val tableDesc = new HTableDescriptor(TableName.valueOf(inputTableName))
      admin.createTable(tableDesc)
    }
    val job: Job = Job.getInstance(conf, "FeatureExtractionJob")
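    // MultiTableOutputFormat routes every (key, Put) pair to the table whose
    // name is carried in the ImmutableBytesWritable key.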
    job.setOutputFormatClass(classOf[MultiTableOutputFormat])
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    val resultRDD = hBaseRDD.map(x => x._2)

    // TODO: Add filters

    val entity: VOCPEntities = VOCPEntities.DRUG
    val nlpRDD = resultRDD.mapPartitions { iter =>
      iter.map { result =>
        // Create one DAO per record; sharing a single mutable instance across
        // the whole partition would let later records overwrite earlier ones.
        val nlpEntities: NLPEntitiesDAO = new NLPEntitiesDAO
        val message = Bytes.toString(result.getValue(Bytes.toBytes("p"), Bytes.toBytes("message")))
        val row_key = Bytes.toString(result.getRow)
        nlpEntities.setToken(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("gen"), Bytes.toBytes("token")))))
        nlpEntities.setSpaceToken(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("gen"), Bytes.toBytes("spaceToken")))))
        nlpEntities.setSentence(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("gen"), Bytes.toBytes("sentence")))))
        nlpEntities.setVG(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("gen"), Bytes.toBytes("verbGroup")))))
        nlpEntities.setSplit(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("gen"), Bytes.toBytes("split")))))
        nlpEntities.setNounChunk(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("gen"), Bytes.toBytes("nounChunk")))))

        nlpEntities.setDrugs(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("ner"), Bytes.toBytes("drug")))))
        nlpEntities.setRegimen(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("ner"), Bytes.toBytes("regimen")))))
        nlpEntities.setSideEffects(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("ner"), Bytes.toBytes("sideEffect")))))
        nlpEntities.setALT_DRUG(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("ner"), Bytes.toBytes("altDrug")))))
        nlpEntities.setALT_THERAPY(Utility.jsonToAnnotations(Bytes.toString(
          result.getValue(Bytes.toBytes("ner"), Bytes.toBytes("altTherapy")))))
        (row_key, message, nlpEntities)
      }
    }
    val outputRDD = nlpRDD.mapPartitions { iter =>
      val featureExtraction: SegmentsFeatureExtraction = new SegmentsFeatureExtraction(
        pluginHome, entity)
      iter.map { x =>
        val featuresJson = featureExtraction.generateFeatures(x._2, Utility.objectToJson(x._3))
        val segmentFeatures: SegmentFeaturesDAO = Utility.jsonToSegmentFeatures(featuresJson)
        val documentEntities: DocumentEntitiesDAO = new DocumentEntitiesDAO
        documentEntities.setSystemId(x._1)
        documentEntities.setToken(x._3.getToken)
        documentEntities.setSpaceToken(x._3.getSpaceToken)
        documentEntities.setSentence(x._3.getSentence)
        documentEntities.setVG(x._3.getVG)
        documentEntities.setNounChunk(x._3.getNounChunk)
        documentEntities.setSplit(x._3.getSplit)
        documentEntities.setDRUG(x._3.getDrugs)
        documentEntities.setSE(x._3.getSideEffects)
        documentEntities.setREG(x._3.getRegimen)
        documentEntities.setALT_DRUG(x._3.getALT_DRUG)
        documentEntities.setALT_THERAPY(x._3.getALT_THERAPY)
        documentEntities.setSegment(segmentFeatures.getSegment)
        documentEntities.setSegmentClass(segmentFeatures.getSegmentClass)
        documentEntities.setSegmentInstance(segmentFeatures.getSegmentInstance)
        (x._1, documentEntities)
      }
    }
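    // convertToPut returns a List, so the map below yields
    // RDD[List[(ImmutableBytesWritable, Put)]] rather than the
    // RDD[(ImmutableBytesWritable, Put)] that saveAsNewAPIHadoopDataset expects.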
    val newRDD = outputRDD.map { k => convertToPut(k) }
    newRDD.saveAsNewAPIHadoopDataset(job.getConfiguration())
  }
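  // Builds one Put per segment, keyed to table "drugSegmentNew1", plus one
  // status Put keyed to table "posts"; the key names the destination table.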
  def convertToPut(nlpWithRowKey: (String, DocumentEntitiesDAO)): List[(ImmutableBytesWritable, Put)] = {
    val rowkey = nlpWithRowKey._1
    val documentEntities = nlpWithRowKey._2
    var returnList: List[(ImmutableBytesWritable, Put)] = List()
    val segmentInstances = documentEntities.getSegmentInstance
    val segments = documentEntities.getSegment
    if (segments != null) {
      var count = 0
      for (segment <- segmentInstances) {
        val keyString: String = documentEntities.getSystemId + "#" + Integer.toString(count)
        count = count + 1
        val outputKey: ImmutableBytesWritable = new ImmutableBytesWritable(keyString.getBytes())
        val put = new Put(outputKey.get())

        val features: FeatureMap = segment.getFeatures
        val it: Iterator[Entry[Object, Object]] = features.entrySet.iterator()
        var sideEffect_offset = "NULL"
        var entity_offset = "NULL"
        while (it.hasNext) {
          val pair = it.next()
          if (pair.getKey.equals("sideEffect-offset")) {
            sideEffect_offset = pair.getValue().toString()
          } else if (pair.getKey.equals("drug-offset")) {
            entity_offset = pair.getValue().toString()
          } else if (pair.getKey().equals("drug") || pair.getKey().equals("sideEffect")) {
            put.add(Bytes.toBytes("seg"), Bytes.toBytes(pair.getKey.toString),
              Bytes.toBytes(pair.getValue().toString))
          } else {
            put.add(Bytes.toBytes("segFeatures"), Bytes.toBytes(pair.getKey.toString),
              Bytes.toBytes(pair.getValue().toString))
          }
        }
        put.add(Bytes.toBytes("seg"), Bytes.toBytes("RelationId"),
          Bytes.toBytes(documentEntities.getSystemId() + "-" + entity_offset + "-" + sideEffect_offset))
        put.add(Bytes.toBytes("segInst"), Bytes.toBytes("id"), Bytes.toBytes(segment.getId()))
        put.add(Bytes.toBytes("segInst"), Bytes.toBytes("type"), Bytes.toBytes(segment.getType()))
        put.add(Bytes.toBytes("segInst"), Bytes.toBytes("startNodeId"),
          Bytes.toBytes(segment.getStartNode().getId()))
        put.add(Bytes.toBytes("segInst"), Bytes.toBytes("startNodeOffset"),
          Bytes.toBytes(segment.getStartNode().getOffset()))
        put.add(Bytes.toBytes("segInst"), Bytes.toBytes("endNodeId"),
          Bytes.toBytes(segment.getEndNode().getId()))
        put.add(Bytes.toBytes("segInst"), Bytes.toBytes("endNodeOffset"),
          Bytes.toBytes(segment.getEndNode().getOffset()))

        put.add(Bytes.toBytes("seg"), Bytes.toBytes("system_id"),
          Bytes.toBytes(documentEntities.getSystemId()))
        put.add(Bytes.toBytes("seg"), Bytes.toBytes("segmentText"),
          Bytes.toBytes(segment.getAnnotatedText()))

        // break only works inside a breakable block; with a plain catch of
        // Throwable the break's control exception would be swallowed, so we
        // catch NonFatal instead and let break escape to breakable.
        breakable {
          for (segmentClassAnnots <- documentEntities.getSegmentClass) {
            try {
              if (segment.getId().equals(segmentClassAnnots.getFeatures().get("instance-id"))) {
                put.add(Bytes.toBytes("segClass"), Bytes.toBytes("id"),
                  Bytes.toBytes(segmentClassAnnots.getId()))
                put.add(Bytes.toBytes("segClass"), Bytes.toBytes("type"),
                  Bytes.toBytes(segmentClassAnnots.getType()))
                put.add(Bytes.toBytes("segClass"), Bytes.toBytes("startNodeId"),
                  Bytes.toBytes(segmentClassAnnots.getStartNode().getId()))
                put.add(Bytes.toBytes("segClass"), Bytes.toBytes("startNodeOffset"),
                  Bytes.toBytes(segmentClassAnnots.getStartNode().getOffset()))
                put.add(Bytes.toBytes("segClass"), Bytes.toBytes("endNodeId"),
                  Bytes.toBytes(segmentClassAnnots.getEndNode().getId()))
                put.add(Bytes.toBytes("segClass"), Bytes.toBytes("endNodeOffset"),
                  Bytes.toBytes(segmentClassAnnots.getEndNode().getOffset()))
                break
              }
            } catch {
              case NonFatal(t) => t.printStackTrace()
            }
          }
        }
        // Append once per segment (not once per segmentClass annotation).
        returnList = returnList :+ ((new ImmutableBytesWritable(Bytes.toBytes("drugSegmentNew1")), put))
      }
    }
    val statusPut = new Put(Bytes.toBytes(rowkey))
    statusPut.add(Bytes.toBytes("f"), Bytes.toBytes("dStatus"), Bytes.toBytes("1"))
    returnList = returnList :+ ((new ImmutableBytesWritable(Bytes.toBytes("posts")), statusPut))
    returnList
  }
}

Answer


Just change the following line:

val newRDD = outputRDD.map { k => convertToPut(k) } 

to this line:

val newRDD = outputRDD.flatMap { k => convertToPut(k) } 
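
Why this works: convertToPut returns a List[(ImmutableBytesWritable, Put)], so map gives you an RDD[List[(ImmutableBytesWritable, Put)]], which saveAsNewAPIHadoopDataset cannot consume. flatMap flattens each list, giving a plain RDD[(ImmutableBytesWritable, Put)], and with MultiTableOutputFormat each key names the table its Put is written to. Here is a minimal self-contained sketch of the pattern (the table names "tableA" and "tableB" and the column layout are made up for illustration):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object MultiTableWriteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MultiTableWrite").setMaster("local[2]"))
    val job = Job.getInstance(HBaseConfiguration.create(), "MultiTableWrite")
    job.setOutputFormatClass(classOf[MultiTableOutputFormat])

    // Fan one input record out into Puts for two different tables.
    def toPuts(rowkey: String): List[(ImmutableBytesWritable, Put)] = {
      val putA = new Put(Bytes.toBytes(rowkey))
      putA.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("valueA"))
      val putB = new Put(Bytes.toBytes(rowkey))
      putB.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("valueB"))
      List(
        (new ImmutableBytesWritable(Bytes.toBytes("tableA")), putA), // key = destination table
        (new ImmutableBytesWritable(Bytes.toBytes("tableB")), putB)
      )
    }

    // flatMap: RDD[String] => RDD[(ImmutableBytesWritable, Put)], one flat stream of pairs.
    sc.parallelize(Seq("row1", "row2")).flatMap(toPuts)
      .saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}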

Hope this helps!


Works like a charm :D – wadhwasahil


But why does **newRDD.flatten** work here? Wouldn't it produce the same result? – wadhwasahil