2015-07-19 71 views
1

403S我需要保证消费者的排他性消费者线程在不同的运行时间从固定数量的队列(如队列的数量比消费者更大)消耗的可变数目。作用在弹簧AMQP

我总的想法是,我必须每个消费者线程尝试建立了独家连接到清除队列,并且,如果它去一定时期内没有从队列接收消息,将其重定向到另一个队列。

即使队列被暂时清除,它容易在以后再次收到消息,以便队列不能简单地被遗忘 - 相反,消费者应稍后返回。为了实现这种轮换,我想我会使用队列队列。当消费者失败时,危险就会失去对队列队列中的队列的引用;我认为这似乎可以通过承认来解决,如下所示。

本质上,每个消费者线程等待与对队列的引用以获得消息(A)(1)从队列的队列;消息(A)最初未被确认。消费者愉快地尝试清除队列(1),并且一旦队列(1)在给定时间内保持为空,则消费者从队列队列中请求新的队列名称。一旦接收到第二消息(B)和对新队列(2)的引用,对队列(1)的引用作为新消息(C)放回到队列队列的末尾,最后消息(A)被承认。

事实上,队列的交付至少和可能只有一次保证几乎让我对这里的正常队列(1,2)有排他性,但为了确保我绝对不要失去对队列的引用,我需要重新发布队列(1)作为消息(C)之前我确认消息(A)。这意味着,如果服务器在将队列(1)重新发布为消息(C)之后但在确认(A)之前发生故障,则队列队列中可能存在对队列(1)的两个引用,并且不再保证排他性。

因此,我需要使用AMQP的独占消费者标志,这很好,但就目前而言,如果我收到“403 ACCESS REFUSED”,我还不想重新发布对队列的引用,所以重复的参考文​​件不会激增。

不过,我使用Spring的优秀AMQP库,我不知道怎样才能用一个错误处理程序挂钩英寸暴露在容器上的setErrorHandler方法似乎不适用于“403 ACCESS REFUSED”错误。

有没有一种方法可以使用我目前使用的框架来执行403s?或者,还有另一种方法可以实现我需要的保证吗?我的代码如下。

的“监控服务”:

import org.joda.time.DateTime; 
import org.joda.time.DateTimeZone; 
import org.joda.time.Period; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.amqp.AmqpAuthenticationException; 
import org.springframework.amqp.core.MessageListener; 
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; 

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.Collection; 
import java.util.Optional; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 
import java.util.function.Supplier; 

public class ListenerMonitoringService { 

    private static final Logger log = LoggerFactory.getLogger(ListenerMonitoringService.class); 

    private static final Period EXPIRATION_PERIOD = Period.millis(5000); 

    private static final long MONTIORING_POLL_INTERVAL = 5000; 
    private static final long MONITORING_INITIAL_DELAY = 5000; 

    private final Supplier<AbstractMessageListenerContainer> messageListenerContainerSupplier; 

    private final QueueCoordinator queueCoordinator; 
    private final ScheduledExecutorService executorService; 

    private final Collection<Record> records; 

    public ListenerMonitoringService(Supplier<AbstractMessageListenerContainer> messageListenerContainerSupplier, 
            QueueCoordinator queueCoordinator, ScheduledExecutorService executorService) { 
     this.messageListenerContainerSupplier = messageListenerContainerSupplier; 
     this.queueCoordinator = queueCoordinator; 
     this.executorService = executorService; 

     records = new ArrayList<>(); 
    } 

    public void registerAndStart(MessageListener messageListener) { 
     Record record = new Record(messageListenerContainerSupplier.get()); 

     // wrap with listener that updates record 
     record.container.setMessageListener((MessageListener) (m -> { 
      log.trace("{} consumed a message from {}", record.container, Arrays.toString(record.container.getQueueNames())); 
      record.freshen(DateTime.now(DateTimeZone.UTC)); 
      messageListener.onMessage(m); 
     })); 

     record.container.setErrorHandler(e -> { 
      log.error("{} received an {}", record.container, e); 
      // this doesn't get called for 403s 
     }); 

     // initial start up 
     executorService.execute(() -> { 
      String queueName = queueCoordinator.getQueueName(); 

      log.debug("Received queue name {}", queueName); 
      record.container.setQueueNames(queueName); 

      log.debug("Starting container {}", record.container); 
      record.container.start(); 

      // background monitoring thread 
      executorService.scheduleAtFixedRate(() -> { 
       log.debug("Checking container {}", record.container); 
       if (record.isStale(DateTime.now(DateTimeZone.UTC))) { 
        String newQueue = queueCoordinator.getQueueName(); 
        String oldQueue = record.container.getQueueNames()[0]; 
        log.debug("Switching queues for {} from {} to {}", record.container, oldQueue, newQueue); 
        record.container.setQueueNames(newQueue); 

        queueCoordinator.markSuccessful(queueName); 
       } 
      }, MONITORING_INITIAL_DELAY, MONTIORING_POLL_INTERVAL, TimeUnit.MILLISECONDS); 
     }); 

     records.add(record); 
    } 

