2017-07-07 58 views
1

关于我的火花流程程序流程的小问题。我的火花流程程序的流程

我有这样的功能:

​​

里面居然拆一个“好”消息分为多个字符串,并且,如果字符串是“坏”,返回一个空序列。

我读从卡夫卡主题的消息,我想解析的结果发送到两个不同的主题: 如果消息是“好”,发送解析主题“good_msg_topic的结果“ 如果消息是‘坏’,发送‘坏’消息主题‘bad_msg_topic’

为了实现这个目标,我这样做:

stream.foreachRDD(rdd => { 
    val res = rdd.map(msg => msg.value() -> parse(msg.value())) 

    res.foreach(pair => { 
    if (pair._2.isEmpty) { 
     producer.send(junkTopic, pair._1) 
    } else { 
     pair._2.foreach(m => producer.send(splitTopic, m)) 
    } 
    }) 
}) 

不过,我觉得这是不是最佳。使用映射对象将原始消息关联到结果可能会减慢过程...

我以Spark和Scala开始,所以我认为可以做得更好。

关于如何改善这一点的任何想法?如果您认为它更好,也可以更改解析函数的签名。

谢谢

+1

*使用映射对象将原始消息关联到结果可能会减慢进程*。你究竟在担心什么?你衡量了性能,发现这是一个瓶颈? –

回答

3

如果您尚未测量,发现这个瓶颈,我也不会太在意有关性能。

有一件事我能想到的可能使代码更清晰的是使用ADT来形容结果类型:

sealed trait Result 
case class GoodResult(seq: Seq[String]) extends Result 
case class BadResult(original: String) extends Result 

parseResult

def parse(s: String): Result 

然后用mapDStream而不是RDD

stream 
.map(msg => parse(msg.value()) 
.foreachRDD { rdd => 
    rdd.foreach { result => 
    result match { 
     case GoodResult(seq) => seq.foreach(value => producer.send(splitTopic, value)) 
     case BadResult(original) => producer.send(junkTopic, original) 
    } 
    } 
}