apache-kafka-streams

    2热度

    1回答

    我正在建立KTable的输入主题,我加入了两个Kafka流应用程序实例的KStream。 KTable的输入主题已经是日志压缩主题。因此,当我的应用程序实例中的一个出现故障时,另一个实例状态存储库似乎通过从输入日志压缩主题中读取而刷新整个状态。 因此,不需要为我的KTable商店启用日志记录(更改日志)? 我的源输入日志压缩的主题可能有数百万条记录,所以如果我在该KTable状态存储上启用日志记录

    1热度

    1回答

    我试图创建卡夫卡流的leftJoin的正常工作约10条记录,然后将其与引起的NullPointerException这样的代码的异常崩溃: private static KafkaStreams getKafkaStreams() { StreamsConfig streamsConfig = new StreamsConfig(getProperties()); KStrea

    0热度

    2回答

    我试图过滤出在给定(跳跃)时间窗口中长度为T的密钥出现频率高于阈值N的任何消息。 例如,下面的流中: #time, key 0, A 1, B 2, A 3, C 4, D 5, A 6, B 7, C 8, C 9, D 10, A 11, D 12, D 13, D 14, D 15, D 和N=2和T=3,结果应该是 0, A 2, A 7, C 8

    0热度

    1回答

    我有一个非常简单的应用程序KafkaStreams。它看起来像这样: input topic --> extract smth., update aggregate in the local state -> output topic 在开始的时候输入的话题只有1分,一切工作顺利。 但经过我在输入题目增加分区的数量我观察到的,而不是单一的更新我的应用程序的每个分区实例化,所以我的输出主题有多个

    0热度

    1回答

    有一种方法可以通过Kafka流对PIVOT/UNPIVOT(爆炸,转移)流吗? 如果我有 machineId ts VarName VarValue m1 2017-10-01 00:00:00 var1 1.0 m1 2017-10-01 00:00:00 var2 2.0 m2 2017-10-01 00:00:00 var1 3.0 m2 2017-10-01 00:00:00 va

    1热度

    1回答

    在使用持久状态存储托管KafkaStream(0.10.2.1)实例的多个节点的部署中,推荐重新启动所有方法同时避免重播整个状态存储变更日志主题?这必须在不改变application.id的情况下完成,因为我不想丢失我已经在州商店中拥有的数据。 我增加了session.timeout.ms,这样当代理开始重新分配分区时,所有节点都会启动,并避免调用KafkaStreams.stop来防止不必要的分

    0热度

    1回答

    我已经定义以下在卡夫卡拓扑流 Operation 1 : input_stream ----> filter ----> window_processing ----> write_to_topic Operation 2 : input_stream ----> write_to_topic 我观察到,这两个操作正在由同一线程(即使我增加线程StreamsConfig.NUM_STREAM

    0热度

    1回答

    我需要知道如何使用“为”我的卡夫卡KStreams线环......下面是我的“for”循环需要被列入KStreams for (int i = 0; i < 6 ; i++) { try { textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{"))); Thread.s

    2热度

    2回答

    Kafka Streams引擎将一个分区映射到一个工作人员(即Java App),以便该分区中的所有消息都由该工作人员处理。我有以下情况,并试图了解它是否仍然可行。 我有一个主题A(有3个分区)。发送给它的消息由Kafka随机分区(即没有密钥)。我发送给它的消息有像下面 {carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}

    0热度

    1回答

    我有一个应用程序需要监听多个不同的主题;每个主题都有独立的消息处理逻辑。我曾经想过为每个KafkaStreams实例使用相同的kafka属性,但是我得到如下所示的错误。 错误 java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscr