2017-04-16 116 views
1

当我运行Kafka Streams应用程序的多个实例时,只有第一个实例正在正确接收消息;如果我启动新的实例,它们不会收到任何消息。难道Kafka Streams不能用于多个实例吗?

有没有解决这个问题的建议?

这里是我的卡夫卡流媒体应用

package test.kafkastream; 

import java.util.Properties; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.Serdes; 
import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.processor.TopologyBuilder; 

public class Main { 

    /**
     * Entry point: wires a minimal Kafka Streams topology that reads from
     * {@code topic6} and hands each record to a {@code ProcessMessage} processor.
     *
     * <p>NOTE(review): the number of running instances that actually receive
     * records is capped by the partition count of {@code topic6}; instances
     * beyond that count stay idle. Verify the topic's partition count if extra
     * instances appear to receive nothing.
     */
    public static void main(String[] args) { 
        Properties props = new Properties(); 
        // The application id also serves as the underlying consumer group id,
        // so all instances with this id share the topic's partitions.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); 
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571"); 
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 

        // Start from the earliest offset so the demo can be re-run against
        // the same pre-loaded data.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

        TopologyBuilder builder = new TopologyBuilder(); 
        builder.addSource("Source", "topic6"); 
        builder.addProcessor("Process", new ProcessMessage(), "Source"); 

        KafkaStreams streams = new KafkaStreams(builder, props); 
        streams.start(); 
        // FIX: close the streams client on JVM shutdown so consumer group
        // membership and local state are released cleanly instead of waiting
        // for the broker-side session timeout.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    } 

} 

这里是我的生产者(Producer)代码

package test.kafkamesos; 

import java.nio.charset.StandardCharsets;
import java.util.Date; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.concurrent.ExecutionException; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.serialization.ByteArraySerializer; 

public class Producer { 

    /**
     * Demo driver: synchronously sends one JSON-ish message per second to
     * {@code topic6}, forever.
     *
     * @throws InterruptedException if the sleep between sends is interrupted
     * @throws ExecutionException   if a send fails broker-side
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException { 
        Map<String, Object> producerConfig = new HashMap<String, Object>(); 
        producerConfig.put("bootstrap.servers", "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571"); 

        // optional: fail fast if the brokers are unreachable
        producerConfig.put("metadata.fetch.timeout.ms", "3000"); 
        producerConfig.put("request.timeout.ms", "3000"); 
        // other options: http://kafka.apache.org/documentation.html#producerconfigs

        ByteArraySerializer serializer = new ByteArraySerializer(); 
        // FIX: try-with-resources guarantees the producer is closed (flushing
        // any buffered records) if the send loop exits via an exception.
        try (KafkaProducer<byte[], byte[]> kafkaProducer =
                new KafkaProducer<byte[], byte[]>(producerConfig, serializer, serializer)) {
            int i = 0;
            while (true) {
                String message = "{data:success,g:" + i + "}";
                // FIX: encode with explicit UTF-8 instead of the platform
                // default charset, so the payload bytes are deterministic
                // across machines.
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
                        "topic6", message.getBytes(StandardCharsets.UTF_8));
                // Blocking get(): waits for the broker acknowledgement before
                // printing and sending the next record.
                kafkaProducer.send(record).get();
                System.out.println("sending " + message);
                Thread.sleep(1000);
                i++;
            }
        }
    } 
} 

和我Dockerfile

# Slim JRE-only base image; the app ships as a pre-built fat jar.
FROM openjdk:8-jre 
# Copy the assembled jar-with-dependencies produced by the build into the image.
COPY ./target/*-with-dependencies.jar /jars/service-jar.jar 
# Launch the Kafka Streams application.
CMD java -cp /jars/service-jar.jar test.kafkastream.Main 
+0

我只能假设这是因为您已经添加链接的代码,而不是代码本身。尽管如此,如果这是原因,应该说明。 –

+0

@AleksandarStojadinovic谢谢。我现在将添加代码.... –

回答

3

我相信你遇到这个问题,是因为Kafka broker上你正在使用的主题(topic6)只配置了一个分区。以下引自Confluent博客:

例如,如果你的应用程序从一个有10个分区的主题读取数据,那么你最多可以运行该应用程序的10个实例来并行处理(注意,你可以运行更多实例,但多出来的实例将处于空闲状态)。总之,主题的分区数是Streams API应用程序并行度的上限,也就是应用程序有效运行实例数量的上限。

来源:https://www.confluent.io/blog/elastic-scaling-in-kafka-streams/

+0

是的,这是问题所在。谢谢。 –