2017-04-12 113 views
0

我目前正在写一个Spark Streaming。我的任务非常简单,只需接收来自kafka的json消息并进行一些文本过滤(包含TEXT1,TEXT2,TEXT3,TEXT4)。代码看起来像:为什么我的Spark Streaming程序处理速度如此之慢?

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topics) 


    messages.foreachRDD { rdd => 

     val originrdd = rdd.count() 

     val record = rdd.map(_._2).filter(x=>x.contains(TEXT1)).filter(x=>x.contains(TEXT2)).filter(x=>x.contains(TEXT3)).filter(x=>x.contains(TEXT4)) 

     val afterrdd = record.count() 

     println("original number of record: ", originrdd) 
     println("after filtering number of records:", afterrdd) 
} 

它是为每个JSON消息大约4kb的,并且从围绕卡夫卡50000记录每1秒。

对于上述任务,处理时间每个批次需要3秒,因此无法实现实时性能。我面临同样的任务风暴,并且执行速度更快。

+0

顺便说一句,Spark是微批,而不是实时 –

+0

那么,如果Spark实时无法处理,那么Spark流的优势是什么? – lserlohn

+0

你有多少卡夫卡? Kafka中的分区数量= Spark中的分区数量,一切都可能在一个分区中处理 –

回答

1

那么,在这个过程中,你已经做了3次不必要的RDD。

val record = rdd.map(_._2).filter(x => { 
    x.contains(TEXT1) && 
    x.contains(TEXT2) && 
    x.contains(TEXT3) && 
    x.contains(TEXT4) 
} 

另外值得一读。 https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

+0

谢谢,我也尝试了以上,我发现性能非常接近。假设过滤标准低,大部分记录将被过滤出来在这种情况下,每个过滤器操作实际上会减小rdd的大小,以便下一个过滤器操作可以在更小的记录大小上工作,从而减少处理时间。 – lserlohn

相关问题