我正在为文本分类实现流学习器。在我的实现中有一些单值参数需要在新的流项目到达时更新。例如,我想在新的预测做出时改变学习速度。但是,我怀疑在最初的广播之后有没有办法广播变量。那么,如果我每次更新它时需要广播一个变量,会发生什么。如果有办法做到这一点,或者我想在Spark Streaming中完成的工作,我很乐意听到它。Apache Spark中的定期广播流式传输
在此先感谢。
我正在为文本分类实现流学习器。在我的实现中有一些单值参数需要在新的流项目到达时更新。例如,我想在新的预测做出时改变学习速度。但是,我怀疑在最初的广播之后有没有办法广播变量。那么,如果我每次更新它时需要广播一个变量,会发生什么。如果有办法做到这一点,或者我想在Spark Streaming中完成的工作,我很乐意听到它。Apache Spark中的定期广播流式传输
在此先感谢。
我的理解是一旦广播变量最初被发出,它是'只读'。我相信你可以更新本地节点上的广播变量,但不能在远程节点上。
可能你需要考虑在'Spark之外'这样做。如何使用noSQL存储(Cassandra ..等等)甚至是Memcache?然后,您可以从一项任务更新变量,并定期从其他任务中检查此店铺?
最好将数据收集到驱动程序,然后将它们广播到所有节点。
使用Dstream # foreachRDD收集驱动程序中计算的RDD,一旦知道何时需要更改学习速率,则使用SparkContext#broadcast(value)将新值发送到所有节点。
我希望的代码看起来像下面这样:
dStreamContainingBroadcastValue.foreachRDD{ rdd =>
val valueToBroadcast = rdd.collect()
sc.broadcast(valueToBroadcast)
}
您也可以找到this thread有用的,从火花用户的邮件列表。让我知道这是否有效。
我得到了一个丑陋的戏,但它的工作! 我们可以找到如何从广播对象获取广播值。 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L114 只是通过广播ID。
因此,我通过相同的广播ID周期性地重播。
val broadcastFactory = new TorrentBroadcastFactory()
broadcastFactory.unbroadcast(BroadcastId, true, true)
// append some ids to initIds
val broadcastcontent = broadcastFactory.newBroadcast[.Set[String]](initIds, false, BroadcastId)
我可以从第一个广播值获得BroadcastId。
val ids = ssc.sparkContext.broadcast(initIds)
// broadcast id
val BroadcastId = broadcastIds.id
然后工作人员使用ids作为正常的广播类型。
def func(record: Array[Byte], bc: Broadcast[Set[String]]) = ???
bkc.unpersist(true)
bkc.destroy()
bkc = sc.broadcast(tableResultMap)
bkv = bkc.value
你可以试试这个,我不能保证是否有效
我通过在广播变量创建一个包装类,得到了这个工作。包装类的updateAndGet方法返回刷新的广播变量。我调用这个函数里面dStream.transform - >按星火文档
http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation
变换操作状态: “所提供的函数被调用在每批次间隔这允许你做的时间变化的RDD操作,即RDD操作,分区数量,广播变量等可以批量之间变化“
BroadcastWrapper类的样子:
public class BroadcastWrapper {
private Broadcast<ReferenceData> broadcastVar;
private Date lastUpdatedAt = Calendar.getInstance().getTime();
private static BroadcastWrapper obj = new BroadcastWrapper();
private BroadcastWrapper(){}
public static BroadcastWrapper getInstance() {
return obj;
}
public JavaSparkContext getSparkContext(SparkContext sc) {
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
return jsc;
}
public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
Date currentDate = Calendar.getInstance().getTime();
long diff = currentDate.getTime()-lastUpdatedAt.getTime();
if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
if (var != null)
var.unpersist();
lastUpdatedAt = new Date(System.currentTimeMillis());
//Your logic to refresh
ReferenceData data = getRefData();
var = getSparkContext(sparkContext).broadcast(data);
}
return var;
}
}
你可以使用这个播出stream.transform方法变量updateAndGet功能,允许RDD-RDD转化
objectStream.transform(stream -> {
Broadcast<Object> var = BroadcastWrapper.getInstance().updateAndGet(stream.context());
/**Your code to manipulate stream **/
});
请参阅我的完整答案从这个pos:https://stackoverflow.com/a/41259333/3166245
何PE它有帮助
那么,你如何看待他可以从该片段读取广播变量?它有点失败了返回'Unit'的目的。 – kareblak 2015-10-06 09:08:35