2014-01-31 47 views
5

我很难搞清楚如何处理来自Amazon SQS的消息。在Java中异步处理来自队列的Amazon SQS消息

我想实现如下:

  1. 监听器SQS从队列
  2. 处理消息,并将其添加到DB
  3. 从队列

删除处理的消息让我困扰的一个很多是如何实施步骤2.我有类SQSConnectorProfileDao。现在我想要简单的实现,通过初始化SQSConnectorProfileDao并接收来自队列的消息。我的想法是开始新的线程,开始轮询消息,当队列为空时,从ProfileDao中断线程。

返回/处理消息(回调函数?)的最佳方式是什么?如果还有其他方法,我可以选择。

谢谢

回答

3

我使用Java的ExecutorServiceFutureConcurrentLinkedQueue完成与SQS类似的东西。

ExecutorService创建一个线程池,该线程池可以执行实现Callable接口并返回Future的类。当ExecutorService创建期货时,我将它们推送到在线程中运行的ConcurrentLinkedQueue,并在期货完成时处理结果。

实施检查SQS和异步开始工作:

import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class SqsProcessor { 

    private static final int THREAD_COUNT = 100; 
    private ExecutorService _executor = null; 
    private FutureResultProcessor futureResultProcessor = null; 

    public SqsProcessor() { 
     _executor = Executors.newFixedThreadPool(THREAD_COUNT); 
     _futureResultProcessor = new FutureResultProcessor(); 
    } 

    public void waitReceive() { 

     // Receive a SQS message 

     // Start the work related to the SQS message 
     Callable<MyWorkderResult> sqsWorker = new MyWorker(sqsMessage); 
     Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker); 

     // Send to the queue so the result can be processed when it completes 
     _futureResultProcessor.add(sqsFuture); 
    } 
} 

的类,它的工作:

import java.util.concurrent.Callable; 

public class MyWorker implements Callable<MyWorkerResult> { 

    private String _sqsMessage = null; 

    public MyWorker(String sqsMessage) { 
     _sqsMessage = sqsMessage; 
    } 

    @Override 
    public MyWorkerResult call() throws Exception { 
     // Do work relating to the SQS message 
    } 
} 

举行的工作成果:

public class MyWorkerResult { 
    // Results set in MyWorker call() 
} 

的ConcurrentLinkedQueue接收并处理未来的结果:

import java.util.concurrent.Future; 
import java.util.concurrent.ConcurrentLinkedQueue; 

public class FutureResultProcessor extends Thread { 

    private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>(); 
    private final Integer CHECK_SLEEP = 300; 

    public FutureResultProcessor() { 
    } 

    public void run() { 
     while(true) { 
      Future<MyWorkerResult> myFuture = resultQueue.poll(); 

      if(myFuture == null) { 
       // There's nothing to process 
       try { Thread.sleep(CHECK_SLEEP); } catch (InterruptedException e) {} 
       continue; 
      } 

      // Process result 
      if(myFuture != null) { 

       MyFutureResult myFutureResult = myFuture.get(); 

       // Process result 
      } 
     } 
    } 

    public void add(Future<MyWorkerResult> sqsFuture) { 
     resultQueue.offer(sqsFuture); 
    } 
} 

或者,您可以收集一组期货并等待它们全部完成,然后再处理结果。

Akka可能是个不错的选择。我没有直接使用它,但它提供了运行异步任务的框架,提供了错误处理,甚至可以将任务分配给远程实例。