1
使用Kafka流(版本0.10.0.1)和Kafka代理(0.10.0.1)我试图根据消息密钥生成计数。我生产我的邮件使用下面的命令:卡夫卡流不会在countByKey后编写预期结果
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-streams-topic --property parse.key=true --property key.separator=,
当我运行上面的命令我可以给这样的键和值:
1,{"value":10}
这将消息发送到卡夫卡具有关键= 1并且值= {“值”:10}。
我的目标是然后计算有多少消息的密钥= 1。鉴于上面的命令则将被计为1
这里是我使用的代码:
public class StreamProcessor {
public static void main(String[] args) {
KStreamBuilder builder = new KStreamBuilder();
final Serde<Long> longSerde = Serdes.Long();
final Serde<String> stringSerde = Serdes.String();
KStream<String, String> values = builder.stream(stringSerde, stringSerde, "kafka-streams-topic");
KStream<String, Long> counts = values
.countByKey(stringSerde, "valueCounts")
.toStream();
counts.print(stringSerde, longSerde);
counts.to(stringSerde, longSerde, "message-counts-topic");
KafkaStreams streams = new KafkaStreams(builder, properties());
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static Properties properties() {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-poc");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return streamsConfiguration;
}
}
当我运行counts.print(stringSerde,longSerde)我得到:
1 , 1
这意味着我有一个密钥= 1,他们是1个有该密钥的消息。这是我的期望。
然而,当以下行运行:
counts.to(stringSerde, longSerde, "message-counts-topic");
称为消息计数话题的话题被发送给它的消息,但是当我尝试使用此命令来读取消息:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic message-counts-topic --property print.key=true --property key.separator=, --from-beginning
我得到以下输出:
1 ,
凡1是关键,没有什么是显示该值。我期望看到消息1,1。但由于某种原因,即使计数值在调用打印方法时显示,计数值也会丢失。
下面是完整的命令: /bin/kafka-console-consumer.sh --zookeeper本地主机:2181 --topic消息计数话题--property print.key =真--property key.separator = ,--property value.deserializer = org.apache.kafka.common.serialization.LongDeserializer --from-beginning – crypto