2017-02-26

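Kafka - Spark Streaming - Reading data from only one partition

I have a standalone Spark cluster that reads data from a Kafka queue. The Kafka topic has 5 partitions, but Spark only processes data from one of them.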

Here are my Maven dependencies:

<dependencies> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.0.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.0.2</version> 
    </dependency> 
    <dependency> 
     <groupId>kafka-custom</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.1.1</version> 
    </dependency> 
</dependencies>

My Kafka producer is a simple one that just puts 100 messages on the queue:

public void generateMessages() { 

    // Define the properties for the Kafka Connection 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", kafkaBrokerServer); // kafka server 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer", 
      "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", 
      "org.apache.kafka.common.serialization.StringSerializer"); 

    // Create a KafkaProducer using the Kafka Connection properties 
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
      props); 
    for (int i = 0; i < 100; i++) { 
     ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopic, "value-" + i); 
     producer.send(record); 
    } 
    producer.close(); 

} 
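Because the records above are sent without a key, the producer's default partitioner decides where each one lands. The sketch below is a simplified model of that behaviour, not Kafka's own code: it assumes the 0.10.x default partitioner cycles through the available partitions for keyless records, starting from an arbitrary position. `RoundRobinModel`, `countPerPartition`, and the `start` parameter are hypothetical names introduced for illustration. Under that assumption, 100 messages over 5 partitions should leave 20 on each, so every partition ought to hold data for Spark to read.

```java
import java.util.Arrays;

// Hypothetical model of round-robin assignment for keyless records.
final class RoundRobinModel {

    // Record i is assigned to partition (start + i) mod numPartitions.
    static int[] countPerPartition(int numMessages, int numPartitions, int start) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < numMessages; i++) {
            counts[Math.floorMod(start + i, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 100 messages, 5 partitions, arbitrary starting position 3.
        System.out.println(Arrays.toString(countPerPartition(100, 5, 3)));
        // prints [20, 20, 20, 20, 20]
    }
}
```

If the topic really does receive an even spread like this, a consumer that only ever sees one partition's records points at the consuming side rather than the producer.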

Here is the main code of my Spark Streaming job:

public void processKafka() throws InterruptedException { 
    LOG.info("************ SparkStreamingKafka.processKafka start"); 

    // Create the spark application 
    SparkConf sparkConf = new SparkConf(); 
    sparkConf.set("spark.executor.cores", "5"); 

    //To express any Spark Streaming computation, a StreamingContext object needs to be created. 
    //This object serves as the main entry point for all Spark Streaming functionality. 
    //This creates the spark streaming context with a 'numSeconds' second batch size 
    jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchInterval)); 


    //List of parameters 
    Map<String, Object> kafkaParams = new HashMap<>(); 
    kafkaParams.put("bootstrap.servers", this.getBrokerList()); 
    kafkaParams.put("client.id", "SpliceSpark"); 
    kafkaParams.put("group.id", "mynewgroup"); 
    kafkaParams.put("auto.offset.reset", "earliest"); 
    kafkaParams.put("enable.auto.commit", false); 
    kafkaParams.put("key.deserializer", StringDeserializer.class); 
    kafkaParams.put("value.deserializer", StringDeserializer.class); 

    List<TopicPartition> topicPartitions= new ArrayList<TopicPartition>(); 
    for(int i=0; i<5; i++) { 
     topicPartitions.add(new TopicPartition("mytopic", i)); 
    } 


    //List of kafka topics to process 
    Collection<String> topics = Arrays.asList(this.getTopicList().split(",")); 


    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
      jssc, 
      LocationStrategies.PreferConsistent(), 
      ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
     ); 

    //Another version of an attempt 
    /* 
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
     jssc, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, String>Assign(topicPartitions, kafkaParams) 
    ); 
    */ 

    messages.foreachRDD(new PrintRDDDetails()); 


    // Start running the job to receive and transform the data 
    jssc.start(); 

    //Allows the current thread to wait for the termination of the context by stop() or by an exception 
    jssc.awaitTermination(); 
} 

The call method of PrintRDDDetails is as follows:

public void call(JavaRDD<ConsumerRecord<String, String>> rdd) 
     throws Exception { 

    LOG.error("--- New RDD with " + rdd.partitions().size() 
      + " partitions and " + rdd.count() + " records"); 

} 

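What seems to happen is that it only gets data from one partition. I have confirmed in Kafka that the topic has 5 partitions. When the call method runs, it prints the correct number of partitions but only the records from 1 partition, and the further processing that I removed from this simplified code also shows that only 1 partition is being processed.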
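One way to narrow this down is to log how many records each Kafka partition contributed to a batch, rather than only the RDD total. In the real job those numbers would come from the `OffsetRange` objects that the direct stream attaches to each RDD (reachable through `HasOffsetRanges`). The sketch below does not use Spark: `Range` is a hypothetical stand-in for an offset range, the offsets are made up to mimic the symptom, and the code only shows the arithmetic (until-offset minus from-offset per partition).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PartitionCounts {

    // Hypothetical stand-in for one partition's offset range in a batch.
    static final class Range {
        final int partition;
        final long fromOffset;   // inclusive
        final long untilOffset;  // exclusive

        Range(int partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    // Sums the number of records per partition, keyed by partition id.
    static Map<Integer, Long> recordsPerPartition(List<Range> ranges) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (Range r : ranges) {
            counts.merge(r.partition, r.untilOffset - r.fromOffset, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up batch: only partition 0 advanced, the other four stayed empty.
        List<Range> batch = Arrays.asList(
                new Range(0, 0L, 20L),
                new Range(1, 0L, 0L),
                new Range(2, 0L, 0L),
                new Range(3, 0L, 0L),
                new Range(4, 0L, 0L));
        System.out.println(recordsPerPartition(batch));
        // prints {0=20, 1=0, 2=0, 3=0, 4=0}
    }
}
```

A breakdown like this separates "five RDD partitions, but four of them empty" from "five RDD partitions with data, but a downstream step only handling one".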

Answer


This seems to be an issue with Spark 2.1.0, as it uses v0.10.1 of the Kafka client library (per the following pull request):

https://github.com/apache/spark/pull/16278

I worked around this by using a newer version of the kafka-clients library:

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"     % sparkVersion, 
    "org.apache.spark" %% "spark-streaming"    % sparkVersion, 
    "org.apache.spark" %% "spark-sql"     % sparkVersion, 
    "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion, 
    "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion
).map(_.exclude("org.apache.kafka", "kafka-clients")) 

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.0" 
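Since the question uses Maven rather than sbt, the same workaround might look roughly like the fragment below. This is a sketch, not a tested configuration: it assumes the standard `org.apache.kafka:kafka-clients` coordinates used in the sbt snippet (the question's POM uses a `kafka-custom` group id instead) and keeps the Spark version from the question.

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.0.2</version>
        <exclusions>
            <!-- Keep the transitive kafka-clients off the classpath -->
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Pin the newer client explicitly, as in the sbt workaround -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>
</dependencies>
```

Running `mvn dependency:tree` afterwards is a quick way to confirm that only one version of kafka-clients is resolved.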

Thanks, I'll try it and let you know. – Erin

Param, that worked! Thank you very much. – Erin

Awesome! Glad to hear it :) – Param
