2017-08-01 61 views
0

我们开发了一个火花流应用程序,它可以从kafka获取数据并写入mongoDB。在输入DStream中创建foreachRDD内部的连接时,我们注意到性能影响。插入到mongoDB之前,Spark应用程序会进行一些验证。我们正在探索避免为每个处理的消息连接到mongoDB的选项,而我们希望一次处理一个批处理间隔内的所有消息。以下是Spark流应用程序的简化版本。我们所做的一件事是将所有消息附加到数据框,并尝试在foreachRDD之外插入该数据框的内容。但是当我们运行这个应用程序时,将数据框写入mongoDB的代码不会被执行。避免与火花流传输的mongoDB多连接

请注意,我注释了foreachRDD中的一部分代码,我们用它将每条消息插入到mongoDB中。由于我们一次插入一条消息,因此现有方法非常缓慢。任何关于性能改进的建议都非常感谢。

谢谢

package com.testing 

import org.apache.spark.streaming._ 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.streaming.{ Seconds, StreamingContext } 
import org.apache.spark.{ SparkConf, SparkContext } 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.sql.{ SQLContext, Row, Column, DataFrame } 
import java.util.HashMap 
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord } 
import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._ 

import org.joda.time._ 
import org.joda.time.format._ 

import org.json4s._ 
import org.json4s.JsonDSL._ 
import org.json4s.jackson.JsonMethods._ 
import com.mongodb.util.JSON 

import scala.io.Source._ 
import java.util.Properties 
import java.util.Calendar 

import scala.collection.immutable 
import org.json4s.DefaultFormats 


object Sample_Streaming { 

    def main(args: Array[String]) { 

    val sparkConf = new SparkConf().setAppName("Sample_Streaming") 
     .setMaster("local[4]") 

    val sc = new SparkContext(sparkConf) 
    sc.setLogLevel("ERROR") 

    val sqlContext = new SQLContext(sc) 
    val ssc = new StreamingContext(sc, Seconds(1)) 

    val props = new HashMap[String, Object]() 


    val bootstrap_server_config = "127.0.0.100:9092" 
    val zkQuorum = "127.0.0.101:2181" 



    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config) 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 

    val TopicMap = Map("sampleTopic" -> 1) 
    val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2) 

     val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") 
     .option("spark.mongodb.input.uri", "connectionURI") 
     .option("spark.mongodb.input.collection", "schemaCollectionName") 
     .load() 

     val outSchema = schemaDf.schema 
     var outDf = sqlContext.createDataFrame(sc.emptyRDD[Row], outSchema) 

    KafkaDstream.foreachRDD(rdd => rdd.collect().map { x => 
     { 
     val jsonInput: JValue = parse(x) 


     /*Do all the transformations using Json libraries*/ 

     val json4s_transformed = "transformed json" 

     val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil) 
     val df = sqlContext.read.schema(outSchema).json(rdd) 

//Earlier we were inserting each message into mongoDB, which we would like to avoid and process all at once  
/*  df.write.option("spark.mongodb.output.uri", "connectionURI") 
        .option("collection", "Collection") 
        .mode("append").format("com.mongodb.spark.sql").save()*/ 
     outDf = outDf.union(df) 

     } 

    } 

    ) 


     //Added this part of the code in expectation to access the unioned dataframe and insert all messages at once 
     //println(outDf.count()) 
     if(outDf.count() > 0) 
     { 
     outDf.write 
        .option("spark.mongodb.output.uri", "connectionURI") 
        .option("collection", "Collection") 
        .mode("append").format("com.mongodb.spark.sql").save() 
     } 


    // Run the streaming job 
    ssc.start() 
    ssc.awaitTermination() 
    } 

} 
+1

我很困惑“我们希望一次处理DStream中的所有消息”。 DStreams是无限的......你的意思是一次处理一个批处理间隔内的所有消息吗? –

+0

是的我打算一次处理一个批处理间隔内的所有消息。对不起,感到困惑 – Sid

回答

1

这听起来像你想连接的数量减少到mongodb,为了这个目的,你必须使用foreachPartition代码,当你成为连接做的MongoDB看到spec,代码将看起来像这样:

rdd.repartition(1).foreachPartition { 
    //get instance of connection 
    //write/read with batch to mongo 
    //close connection 
} 
+0

感谢您的输入。我会试试看。 – Sid