2017-04-21 70 views
0

我有这样一段代码:试图了解火花流流

val lines: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topics) 
    lines.foreachRDD { rdd => 
     val df = cassandraSQLContext.read.json(rdd.map(x => x._2)) 
     sparkStreamingService.run(df) 
    } 
    ssc.start() 
    ssc.awaitTermination() 

我的理解是,foreachRDD在驱动程序级别发生的呢?所以基本上所有的代码块:

lines.foreachRDD { rdd => 
    val df = cassandraSQLContext.read.json(rdd.map(x => x._2)) 
    sparkStreamingService.run(df) 
} 

发生在驱动程序级别?所述sparkStreamingService.run(DF)方法基本上没有对电流数据帧一些转换,以产生一个新的数据帧,然后调用存储数据帧到另一个卡桑德拉方法(在另一个罐)。 因此,如果这种情况都发生在驱动程序级别,我们没有利用火花执行程序,我怎样才能做到这一点,以便并行使用执行程序来并行处理RDD的每个分区

我的火花流服务运行方法:

var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect() 
metadataDataframe.foreach(rowD => { 
     metaData = populateMetaDataService.populateSiteMetaData(rowD) 
     val headers = (rowD.getString(2).split(recordDelimiter)(0)) 

     val fields = headers.split("\u0001").map(
     fieldName => StructField(fieldName, StringType, nullable = true)) 
     val schema = StructType(fields) 

     val listOfRawData = rowD.getString(2).indexOf(recordDelimiter) 
     val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1) 

     val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter)) 
//  val rawData = dataWithoutHeaders.split(recordDelimiter) 
     val rowRDD = rawData 
     .map(_.split("\u0001")) 
     .map(attributes => Row(attributes: _*)) 

     val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema) 
     dataFrameFilterService.processBasedOnOpType(metaData, newDF) 
    }) 

回答

1

foreachRDD可以在本地运行,但只是意味着设置。 RDD本身是一个分布式集合,所以实际的工作是分布式的。

直接在代码的文档注释:

dstream.foreachRDD { rdd => 
    val connection = createNewConnection() // executed at the driver 
    rdd.foreach { record => 
    connection.send(record) // executed at the worker 
    } 
} 

请注意,这不是围绕RDD基于代码的部分在驱动程序执行。这是使用分发给工作人员的RDD建立的代码。

你的代码将专门被注释如下:

//df.select will be distributed, but collect will pull it all back in 
    var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect() 
//Since collect created a local collection then this is done on the driver 
metadataDataframe.foreach(rowD => { 
     metaData = populateMetaDataService.populateSiteMetaData(rowD) 
     val headers = (rowD.getString(2).split(recordDelimiter)(0)) 

     val fields = headers.split("\u0001").map(
     fieldName => StructField(fieldName, StringType, nullable = true)) 
     val schema = StructType(fields) 

     val listOfRawData = rowD.getString(2).indexOf(recordDelimiter) 
     val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1) 

     //This will run locally, creating a distributed record 
     val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter)) 
//  val rawData = dataWithoutHeaders.split(recordDelimiter) 
     //This will redistribute the work 
     val rowRDD = rawData 
     .map(_.split("\u0001")) 
     .map(attributes => Row(attributes: _*)) 
     //again, setting this up locally, to be run distributed 
     val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema) 
     dataFrameFilterService.processBasedOnOpType(metaData, newDF) 
    }) 

最终,你可能可以重写这并不需要收集和保存所有分布的,但这是你没有计算器

+0

但在这里:http://spark.apache.org/docs/latest/streaming-programming-guide.html,如果你向下滚动到他们使用forEachRdd的地方,他们有一个评论说,一个特定的声明正在以@Ahmed司机 – Ahmed

+0

执行我编辑它直接与比文档 –

+0

是,这样是我的问题增加了更多的明确性解决这一问题,就收集。到现在为止,由于我正在收集,记录是按顺序处理的,并且这些记录中的每一个都会分发给执行者?然后删除收集,所有的记录将被并行处理而不是顺序处理,是吗? – Ahmed

1

的调用foreachRDD确实发生在驱动程序节点上。但是,由于我们正在RDD级别运营,因此将对其进行分配。在你的例子中,rdd.map将导致每个分区被发送到特定的工作节点进行计算。

因为我们不知道你的sparkStreamingService.run方法是干什么的,我们不能告诉你关于其执行的地方。

+0

我添加了run方法的代码。这是最有效的方法吗?此方法被并行右:dataFrameFilterService.processBasedOnOpType(元数据,newDF)。我理解它的方式是我可以避免使用collect来加速进程,以便并行处理记录? – Ahmed