2015-02-18 58 views
6

我正在为文本分类实现流学习器。在我的实现中有一些单值参数需要在新的流项目到达时更新。例如,我想在新的预测做出时改变学习速度。但是,我怀疑在最初的广播之后有没有办法广播变量。那么,如果我每次更新它时需要广播一个变量,会发生什么。如果有办法做到这一点,或者我想在Spark Streaming中完成的工作,我很乐意听到它。Apache Spark中的定期广播流式传输

在此先感谢。

回答

1

我的理解是一旦广播变量最初被发出,它是'只读'。我相信你可以更新本地节点上的广播变量,但不能在远程节点上。

可能你需要考虑在'Spark之外'这样做。如何使用noSQL存储(Cassandra ..等等)甚至是Memcache?然后,您可以从一项任务更新变量,并定期从其他任务中检查此店铺?

0

最好将数据收集到驱动程序,然后将它们广播到所有节点。

使用Dstream # foreachRDD收集驱动程序中计算的RDD,一旦知道何时需要更改学习速率,则使用SparkContext#broadcast(value)将新值发送到所有节点。

我希望的代码看起来像下面这样:

dStreamContainingBroadcastValue.foreachRDD{ rdd => 
     val valueToBroadcast = rdd.collect() 
     sc.broadcast(valueToBroadcast) 
} 

您也可以找到this thread有用的,从火花用户的邮件列表。让我知道这是否有效。

+0

那么,你如何看待他可以从该片段读取广播变量?它有点失败了返回'Unit'的目的。 – kareblak 2015-10-06 09:08:35

1

我得到了一个丑陋的戏,但它的工作! 我们可以找到如何从广播对象获取广播值。 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L114 只是通过广播ID。

因此,我通过相同的广播ID周期性地重播。

val broadcastFactory = new TorrentBroadcastFactory() 
broadcastFactory.unbroadcast(BroadcastId, true, true) 
// append some ids to initIds 
val broadcastcontent = broadcastFactory.newBroadcast[.Set[String]](initIds, false, BroadcastId) 

我可以从第一个广播值获得BroadcastId。

val ids = ssc.sparkContext.broadcast(initIds) 
// broadcast id 
val BroadcastId = broadcastIds.id 

然后工作人员使用ids作为正常的广播类型。

def func(record: Array[Byte], bc: Broadcast[Set[String]]) = ??? 
1
bkc.unpersist(true) 
bkc.destroy() 
bkc = sc.broadcast(tableResultMap) 
bkv = bkc.value 

你可以试试这个,我不能保证是否有效

1

我通过在广播变量创建一个包装类,得到了这个工作。包装类的updateAndGet方法返回刷新的广播变量。我调用这个函数里面dStream.transform - >按星火文档

http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation

变换操作状态: “所提供的函数被调用在每批次间隔这允许你做的时间变化的RDD操作,即RDD操作,分区数量,广播变量可以批量之间变化“

BroadcastWrapper类的样子:

public class BroadcastWrapper { 
private Broadcast<ReferenceData> broadcastVar; 
private Date lastUpdatedAt = Calendar.getInstance().getTime(); 

private static BroadcastWrapper obj = new BroadcastWrapper(); 

private BroadcastWrapper(){} 

public static BroadcastWrapper getInstance() { 
     return obj; 
} 

public JavaSparkContext getSparkContext(SparkContext sc) { 
     JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); 
     return jsc; 
} 

public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){ 
     Date currentDate = Calendar.getInstance().getTime(); 
     long diff = currentDate.getTime()-lastUpdatedAt.getTime(); 
     if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms 
      if (var != null) 
       var.unpersist(); 
      lastUpdatedAt = new Date(System.currentTimeMillis()); 

      //Your logic to refresh 
      ReferenceData data = getRefData(); 

      var = getSparkContext(sparkContext).broadcast(data); 
     } 
     return var; 
} 
} 

你可以使用这个播出stream.transform方法变量updateAndGet功能,允许RDD-RDD转化

objectStream.transform(stream -> { 

    Broadcast<Object> var = BroadcastWrapper.getInstance().updateAndGet(stream.context()); 

/**Your code to manipulate stream **/ 
}); 

请参阅我的完整答案从这个pos:https://stackoverflow.com/a/41259333/3166245

何PE它有帮助