2017-08-29 306 views
2

我是spark和kafka的新手,我与kafka具有略微不同的spark spark使用模式。 我使用订阅相同kafka主题的Spark流应用程序

spark-core_2.10 - 2.1.1 
spark-streaming_2.10 - 2.1.1 
spark-streaming-kafka-0-10_2.10 - 2.0.0 
kafka_2.10 - 0.10.1.1 

连续的事件数据被传输到卡夫卡的话题,我需要从多个火花流应用程序处理。但是当我运行火花流应用程序时,只有其中一个接收数据。

 Map<String, Object> kafkaParams = new HashMap<String, Object>(); 

    kafkaParams.put("bootstrap.servers", "localhost:9092"); 
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    kafkaParams.put("auto.offset.reset", "latest"); 
    kafkaParams.put("group.id", "test-consumer-group"); 
    kafkaParams.put("enable.auto.commit", "true"); 
    kafkaParams.put("auto.commit.interval.ms", "1000"); 
    kafkaParams.put("session.timeout.ms", "30000"); 

    Collection<String> topics = Arrays.asList("4908100105999_000005");; 
    JavaInputDStream<ConsumerRecord<String, String>> stream = org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
        ssc, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams)); 

     ... //spark processing 

我有两个火花流应用程序,通常我提交的第一个应用程序使用kafka消息。第二个应用程序只是等待消息,永远不会进行。 当我阅读时,卡夫卡主题可以从多个消费者订阅,对于火花流媒体来说是不是真的?或者有什么我缺少kafka主题及其配置?

在此先感谢。

回答

0

您可以使用相同的groupid创建不同的流。下面是从0.8集成的在线文档的详细信息,有两种做法:

方法1:基于接收器的方法

多卡夫卡输入DStreams可以用不同的组创建和 议题使用多个接收器并行接收数据。

方法二:直接法(没有接收者)

无需创建多个输入流卡夫卡和工会它们。使用 directStream,Spark Streaming将创建与 一样多的RDD分区,并使用卡夫卡分区,这将全部从 卡夫卡中并行读取数据。因此,卡夫卡和RDD分区之间存在一对一的映射关系,这更易于理解和调整。

就像你正在使用0.10你可以阅读更多的Spark Streaming + Kafka Integration Guide 0.8

从您的代码如下,请参考Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0

甚至认为它是使用火花流API,一切都是卡夫卡性能控制,取决于您在属性文件中指定的组ID,您可以启动具有不同组ID的多个流。

干杯!

+1

我在两个消费者中都使用了相同的组ID,因此只有一个消费者正在接收消息。具有不同group.id的消费者订阅同一主题,并行/分别接收消息。 – Gurubg

+0

是的,如果您使用相同的组ID,那么只有一个会收到该消息。 –

1

消费者数量[在消费群体下],不能超过主题中的分区数量。如果您想要并行使用这些消息,那么您需要引入适当数量的分区并创建接收器来处理每个分区。

+0

让两个消费群体在同一消费群体下拥有两个分区有什么区别? – Gurubg

+0

我的意思是卡夫卡分区。如果您的Kafka主题中有两个分区,并且想要并行处理这些消息,则可以引入一组消费者[该消费者组中的消费者数量不应超过正在使用的主题中的分区数。]消费者组由消费者组ID标识。如果两个消费者群体具有相同的群组ID,那么Kafka会假定这两个消费群体都是一个群体。如果您的应用程序使用相同的代码,则尝试更改第二个应用程序的kafkaParams.put(“group.id”,“test-consumer-group1”)。 –