2017-08-31

Neo4j with Flink and Scala

I'm processing data with Scala 2.11.7 and Flink 1.3.2. Now I want to store the resulting org.apache.flink.api.scala.DataSet in a Neo4j graph database.

There are several projects on GitHub for this:

  • Flink with Neo4j: https://github.com/s1ck/flink-neo4j
  • Scala with Neo4j: https://github.com/FaKod/neo4j-scala
  • Flink's graph library Gelly with Neo4j: https://github.com/albertodelazzari/gelly-neo4j

Which of these is the most promising way to go? Or would I be better off using Neo4j's REST API directly?
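For the last option, calling Neo4j's HTTP API directly: Neo4j 3.x accepts Cypher over its transactional endpoint (`POST /db/data/transaction/commit` on the same `http://localhost:7474` server), with a JSON body that lists statements and their parameters. The sketch below assumes Neo4j 3.x and reuses the `UNWIND {inserts}` query from the code in this post; it only builds the JSON body for a batch of `(name, born)` pairs. `Neo4jRestPayload` and `usersBody` are hypothetical names, the escaping covers only quotes, backslashes and common whitespace, and sending the request (with Basic auth) from a Flink sink is not shown.

```scala
// Hypothetical helper: builds the JSON body for Neo4j 3.x's transactional
// Cypher endpoint (POST /db/data/transaction/commit). Sending it is not shown.
object Neo4jRestPayload {

  // Minimal JSON string escaping: quotes, backslashes and common whitespace only.
  private def jsonString(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c    => sb.append(c)
    }
    sb.append('"').toString
  }

  // One JSON object per row; the keys match i.name and i.born in the Cypher query.
  def usersBody(users: Seq[(String, Int)]): String = {
    val statement =
      "UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})"
    val rows = users
      .map { case (name, born) => s"""{"name":${jsonString(name)},"born":$born}""" }
      .mkString("[", ",", "]")
    s"""{"statements":[{"statement":${jsonString(statement)},"parameters":{"inserts":$rows}}]}"""
  }
}
```

Building the whole batch into one `inserts` list keeps it to a single HTTP round trip per batch, which is the same idea the `UNWIND` query relies on.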

(BTW: why does Stack Overflow limit the number of links you can post...?)

I tried flink-neo4j, but there seem to be some problems with mixing Java and Scala classes:

package dummy.neo4j

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat
import org.apache.flink.api.java.tuple.{Tuple, Tuple2}
import org.apache.flink.api.scala._

object Neo4jDummyWriter {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val outputFormat: OutputFormat[_ <: Tuple] = Neo4jOutputFormat.buildNeo4jOutputFormat.setRestURI("http://localhost:7474/db/data/")
      .setConnectTimeout(1000).setReadTimeout(1000).setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})")
      .addParameterKey(0, "name").addParameterKey(1, "born").setTaskBatchSize(1000).finish

    val tuple1: Tuple = new Tuple2("abc", 1)
    val tuple2: Tuple = new Tuple2("def", 2)

    val test = env.fromElements[Tuple](tuple1, tuple2)
    println("test: " + test.getClass)
    test.output(outputFormat)
  }

}

Exception in thread "main" java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.flink.api.common.typeinfo.TypeInformation;
    at dummy.neo4j.Neo4jDummyWriter$.main(Neo4jDummyWriter.scala:20)
    at dummy.neo4j.Neo4jDummyWriter.main(Neo4jDummyWriter.scala)

Type mismatch, expected: OutputFormat[Tuple], actual: OutputFormat[_ <: Tuple]
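The type mismatch is a variance issue. Flink's `OutputFormat<IT>` is a Java generic interface, so Scala treats it as invariant, and `DataSet[T].output` expects exactly an `OutputFormat[T]`. An `OutputFormat[_ <: Tuple]` is a format for *some* unknown subtype of `Tuple`, so the compiler cannot accept it where a format for any `Tuple` is required. The self-contained sketch below reproduces this with hypothetical stand-in types (`Tuple`, `Tuple2`, `OutputFormat`, `output` and `Collecting` here are illustrations, not Flink's real classes) and shows the unchecked cast to a concrete element type, which is the approach the answer takes.

```scala
// Illustration only: hypothetical stand-ins, not Flink's classes.
object InvarianceDemo {

  // Stand-in for org.apache.flink.api.java.tuple.Tuple and Tuple2.
  abstract class Tuple
  final case class Tuple2[A, B](f0: A, f1: B) extends Tuple

  // Stand-in for Flink's OutputFormat<IT>: invariant in IT, like a Java generic.
  trait OutputFormat[IT] { def writeRecord(record: IT): Unit }

  // Stand-in for DataSet[T].output, which takes exactly an OutputFormat[T].
  def output[T](records: Seq[T], format: OutputFormat[T]): Unit =
    records.foreach(format.writeRecord)

  // A format that just remembers what was written, so the result can be checked.
  final class Collecting[IT] extends OutputFormat[IT] {
    val written = scala.collection.mutable.ArrayBuffer.empty[IT]
    def writeRecord(record: IT): Unit = written += record
  }

  def demo(): List[Tuple2[String, Int]] = {
    val underlying = new Collecting[Tuple2[String, Int]]
    val existential: OutputFormat[_ <: Tuple] = underlying

    // output(Seq[Tuple](Tuple2("abc", 1)), existential)
    //   does not compile: OutputFormat[_ <: Tuple] is not an OutputFormat[Tuple]

    // Cast to the concrete element type. The cast is unchecked (generics are
    // erased), so it is only safe if the format really accepts these records.
    val casted = existential.asInstanceOf[OutputFormat[Tuple2[String, Int]]]
    output(Seq(Tuple2("abc", 1), Tuple2("def", 2)), casted)

    underlying.written.toList
  }
}
```

Keeping the data typed as a concrete `Tuple2[String, Int]` rather than the abstract `Tuple` is what lets the element type of the dataset and of the format line up.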

Answer


A solution that keeps the elements typed as Tuple2 instead of upcasting them to Tuple:

package dummy.neo4j

import org.apache.flink.api.common.io._
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.scala._

object Neo4jDummyWriter {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Build the data with Scala tuples, then map them to Flink's Java Tuple2,
    // which is what Neo4jOutputFormat expects.
    val tuple1 = ("user9", 1978)
    val tuple2 = ("user10", 1996)
    val datasetWithScalaTuples = env.fromElements(tuple1, tuple2)
    val dataset: DataSet[Tuple2[String, Int]] = datasetWithScalaTuples.map(tuple => new Tuple2(tuple._1, tuple._2))

    // Cast the built format to OutputFormat[Tuple2[String, Int]] so that it matches
    // DataSet[Tuple2[String, Int]].output, which expects exactly OutputFormat[T].
    val outputFormat = Neo4jOutputFormat.buildNeo4jOutputFormat
      .setRestURI("http://localhost:7474/db/data/")
      .setUsername("neo4j")
      .setPassword("...")
      .setConnectTimeout(1000)
      .setReadTimeout(1000)
      .setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})")
      .addParameterKey(0, "name")
      .addParameterKey(1, "born")
      .setTaskBatchSize(1000)
      .finish
      .asInstanceOf[OutputFormat[Tuple2[String, Int]]]

    dataset.output(outputFormat)
    // output() only registers the sink; execute() actually runs the job.
    env.execute
  }

}
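In both snippets, `addParameterKey(0, "name")` and `addParameterKey(1, "born")` name the tuple fields by position so that the Cypher query can read them as `i.name` and `i.born` inside `UNWIND {inserts}`, and `setTaskBatchSize(1000)` presumably caps how many tuples a task sends per request. The plain-Scala sketch below models that mapping under those assumptions, using Scala tuples instead of Flink's Java `Tuple2`; it is not flink-neo4j's implementation, and `InsertBatches` and `toInsertBatches` are hypothetical names.

```scala
// Hypothetical model of how positional tuple fields could become the
// `inserts` parameter of `UNWIND {inserts} AS i ...`. Not flink-neo4j's code.
object InsertBatches {

  // keys maps a tuple position to a parameter name, e.g. Map(0 -> "name", 1 -> "born").
  // Each row becomes one map; rows are then split into batches of at most batchSize.
  def toInsertBatches(
      rows: Seq[Product],
      keys: Map[Int, String],
      batchSize: Int
  ): List[List[Map[String, Any]]] = {
    require(batchSize > 0, "batchSize must be positive")
    rows
      .map { row =>
        keys.map { case (pos, name) => name -> row.productElement(pos) }
      }
      .grouped(batchSize)
      .map(_.toList)
      .toList
  }
}
```

For example, three rows with a batch size of 2 give two batches, the first holding two maps such as `Map("name" -> "user9", "born" -> 1978)` and the second holding one.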