2

我有一个火花流式传输用例,我计划在每个执行器上广播和缓存数据集。流中的每个微批将从RDD创建一个数据框并加入批处理。我下面给出的测试代码将执行每批的广播操作。有没有办法只播一次?与广播联接的火花流式传输

val testDF = sqlContext.read.format("com.databricks.spark.csv") 
       .schema(schema).load("file:///shared/data/test-data.txt") 

val lines = ssc.socketTextStream("DevNode", 9999) 

lines.foreachRDD((rdd, timestamp) => { 
    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF() 
    val resultDF = recordDF.join(broadcast(testDF), "Age") 
    resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp) 
    } 

对于每个批次都读取该文件并进行广播。

16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28 
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27 

16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28 
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27 

对广播数据集的任何建议只有一次?

回答

0

它看起来像现在广播的表不被重复使用。请参阅:SPARK-3863

对广播外foreachRDD环:

val testDF = broadcast(sqlContext.read.format("com.databricks.spark.csv") 
.schema(schema).load(...)) 

lines.foreachRDD((rdd, timestamp) => { 
    val recordDF = ??? 
    val resultDF = recordDF.join(testDF, "Age") 
    resultDF.write.format("com.databricks.spark.csv").save(...) 
} 

+0

我试过这种方法也和预期不会播出。可能是因为foreachRDD是在司机的情况下执行的。顺便说一句,我们必须在join语句中使用testDF.value。我认为这是一个错字。 – Cheeko

+0

谢谢! 请注意,sc.broadcast(someDataFrame)会在播放之前将数据传送给驱动程序,还是会执行每个执行程序的bittorrent风格广播?我总是使用SQL的广播提示。不知道有什么区别。 – Cheeko

+0

这不起作用。我的意思是你可以播放一个'DataFrame'(毕竟它是'Serializable'),但是你不能在DDS上嵌套操作。你可以简单地收集,转换成比'Array [Row]'更有用的东西,并广播本地数据结构。然后只需使用UDF。 – zero323