0
我有一个自定义的Spark流的foreach编写器。对于我写入JDBC源的每一行。在执行JDBC操作之前,我还想做一些快速查找并在执行JDBC操作之后更新值,如下面的示例代码中的“Step-1”和“Step-3”...在Apache Spark中存在内存数据库
I don不想使用REDIS,MongoDB等外部数据库。我想用低足迹像RocksDB,德比等什么......
我可以接受存储每个应用程序的一个文件,就像检查点,我将创建一个内部数据库文件夹...
我看不到任何内存数据库用于星火..
def main(args: Array[String]): Unit = {
val brokers = "quickstart:9092"
val topic = "safe_message_landing_app_4"
val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate();
val sparkContext = sparkSession.sparkContext;
sparkContext.setLogLevel("ERROR")
val sqlContext = sparkSession.sqlContext;
val kafkaDataframe = sparkSession.readStream.format("kafka")
.options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic,
"startingOffsets" -> "latest", "group.id" -> " Jai Ganesh", "checkpoint" -> "cp/kafka_reader"))
.load()
kafkaDataframe.printSchema()
kafkaDataframe.createOrReplaceTempView("kafka_view")
val sqlDataframe = sqlContext.sql("select concat (topic, '-' , partition, '-' , offset) as KEY, string(value) as VALUE from kafka_view")
val customForEachWriter = new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long) = {
println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version)
true
}
override def process(value: Row) = {
// Step 1 ==> Lookup a key in persistent KEY-VALUE store
// JDBC operations
// Step 3 ==> Update the value in persistent KEY-VALUE store
}
override def close(errorOrNull: Throwable) = {
println(" ************** Closed ****************** ")
}
}
val yy = sqlDataframe
.writeStream
.queryName("foreachquery")
.foreach(customForEachWriter)
.start()
yy.awaitTermination()
sparkSession.close();
}
你问的https://db.apache。 org/derby/docs/10.13/devguide/cdevdvlpinmemdb.html?我真的不知道什么是“永久性内存数据库”,除非您正在讨论使用像NVM这样的硬件?如果没有特殊的硬件,Derby内存数据库不会持久。 –
我的意思是在内存中意味着.. MySQL,Redis作为单独的进程运行......我不想要......德比加载到火花驱动程序中,并从执行者我想连接到德比......因为我的火花由纱线运行的作业将在5台机器上..所以我可以使用德比是火花......并将它用于我的需要步骤1和3 ...但是不支持MVCC,所以我正在考虑H2数据库...所以我想体验一下使用Derby和H2是Spark – Manjesh
OK。 Derby关于这个“进程内”数据库引擎的术语是“嵌入式”的,对于将Derby嵌入到另一个(Java)应用程序中来说,它的效果很好。 Derby不是MVCC数据库引擎是正确的。要开始使用Derby,我推荐以下教程:https://db.apache.org/derby/docs/10.13/getstart/ –