我下面的代码创建了6个输入DStreams,使用直接aproach从Kafka的6分区主题读取我发现,即使指定相同的组ID为流,我得到的数据重复6次。如果我只创建3 DStreams我得到的数据重复3次等等....Spark流。从卡夫卡并行读取正在导致重复的数据
numStreams = 6
kafkaStreams = [KafkaUtils.createDirectStream(ssc, ["send6partitions"], {
"metadata.broker.list": brokers,
"fetch.message.max.bytes": "20971520",
"spark.streaming.blockInterval" : "2000ms",
"group.id" : "the-same"},
valueDecoder = decodeValue, keyDecoder = decode_key) for _ in range (numStreams)]
kvs = ssc.union(*kafkaStreams)
我做错了的是什么?
@Doctor你试过每个主题有一个DStream吗?它现在对你有用吗? –