2016-05-06 156 views
0

我有以下卡夫卡消费者,我想在特定情况下暂停,然后稍后恢复使用以前的所有消息。一个想法是使用共享标志,可以由其他线程更新并在使用之前,即iterator.next().message()我检查标志的值。如果真的不消耗其他消费的消息。只是想检查我是否在正确的方向思考,或者是否有更好的方法。暂停高级卡夫卡消费者

class KafkaConsumer implements Runnable { 
     KafkaStream<byte[], byte[]> topicStream; 
     ConsumerConnector consumerConnectorObj; 

     public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream, 
       final ConsumerConnector consumerConnectorObj) { 
      this.topicStream = topicStream; 
      this.consumerConnectorObj = consumerConnectorObj; 
     } 

     @Override 
     public void run() { 
      if (topicStream != null) { 
       ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator(); 
       while (true) { 
         if (iterator != null) { 
          boolean nextFlag = true; 
          try { 
           iterator.hasNext(); 
          } catch (ConsumerTimeoutException e) { 
           LOG.warn("Consumer timeout occured", e); 
           nextFlag = false; 
          } 

          if (nextFlag) { 
           byte[] msg = iterator.next().message(); 
          } 
         } 
       } 
      } 
     } 
    } 

回答

0

通常情况下,您不应该需要通过手工方式在线程之间进行同步,因为您很有可能会误解它。帮助者请看java.util.concurrent。在你的情况下,它可能是一个信号量。使用它的最简单的方法是在处理消息之前获取信号量,然后将其返回,并在下一个循环中立即尝试再次获取信号量。

我的预感是,这不是最好的办法。我宁愿拨打availablePermits()并继续消费,而数字更大。只有当它下降到零时,才尝试获取信号量。这将阻止该线程,直到另一个线程再次提供一个许可。这将打开你的工作线程,并把它交给许可证,它应该马上让回来,开始像上面那样循环。

while (true) { 
    if (semaphore.availablePermits()<=0) { 
    semaphore.acquire(); // will block 
    // eventually another thread increments the semaphore again 
    // and we arrive here 
    semaphore.release(); 
    } 
    // keep processing messages 
} 

你可能会在this question的答案中找到补充意见。

+0

谢谢哈拉德我会尝试它,并会回来 –