1

我需要在成功处理并存储后,才能在兔子听众中手动确认多条消息。使用的弹簧启动配置如下SpringBoot:手动确认多个AMQP消息

listener: 
    concurrency: 2 
    max-concurrency: 20 
    acknowledge-mode: manual 
    prefetch: 30 

消息应该一次存储20个批次。只有当它们被成功存储时,才应该发送多重确认。存储机制也存在关联超时,即使没有20个消息,也应在20秒后存储消息。目前,我有以下的代码

@Slf4j 
@Component  
class EventListener { 

    @Autowired 
    private EventsStorage eventsStorage 
    private ConcurrentMap<Integer, ChannelData> channelEvents = new ConcurrentHashMap<>() 

    @RabbitListener(queues = 'event-queue') 
    void processEvent(@Payload Event event, Channel channel, @Header(DELIVERY_TAG) long tag) { 
    log.debug("Event received for channel $channel.channelNumber") 
    channelEvents.compute(channel.channelNumber, { k, channelData -> addEventAndStoreIfNeeded(channel, event, tag, channelData) }) 
    } 

    private ChannelData addEventAndStoreIfNeeded(Channel channel, Event event, long tag, ChannelData channelData) { 
    if (channelData) { 
     channelData.addEvent(tag, event) 
     if (channelData.getDeliveredEvents().size() >= batchSize) { 
     storeAndAckChannelEvents(channel.channelNumber) 
     } 
     return channelData 
    } else { 
     ChannelData newChannelData = new ChannelData(channel) 
     newChannelData.addEvent(tag, event) 
     return newChannelData 
    } 
    } 

    void storeAndAckChannelEvents(Integer channelNumber) { 
    channelEvents.compute(channelNumber, { k, channelData -> 
     List<DeliveredEvent> deliveredEvents = channelData.deliveredEvents 
     if (!deliveredEvents.isEmpty()) { 
     def events = deliveredEvents.stream() 
      .map({ DeliveredEvent deliveredEvent -> deliveredEvent.event }) 
      .collect(Collectors.toList()) 

     eventsStorage.store(events) 
     long lastDeliveryTag = deliveredEvents.get(deliveredEvents.size() - 1).deliveryTag 
     channelData.channel.basicAck(lastDeliveryTag, true) 
     deliveredEvents.clear() 
     } 
    }) 
    } 

    @Scheduled(fixedRate = 20000L) 
    void storeMessagingEvents() { 
    channelEvents.forEach({ k, channelData -> storeAndAckChannelEvents(channelData) }) 
    } 

} 

其中ChannelDataDeliveredEvent表现如下

class DeliveredMesssagingEvent { 
    int deliveryTag 
    Event event 
} 

class ChannelData { 
    Channel channel 
    List<DeliveredEvent> deliveredEvents = new ArrayList<>() 

    ChannelData(Channel channel) { 
    this.channel = channel 
    } 

    void addEvent(long tag, Event event) { 
    deliveredEvents.add(new DeliveredEvent(deliveryTag: tag, event: event)) 
    } 
} 

使用的Channelcom.rabbitmq.client.Channel。关于此接口状态的The docs

通道实例不能在线程之间共享。应用程序应该更喜欢使用每个线程的通道,而不是在多个线程之间共享同一个通道

所以,我做的非常相反,在SchedulerSimpleMessageListenerContainer工作线程之间共享Channel。我的应用程序的输出是这样的:

[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2 
[SimpleAsyncTaskExecutor-4] DEBUG EventListener - Event received for channel 3 
[SimpleAsyncTaskExecutor-5] DEBUG EventListener - Event received for channel 1 
[SimpleAsyncTaskExecutor-1] DEBUG EventListener - Event received for channel 5 
[SimpleAsyncTaskExecutor-2] DEBUG EventListener - Event received for channel 4 
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2 
[SimpleAsyncTaskExecutor-1] DEBUG EventListener - Event received for channel 5 
[SimpleAsyncTaskExecutor-2] DEBUG EventListener - Event received for channel 4 
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2 
[pool-4-thread-1] DEBUG EventListener - Storing channel 5 events 
[pool-4-thread-1] DEBUG EventListener - Storing channel 2 events 
... 

SimpleMessageListenerContainer工作者线程有自己的通道,它不随时间而改变。

考虑到我同步SchedulerSimpleMessageListenerContainer工作线程,有没有人看到任何理由为什么这个代码不是线程安全的?

是否有任何其他方法,我应该尝试在Spring启动时手动确认多条消息?

回答

1

只要您同步这些线程,您就可以正常工作。请注意,如果连接丢失,您将获得新的使用者,并且同步线程将显示过时的数据(未重新发送的消息将被重新发送)。

但是,您也可以使用container idle events

当消费者线程在那段时间空闲时,该事件将在同一个侦听器线程上发布,因此您可以在那里进行定时确认,您不必担心同步。

如果连接丢失,您还可以检测到consumer failed events