我对卡夫卡和卡夫卡流很新,所以请忍受我。我想知道我是否在这条正确的轨道上。卡夫卡流应用程序从写入分离读取
我正在写卡夫卡话题,并尝试通过休息服务访问数据。原始数据类型在访问之前需要进行转换。
我到目前为止是一个将原始数据写入主题的生产者。
1.)现在我想要流应用程序(应该是一个容器中运行的jar),只需要转换我所需的形状的数据。遵循这里的物化视图范例。
超过1.简化版本)
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source =
builder.stream("my-raw-data-topic");
KafkaStreams streams = new KafkaStreams(builder,props);
KTable<String, Long> t = source.groupByKey().count("My-Table");
streams.start();
2.)而另一流应用(应该是在容器中运行的罐),该甫一保持KTable
为某种数据存储可以是通过包装休息服务访问。
在这里,我有点困惑与正确的方式来使用api。 什么是bareunun访问和查询KTable
?我是否需要再次将转换拓扑分配给构建器?
KStreamBuilder builder = new KStreamBuilder();
KTable table = builder.table("My-Table"); //Casting?
KafkaStreams streams = new KafkaStreams(builder, props);
RestService service = new RestService(table);
// Use the Table as Repository which is wrapped by a Rest-Service and gets updated reactivly
眼下这是伪代码
我在这里在正确的道路上?是否有意义分开1)和2)?这是缩进的方式来处理流来实现视图吗?对我而言,在我看到更多流量的情况下,它将有利于通过容器独立扩展写入和读取。
如何是KTable的重新填充上的任一1)或2)碰撞来处理。这是通过复制流api完成的,还是我需要通过代码解决的问题。就像重置光标并回复事件一样?
THX为回复。 StateStore和KTable有什么不同?因为我正在写信给一张桌子,而“休息服务”没有通过你的第三段找到KTable。我正在使用streams.store访问它(“Table-Name”,..)是否必须是StateStore? – silverfighter
对于这种情况,这是一回事。当你拿到KTable时,你不会给KTable命名,但它是内部商店。 (注意,count方法的参数名称是'storeName')。所以不知道为什么没有找到商店。可能是你RestService实现中的一个错误。 –
thx,可能是我得到的情况“org.apache.kafka.streams.errors.InvalidStateStoreException:状态存储,countByApi,可能已迁移到另一个实例。”在docker上的Single实例env中 - 并且该表通过命令行作为主题可见。 – silverfighter