2016-12-15 70 views
2

我们需要在我们的Java EE应用程序中使用队列,并且由于它是云基础应用程序(部署在OpenShift Online上),我们喜欢使用亚马逊sqs。在@MessageDriven bean中使用amazon sqs - 池/并行处理

如果我正确理解了JMS/Java EE的接收部分的理论,一个@MessageDriven bean由Java EE容器管理,以便并行创建大量bean实例(根据最大池大小),如果传入消息的数量很高。这当然是处理高负载的一大好处。

但是,我不明白我们如何在Java EE应用程序中以这种方式集成aws sqs。我知道异步接收器的例子来自http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html

class MyListener implements MessageListener { 

    @Override 
    public void onMessage(Message message) { 
     try { 
      // Cast the received message as TextMessage and print the text to screen. 
      if (message != null) { 
       System.out.println("Received: " + ((TextMessage) message).getText()); 
      } 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

然后:

// Create a consumer for the 'TestQueue'. 
MessageConsumer consumer = session.createConsumer(queue); 

// Instantiate and set the message listener for the consumer. 
consumer.setMessageListener(new MyListener()); 

// Start receiving incoming messages. 
connection.start(); 

这是官方异步接收器的例子 - 这不是一个@MessageDriven豆。很显然,我们需要在某个地方进行身份验证(通过创建一个SQSConnectionFactory,然后是一个连接,然后是一个会话 - 这在示例中也有详细描述)。
但我强烈认为这个例子不会并行处理消息 - 即只有一个bean实例正在处理队列,这对于可伸缩的高负载应用程序来说并不是一个好的解决方案。

a)我们如何才能通过Amazon SQS实现真正的Java EE方式? 我只是发现春天的例子。但它必须是Java EE 7. b)我们使用Wildfly(现在是8.2.1)。是否也可以让Wildfly在内部管理与AWS和应用程序的连接,我们可以像使用应用程序服务器管理的队列一样使用队列(与数据源访问数据库的方法相同)?

结论后,得到的回答从stdunbar
它似乎没有可能在一个“适当的方式”,是我喜欢做的事。所以我该怎么做?实施ManagedExecutorService作为stdunbar描述'包裹'的队列? - 但是这意味着有一个本地队列,这对于一个应用程序来说不是一个好的情况,应该是可扩展的! 什么是替代品?我们正在OpenShift Online上运行应用程序。使用例如自己的装备来实例化自己的装备可能是不利的。 ApacheMQ Cartridge ......当然还有很多不利因素,比如成本,我们对“基础架构”负责。

说实话,我在这种情况下,真的很失望AWS的...

回答

2

我不认为我的解决方案是正确的JAVA EE,但在我的情况下,它的工作原理。

配置:

@Singleton 
public class SqsMessageManager 
{ 
    private Integer numberOfReceivers = 3; 

    public static SQSConnection connection = null; 
    public static Queue queue = null; 

    @Inject 
    SqsMessageReceiver sqsMessageReceiver; 

    public void init() 
    { 
     try 
     { 
      SQSConnectionFactory connectionFactory = 
        SQSConnectionFactory.builder() 
          .withRegion(Region.getRegion(Regions.EU_WEST_1)) 
          .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider()) 
          .build(); 

      connection = connectionFactory.createConnection(); 

      queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue"); 

      for (int i = 0; i < numberOfReceivers; i++) 
       connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver); 

      connection.start(); 
     } 
     catch (JMSException e) 
     { 
      e.getStackTrace(); 
     } 
    } 
} 

然后发件人:

@Dependent 
public class SqsMessageSender 
{ 
    MessageProducer producer = null; 
    Session senderSession = null; 

    @PostConstruct 
    public void createProducer(){ 
     try 
     { 
      // open new session and message producer 
      senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      producer = senderSession.createProducer(SqsMessageManager.queue); 
     }catch(JMSException | NullPointerException e){ 
      ; 
     } 
    } 

    @PreDestroy 
    public void destroy(){ 
     try 
     { 
      // close session 
      producer.close(); 
      senderSession.close(); 
     }catch(JMSException e){ 

     } 
    } 

    // sends a message to aws sqs queue 
    public void sendMessage(String txt) 
    { 
     try 
     { 
      TextMessage textMessage = senderSession.createTextMessage(txt); 
      producer.send(textMessage); 
     } 
     catch (JMSException e) 
     { 
      e.getStackTrace(); 
     } 
    } 
} 

和接收器:

@Dependent 
public class SqsMessageReceiver implements MessageListener 
{ 
    public void onMessage(Message inMessage) { 
     ... 
    } 
} 
+0

我不完全明白,你的** numberOfReceivers **是如何工作的。你在同一个对象上创建了多个监听器(你注入'sqsMessageReceiver',实际上是一个实例)? – badera

3

据一些老docs I found

容器允许消息驱动bean类的实例是并发运行,从而允许并发处理消息流。

通过使用亚马逊JMS的整合,再加上声明 MDB,你要善于去。我不会使用setMessageListener接口。您可以使用JMS的声明版本,因为您使用的是Wildfly 8。x/EE7:

@MessageDriven(activationConfig = { /* your config - i.e. queue name, etc */ }) 
public class MyListener implements MessageListener { 
    @Override 
    public void onMessage(Message message) { 
    } 
} 

这允许容器根据需要创建尽可能多的实例。请注意,可能需要在Wildfly中为JMS参数进行一些调整。

请注意,让Amazon库负责读取SQS队列。我已经开始翻阅自己的读者,认为我可以编写它。但是我发现,您不能使用带有从队列中读取的多个线程的AWS Java库,因为几乎每次都会有重复的读取。我有4个线程读取SQS队列,并会得到4条相同的消息。我终于变成了一个阅读器,将消息放入一个LinkedBlockingDeque中,以供其他一些线程使用。

我所展示的一切都是纯Java/EE。

编辑
与亚马逊SQS/JMS集成玩弄了一段,我觉得如果你使用它,你是在浪费你的时间。它仅适用于JMS 1.1,因此它也使用带有JNDI的旧JMS语法。此外,它只适用于队列,不适用于主题。

我强烈建议创建自己的实现。 ManagedExecutorService运行带有短SQS读取超时的队列读取器线程。每次循环都将从SQS队列中读取并将消息放入JMS队列或主题中。

对不起,你已经得到了这个希望 - 亚马逊刚刚没有被维持足够的价值。

+0

谢谢,stdunbar。我很高兴听到这应该起作用 - 但是,我不知道如何发挥所需的参数(凭据)。我在哪里放置AWS证书?我该如何实现你的注意事项'作为一个便笺,让亚马逊图书馆负责读取SQS队列? - 我认为用@ MessageDriven注释会使应用程序服务器读取队列?! - 我明白,如果您可以更具体地了解如何将AWS SQS集成到您的代码中。 – badera

+0

我怀疑你是对的。这些都不是好消息。如果我理解正确,那只是SDK“不是最新的”,而不是服务本身? - 我用一个结论和一个进一步的问题更新了这个问题。 – badera

+0

标有b)的问题仍未解答。希望能得到更多的想法,该怎么办,我开始赏金......至少感谢[+1]! – badera

0

Payara Cloud Connectors似乎是相当新的,但看起来很有希望。不知道这是否也适用于其他容器。据我了解,它基于JCA适配器。