我们开发了一个火花流应用程序,它可以从kafka获取数据并写入mongoDB。在输入DStream中创建foreachRDD内部的连接时,我们注意到性能影响。插入到mongoDB之前,Spark应用程序会进行一些验证。我们正在探索避免为每个处理的消息连接到mongoDB的选项,而我们希望一次处理一个批处理间隔内的所有消息。以下是Spark流应用程序的简化版本。我们所做的一件事是将所有消息附加到数据框,并尝试在foreachRDD之外插入该数据框的内容。但是当我们运行这个应用程序时,将数据框写入mongoDB的代码不会被执行。避免与火花流传输的mongoDB多连接
请注意,我注释了foreachRDD中的一部分代码,我们用它将每条消息插入到mongoDB中。由于我们一次插入一条消息,因此现有方法非常缓慢。任何关于性能改进的建议都非常感谢。
谢谢
package com.testing
import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql.{ SQLContext, Row, Column, DataFrame }
import java.util.HashMap
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.joda.time._
import org.joda.time.format._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import com.mongodb.util.JSON
import scala.io.Source._
import java.util.Properties
import java.util.Calendar
import scala.collection.immutable
import org.json4s.DefaultFormats
object Sample_Streaming {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Sample_Streaming")
.setMaster("local[4]")
val sc = new SparkContext(sparkConf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sc, Seconds(1))
val props = new HashMap[String, Object]()
val bootstrap_server_config = "127.0.0.100:9092"
val zkQuorum = "127.0.0.101:2181"
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val TopicMap = Map("sampleTopic" -> 1)
val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2)
val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", "connectionURI")
.option("spark.mongodb.input.collection", "schemaCollectionName")
.load()
val outSchema = schemaDf.schema
var outDf = sqlContext.createDataFrame(sc.emptyRDD[Row], outSchema)
KafkaDstream.foreachRDD(rdd => rdd.collect().map { x =>
{
val jsonInput: JValue = parse(x)
/*Do all the transformations using Json libraries*/
val json4s_transformed = "transformed json"
val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil)
val df = sqlContext.read.schema(outSchema).json(rdd)
//Earlier we were inserting each message into mongoDB, which we would like to avoid and process all at once
/* df.write.option("spark.mongodb.output.uri", "connectionURI")
.option("collection", "Collection")
.mode("append").format("com.mongodb.spark.sql").save()*/
outDf = outDf.union(df)
}
}
)
//Added this part of the code in expectation to access the unioned dataframe and insert all messages at once
//println(outDf.count())
if(outDf.count() > 0)
{
outDf.write
.option("spark.mongodb.output.uri", "connectionURI")
.option("collection", "Collection")
.mode("append").format("com.mongodb.spark.sql").save()
}
// Run the streaming job
ssc.start()
ssc.awaitTermination()
}
}
我很困惑“我们希望一次处理DStream中的所有消息”。 DStreams是无限的......你的意思是一次处理一个批处理间隔内的所有消息吗? –
是的我打算一次处理一个批处理间隔内的所有消息。对不起,感到困惑 – Sid