2016-07-28 77 views
1

我想只允许在给定的时间正在处理的sqs队列中的一个项目。目前它只会提取一个队列中的单个消息,但它会继续在每次调查时都会这样做。Aws整合弹簧:保证只有一个项目从平方

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
     executor.setCorePoolSize(2); 
     executor.setMaxPoolSize(2); 
     executor.setQueueCapacity(10); 
     executor.setThreadNamePrefix("test-"); 
     executor.initialize(); 
     return executor; 

     new SqsMessageDrivenChannelAdapter(amazon)); 
     adapter.setMaxNumberOfMessages(1); 
     adapter.setSendTimeout(2000); 
     adapter.setVisibilityTimeout(1200); 
     adapter.setWaitTimeOut(20); 
     adapter.setTaskExecutor(this.asyncTaskExecutor()); 

这个问题似乎是在ThreadPoolTask​​Executor和我的理解。由于队列大小为10,它每次都会提升,直到满满为止?

设置maxPoolSize到1导致的

Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.[email protected]406354e5 rejected from [email protected][Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0] 
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) 
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) 
    ... 6 common frames omitted 

回答

1

的问题是,你的ThreadPoolExecutor被设定为具有使用2个线程消耗掉这个队列中的消息10的BlockingQueue大小。所以在任何时候,你都可以有两个线程同时处理消息。如果将PoolSize设置为1,则可以保证在给定时间只能处理一条消息。

从源代码:

/* 
* Proceed in 3 steps: 
* 
* 1. If fewer than corePoolSize threads are running, try to 
* start a new thread with the given command as its first 
* task. The call to addWorker atomically checks runState and 
* workerCount, and so prevents false alarms that would add 
* threads when it shouldn't, by returning false. 
* 
* 2. If a task can be successfully queued, then we still need 
* to double-check whether we should have added a thread 
* (because existing ones died since last checking) or that 
* the pool shut down since entry into this method. So we 
* recheck state and if necessary roll back the enqueuing if 
* stopped, or start a new thread if there are none. 
* 
* 3. If we cannot queue task, then we try to add a new 
* thread. If it fails, we know we are shut down or saturated 
* and so reject the task. 

你打第三种情况。

+0

这是我的第一个想法,这不起作用见上文。 @Gandalf – user101010101

+0

可能需要更多的代码来解决这个问题。 GitHub的? – Gandalf