2017-09-15 57 views
0

我有一个自定义的Spark流的foreach编写器。对于我写入JDBC源的每一行。在执行JDBC操作之前,我还想做一些快速查找并在执行JDBC操作之后更新值,如下面的示例代码中的“Step-1”和“Step-3”...在Apache Spark中存在内存数据库

I don不想使用REDIS,MongoDB等外部数据库。我想用低足迹像RocksDB,德比等什么......

我可以接受存储每个应用程序的一个文件,就像检查点,我将创建一个内部数据库文件夹...

我看不到任何内存数据库用于星火..

def main(args: Array[String]): Unit = { 

val brokers = "quickstart:9092" 
val topic = "safe_message_landing_app_4" 

val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate(); 

val sparkContext = sparkSession.sparkContext; 
sparkContext.setLogLevel("ERROR") 
val sqlContext = sparkSession.sqlContext; 

val kafkaDataframe = sparkSession.readStream.format("kafka") 
    .options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic, 
    "startingOffsets" -> "latest", "group.id" -> " Jai Ganesh", "checkpoint" -> "cp/kafka_reader")) 
    .load() 

kafkaDataframe.printSchema() 
kafkaDataframe.createOrReplaceTempView("kafka_view") 
val sqlDataframe = sqlContext.sql("select concat (topic, '-' , partition, '-' , offset) as KEY, string(value) as VALUE from kafka_view") 

val customForEachWriter = new ForeachWriter[Row] { 
    override def open(partitionId: Long, version: Long) = { 
    println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version) 
    true 
    } 

    override def process(value: Row) = { 
    // Step 1 ==> Lookup a key in persistent KEY-VALUE store 

    // JDBC operations 

    // Step 3 ==> Update the value in persistent KEY-VALUE store 
    } 

    override def close(errorOrNull: Throwable) = { 
    println(" ************** Closed ****************** ") 
    } 
} 

val yy = sqlDataframe 
    .writeStream 
    .queryName("foreachquery") 
    .foreach(customForEachWriter) 
    .start() 

yy.awaitTermination() 

sparkSession.close(); 

}

+1

你问的https://db.apache。 org/derby/docs/10.13/devguide/cdevdvlpinmemdb.html?我真的不知道什么是“永久性内存数据库”,除非您正在讨论使用像NVM这样的硬件?如果没有特殊的硬件,Derby内存数据库不会持久。 –

+1

我的意思是在内存中意味着.. MySQL,Redis作为单独的进程运行......我不想要......德比加载到火花驱动程序中,并从执行者我想连接到德比......因为我的火花由纱线运行的作业将在5台机器上..所以我可以使用德比是火花......并将它用于我的需要步骤1和3 ...但是不支持MVCC,所以我正在考虑H2数据库...所以我想体验一下使用Derby和H2是Spark – Manjesh

+2

OK。 Derby关于这个“进程内”数据库引擎的术语是“嵌入式”的,对于将Derby嵌入到另一个(Java)应用程序中来说,它的效果很好。 Derby不是MVCC数据库引擎是正确的。要开始使用Derby,我推荐以下教程:https://db.apache.org/derby/docs/10.13/getstart/ –

回答

1

Manjesh,

你在找什么,“星火和你的内存数据库作为一个无缝集群,共享单个进程空间“,支持MVCC正是SnappyData所提供的。借助SnappyData,您希望快速查找的表格与正在运行Spark流式作业的过程处于同一个过程中。检查出来here

SnappyData拥有核心产品的Apache V2许可证,并且您所指的特定用途可在OSS下载中找到。

(披露:我是一个SnappyData员工,是有意义的提供产品具体回答这个问题,因为该产品是问题的答案)