    private static class Record { 
     private static final DateTime DATE_TIME_MIN = new DateTime(0); 

     private final AbstractMessageListenerContainer container; 
     private Optional<DateTime> lastListened; 

     private Record(AbstractMessageListenerContainer container) { 
      this.container = container; 
      lastListened = Optional.empty(); 
     } 

     public synchronized boolean isStale(DateTime now) { 
      log.trace("Comparing now {} to {} for {}", now, lastListened, container); 
      return lastListened.orElse(DATE_TIME_MIN).plus(EXPIRATION_PERIOD).isBefore(now); 
     } 

     public synchronized void freshen(DateTime now) { 
      log.trace("Updating last listened to {} for {}", now, container); 
      lastListened = Optional.of(now); 
     } 
    } 
} 

的“队列的队列”处理程序:

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Envelope; 
import com.rabbitmq.client.GetResponse; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 
import org.springframework.amqp.rabbit.connection.Connection; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 

import java.io.IOException; 
import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 

private class MetaQueueCoordinator implements QueueCoordinator { 

    private static final Logger log = LoggerFactory.getLogger(MetaQueueCoordinator.class); 

    private final Channel channel; 
    private final Map<String, Envelope> envelopeMap; 
    private final RabbitTemplate rabbitTemplate; 

    public MetaQueueCoordinator(ConnectionFactory connectionFactory) { 
     Connection connection = connectionFactory.createConnection(); 
     channel = connection.createChannel(false); 

     envelopeMap = new ConcurrentHashMap<>(); 
     rabbitTemplate = new RabbitTemplate(connectionFactory); 
     rabbitTemplate.setExchange(""); 
     rabbitTemplate.setRoutingKey("queue_of_queues"); 
    } 

    @Override 
    public String getQueueName() { 
     GetResponse response; 
     try { 
      response = channel.basicGet("queue_of_queues", false); 
     } catch (IOException e) { 
      log.error("Unable to get from channel"); 
      throw new RuntimeException(e); 
     } 

     String queueName = new String(response.getBody()); 
     envelopeMap.put(queueName, response.getEnvelope()); 

     return queueName; 
    } 

    @Override 
    public void markSuccessful(String queueName) { 
     Envelope envelope = envelopeMap.remove(queueName); 
     if (envelope == null) { 
      return; 
     } 

     log.debug("Putting {} at the end of the line...", queueName); 
     rabbitTemplate.convertAndSend(queueName); 

     try { 
      channel.basicAck(envelope.getDeliveryTag(), false); 
     } catch (IOException e) { 
      log.error("Unable to acknowledge {}", queueName); 
     } 
    } 

    @Override 
    public void markUnsuccessful(String queueName) { 
     Envelope envelope = envelopeMap.remove(queueName); 
     if (envelope == null) { 
      return; 
     } 

     try { 
      channel.basicAck(envelope.getDeliveryTag(), false); 
     } catch (IOException e) { 
      log.error("Unable to acknowledge {}", queueName); 
     } 
    } 
} 

回答

1

ErrorHandler是消息传递过程中的操作错误,而不是建立监听器本身。

即将到来的1.5版本publishes application events时例外,如出现这种情况。

它将在今年夏天晚些时候发布;此功能目前仅在1.5.0.BUILD-SNAPSHOT;在接下来的几周内应该有一个候选版本。

project page显示了如何从快照回购中获取快照。

+0

谢谢。有没有机会使这些例外情况对客户端代码稍微友好一些,例如响应代码枚举,还是我试图将一个方形的挂钩插入圆孔? – jwilner

+0

也就是说,我只是想做一些你们不打算支持的东西? – jwilner

+0

对不起,刚刚在'ShutdownSignalException'上找到'getReason'方法,虽然我猜我的大问题仍然存在 - 这听起来有点不对劲吗? – jwilner