2017-08-11 154 views
1

我必须计算DStream中元组的数量,并且根据值,我必须修改布尔变量的值。 不幸的是,我所做的并不是分配新值。 这是代码:将变量值赋值给Spark Streaming Scala

val teon = false 

s1.foreachRDD(rdd => { 
System.out.println("# events = " + rdd.count()) 
    if (rdd.count().>(1000)) 
    teon.equals(true) 
    else 
    teon.equals(false) 
}) 

if(teon){ 
val ton2 = s2.map { x => x.sensor_name } 
ton2.print 
} 
else { 
    val ton3 = s2.map { x => x.stt.spatial.unit } 
    ton3.print 
} 

s1s2是DSTREAM [传感器](传感器是一个自定义类)。

我在哪里错了?

感谢

+0

什么是这里的用意何在?什么是's1'和's2'?一个基本的错误是'teon'是一个值而不是一个变量,但将其改为'var'不会达到预期的结果。该方法需要改变。 – maasg

回答

1

有手头两个问题在此代码:

第一个是teon被声明为val。它是不可变的,因此,它的价值在程序的执行过程中永远不会改变。

第二个问题是结构性问题。在DStream水平声明的变换,如:

if(teon){ 
val ton2 = s2.map { x => x.sensor_name } 
ton2.print 
} 

将只计算一次时首先被加载并添加到DSTREAM变换DAG为执行该程序。让我们记住DStream编程模型是基于streamingContext启动时应用的转换。这些步骤将基于teon的初始值定义单个转换路径,并且之后不会改变。

因为我们想使基于价值动态选择流中包含,我们需要采取内这些决定一个DStream操作的上下文。

考虑到这,代码应该是这样的:

var teon = false 

s1.foreachRDD{ rdd => 
    val count = rdd.count // compute it only once! 
    System.out.println("# events = " + count) 
    teon = count > 1000 // use the boolean value directly 
} 

s2.foreachRDD { rdd => 
    val ton = if (teon) { 
    rdd.map(x => x.sensor_name) 
    } else { 
    rdd.map(x => x.stt.spatial.unit) // I'm assuming here that sensor_name and _stt.spatial_unit are the same type. 
    } 
    ton.take(10).foreach(e => println(e)) // implement DStream.print "by hand" 
} 
1
val teon = false  
... 
...  
...  
if(teon) 

它没有意义。
val意味着您无法更改变量的值(这与Java中的final一样)。所以它总是会是假的。
如果你想改变你需要使用的值:

var teon = false