2015-07-12 179 views
3

我想用Flink-HBase插件读出数据,然后将其作为Flink机器学习算法(分别为SVM和MLR)的输入。现在我首先将提取的数据写入临时文件,然后通过libSVM方法读取它,但我想应该有更复杂的方法。用于机器学习算法的Flink HBase输入

你有代码片段或想法如何做?

+0

Flink是比较新的项目。我想,你可能会在flink邮件列表上得到更好的帮助。 –

回答

3

无需将数据写入磁盘,然后使用MLUtils.readLibSVM进行读取。原因如下。

MLUtils.readLibSVM需要一个文本文件,其中每一行都是稀疏特征矢量及其相关标签。它使用以下格式表示标签特征向量对:

<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info> 

<feature>是后续value的在特征向量的索引。 MLUtils.readLibSVM可以读取具有此格式的文件并转换LabeledVector实例中的每一行。因此,您在读取libSVM文件后获得DataSet[LabeledVector]。这正是SVMMultipleLinearRegression预测器所需的输入格式。

但是,根据您从HBase获得的数据格式,首先必须将数据转换为libSVM格式。否则,MLUtils.readLibSVM将无法​​读取写入的文件。如果您转换数据,那么您还可以直接将数据转换为DataSet[LabeledVector],并将其用作Flink ML算法的输入。这可以避免不必要的磁盘循环。

如果从HBase的一个DataSet[String]其中每个字符串具有libSVM格式(参见上面的说明书中)获得,然后可以在HBase的DataSet施加map操作用下面的映射函数。

val hbaseInput: DataSet[String] = ... 
val labelCOODS = hbaseInput.flatMap { 
    line => 
    // remove all comments which start with a '#' 
    val commentFreeLine = line.takeWhile(_ != '#').trim 

    if(commentFreeLine.nonEmpty) { 
     val splits = commentFreeLine.split(' ') 
     val label = splits.head.toDouble 
     val sparseFeatures = splits.tail 
     val coos = sparseFeatures.map { 
     str => 
      val pair = str.split(':') 
      require(
      pair.length == 2, 
      "Each feature entry has to have the form <feature>:<value>") 

      // libSVM index is 1-based, but we expect it to be 0-based 
      val index = pair(0).toInt - 1 
      val value = pair(1).toDouble 

      (index, value) 
     } 

     Some((label, coos)) 
    } else { 
     None 
    } 

// Calculate maximum dimension of vectors 
val dimensionDS = labelCOODS.map { 
    labelCOO => 
    labelCOO._2.map(_._1 + 1).max 
}.reduce(scala.math.max(_, _)) 

val labeledVectors: DataSet[LabeledVector] = 
    labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] { 
    var dimension = 0 

    override def open(configuration: Configuration): Unit = { 
    dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0) 
    } 

    override def map(value: (Double, Array[(Int, Double)])): LabeledVector = { 
    new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2)) 
    } 
}}.withBroadcastSet(dimensionDS, DIMENSION) 

这会将您的libSVM格式数据转换为LabeledVectors的数据集。

+0

谢谢!你的回答非常有帮助! 不幸的是,HBase中的数据集必须在Java类中获得,现在我得到的错误是我的DataSet与Scala类中的方法不兼容: 错误:(102,29)java:incompatible types: 'org.apache.flink.api.java.DataSet 不能转换为org.apache.flink.api.scala.DataSet '' – MsIcklerly

+0

你应该也可以使用Scala API从HBase读取。然后你获得一个'org.apache.flink.api.scala.Dataset [String]'。 –