1
关于我的火花流程程序流程的小问题。我的火花流程程序的流程
我有这样的功能:
里面居然拆一个“好”消息分为多个字符串,并且,如果字符串是“坏”,返回一个空序列。
我读从卡夫卡主题的消息,我想解析的结果发送到两个不同的主题: 如果消息是“好”,发送解析主题“good_msg_topic的结果“ 如果消息是‘坏’,发送‘坏’消息主题‘bad_msg_topic’
为了实现这个目标,我这样做:
stream.foreachRDD(rdd => {
val res = rdd.map(msg => msg.value() -> parse(msg.value()))
res.foreach(pair => {
if (pair._2.isEmpty) {
producer.send(junkTopic, pair._1)
} else {
pair._2.foreach(m => producer.send(splitTopic, m))
}
})
})
不过,我觉得这是不是最佳。使用映射对象将原始消息关联到结果可能会减慢过程...
我以Spark和Scala开始,所以我认为可以做得更好。
关于如何改善这一点的任何想法?如果您认为它更好,也可以更改解析函数的签名。
谢谢
*使用映射对象将原始消息关联到结果可能会减慢进程*。你究竟在担心什么?你衡量了性能,发现这是一个瓶颈? –