0
我有以下卡夫卡消费者,我想在特定情况下暂停,然后稍后恢复使用以前的所有消息。一个想法是使用共享标志,可以由其他线程更新并在使用之前,即iterator.next().message()
我检查标志的值。如果真的不消耗其他消费的消息。只是想检查我是否在正确的方向思考,或者是否有更好的方法。暂停高级卡夫卡消费者
class KafkaConsumer implements Runnable {
KafkaStream<byte[], byte[]> topicStream;
ConsumerConnector consumerConnectorObj;
public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream,
final ConsumerConnector consumerConnectorObj) {
this.topicStream = topicStream;
this.consumerConnectorObj = consumerConnectorObj;
}
@Override
public void run() {
if (topicStream != null) {
ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator();
while (true) {
if (iterator != null) {
boolean nextFlag = true;
try {
iterator.hasNext();
} catch (ConsumerTimeoutException e) {
LOG.warn("Consumer timeout occured", e);
nextFlag = false;
}
if (nextFlag) {
byte[] msg = iterator.next().message();
}
}
}
}
}
}
谢谢哈拉德我会尝试它,并会回来 –