我正在处理数据与斯卡拉2.11.7和Flink 1.3.2。现在我想将生成的org.apache.flink.api.scala.DataSet存储在neo4j图形数据库中。neo4j与Flink和斯卡拉
有Github上项目的兼容性:
- 弗林克用的Neo4j:https://github.com/s1ck/flink-neo4j
- 斯卡拉与Neo4j的:_HTTPS://github.com/FaKod/neo4j-scala
- 弗林克的图形库“冻膜“与neo4j:_https://github.com/albertodelazzari/gelly-neo4j
什么是最有希望的方式去?或者我应该更好地直接使用neo4j的REST API?
(BTW:为什么计算器限制链接postet的数量...?)
我试图弗林克-Neo4j的,但似乎有一些问题与混合Java和Scala类:
package dummy.neo4j
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat
import org.apache.flink.api.java.tuple.{Tuple, Tuple2}
import org.apache.flink.api.scala._
object Neo4jDummyWriter {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val outputFormat: OutputFormat[_ <: Tuple] = Neo4jOutputFormat.buildNeo4jOutputFormat.setRestURI("http://localhost:7474/db/data/")
.setConnectTimeout(1000).setReadTimeout(1000).setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})")
.addParameterKey(0, "name").addParameterKey(1, "born").setTaskBatchSize(1000).finish
val tuple1: Tuple = new Tuple2("abc", 1)
val tuple2: Tuple = new Tuple2("def", 2)
val test = env.fromElements[Tuple](tuple1, tuple2)
println("test: " + test.getClass)
test.output(outputFormat)
}
}
线程“main”中的异常java.lang.ClassCastException:[Ljava.lang.Object;不能转换为[Lorg.apache.flink.api.common.typeinfo.TypeInformation; 在dummy.neo4j.Neo4jDummyWriter $。主要(Neo4jDummyWriter.scala:20) 在dummy.neo4j.Neo4jDummyWriter.main(Neo4jDummyWriter.scala)
和
类型不匹配,预计:OUTPUTFORMAT [元组],则实际:OUTPUTFORMAT [_ <:元组]