2016-11-22 68 views
3

我下面的代码创建了6个输入DStreams,使用直接aproach从Kafka的6分区主题读取我发现,即使指定相同的组ID为流,我得到的数据重复6次。如果我只创建3 DStreams我得到的数据重复3次等等....Spark流。从卡夫卡并行读取正在导致重复的数据

numStreams = 6 
kafkaStreams = [KafkaUtils.createDirectStream(ssc, ["send6partitions"], { 
    "metadata.broker.list": brokers, 
    "fetch.message.max.bytes": "20971520", 
    "spark.streaming.blockInterval" : "2000ms", 
    "group.id" : "the-same"}, 
    valueDecoder = decodeValue, keyDecoder = decode_key) for _ in range (numStreams)] 

kvs = ssc.union(*kafkaStreams) 

我做错了的是什么?

回答

1

在直接的方法中,你不应该从一个主题创建许多DStreams。

documentation

简化并行:无需创建多个输入流卡夫卡和 工会它们。使用directStream,Spark Streaming将创建多个 RDD分区,因为要使用Kafka分区,而 将全部从Kafka并行读取数据。因此,Kafka和RDD分区之间存在一对一的映射关系 ,这更容易理解并且调整为 。

所以只需创建一个DSTREAM,星火将使用所有卡夫卡分区:)

+0

@Doctor你试过每个主题有一个DStream吗?它现在对你有用吗? –

1

我对Python并不熟悉,但Spark Scala中的Direct Stream没有提交任何偏移量。因此,如果您在没有提交任何已读消息的偏移量的情况下打开流n次,您的消费者将从头开始。

如果它在python中是一样的,则不需要启动n个流。启动一个流,Spark将处理分区分配给执行者/任务本身。

1

基本上卡夫卡主题分份,使分布更快多个接收器/消费者分享当过创建DSTREAM的load.By默认一个接收器将运行并通过接收器线程(Java线程)并行接收来自每个Kafka主题分区的数据到Dstream分区。如果您为一个主题创建6个Dstream,则表示针对相同主题的6个接收者这并不意味着每个Dstream针对每个Portition。每个接收器每次获得一次馈送,因此每次馈送都获得6次。