2
我有一个火花流式传输用例,我计划在每个执行器上广播和缓存数据集。流中的每个微批将从RDD创建一个数据框并加入批处理。我下面给出的测试代码将执行每批的广播操作。有没有办法只播一次?与广播联接的火花流式传输
val testDF = sqlContext.read.format("com.databricks.spark.csv")
.schema(schema).load("file:///shared/data/test-data.txt")
val lines = ssc.socketTextStream("DevNode", 9999)
lines.foreachRDD((rdd, timestamp) => {
val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
val resultDF = recordDF.join(broadcast(testDF), "Age")
resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
}
对于每个批次都读取该文件并进行广播。
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
对广播数据集的任何建议只有一次?
我试过这种方法也和预期不会播出。可能是因为foreachRDD是在司机的情况下执行的。顺便说一句,我们必须在join语句中使用testDF.value。我认为这是一个错字。 – Cheeko
谢谢! 请注意,sc.broadcast(someDataFrame)会在播放之前将数据传送给驱动程序,还是会执行每个执行程序的bittorrent风格广播?我总是使用SQL的广播提示。不知道有什么区别。 – Cheeko
这不起作用。我的意思是你可以播放一个'DataFrame'(毕竟它是'Serializable'),但是你不能在DDS上嵌套操作。你可以简单地收集,转换成比'Array [Row]'更有用的东西,并广播本地数据结构。然后只需使用UDF。 – zero323