2012-04-16 63 views
9

我正在使用boto来处理通过在SQS中传递消息来管理其工作流的应用程序。如何使用Python中的boto库获取Amazon SQS队列中的所有消息?

我的SQS队列正在逐步增长,我也没有办法检查它有多少元素应该包含。

现在我有一个后台程序,定期轮询队列,并检查是否我有一个固定大小的组的元素。例如,考虑下面的“队列”:

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"] 

现在我要检查,如果我有“msg1_comp1”,一起在某个时间点在队列“msg2_comp1”和“msg3_comp1”,但我不”不知道队列的大小。

通过API看后,似乎你可以得到的只有1元,或在队列中的元素一个固定的数字,但不是全部:

>>> rs = q.get_messages() 
>>> len(rs) 
1 
>>> rs = q.get_messages(10) 
>>> len(rs) 
10 

的答案中提出的建议将是例如,在循环中获得10条消息,直到我没有收回任何东西,但SQS中的消息具有可见性超时,这意味着如果我从队列中轮询元素,它们不会被真正删除,它们只会在短时间内不可见的时间。

有一个简单的方法来获得队列中的所有邮件,不知道有多少?

回答

13

把你的电话给q.get_messages(n)内while循环:

all_messages=[] 
rs=q.get_messages(10) 
while len(rs)>0: 
    all_messages.extend(rs) 
    rs=q.get_messages(10) 

此外,dump won't support more than 10 messages之一:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'): 
    """Utility function to dump the messages in a queue to a file 
    NOTE: Page size must be < 10 else SQS errors""" 
+0

我真的不能做到这一点,因为在SQS消息有可见性超时,所以如果我首先得到10条消息,然后循环几次,下一次我可能会获得相同的10条消息,因为超时已经过去了。我正在考虑使用'dump()',但我必须在阅读文件之后,这似乎很愚蠢,我错过了什么吗? (我可以设置visibility_timeout很长一段时间,但看起来很丑陋)。 – 2012-04-16 20:09:36

+0

@linker - 你说你需要检查'n'特定的消息。这是否意味着你有一些比较每条消息的匹配标准? – 2012-04-16 20:12:06

+0

对不起,如果这是令人困惑,我已经更新了我的帖子。 – 2012-04-16 20:17:26

5

我的理解是,在SQS服务的分布式特性几乎使你的设计不可行。每次调用get_messages时,都会与另一组服务器通信,这些服务器会包含一些但不是全部的消息。因此,如果一组特定的消息已准备就绪,则不可能“不时检查”,然后仅接受这些消息。

你需要做的是调查不断,把所有的邮件到达时,并在您自己的数据结构存储在本地。每次成功获取后,您都可以检查数据结构以查看是否收集了一整套完整的消息。

记住消息到达的顺序,并且一些消息传递两次,因为删除必须传播到所有的服务器SQS,但随后的get请求有时拍出来的删除邮件。

0

像下面的代码应该做的伎俩。对不起,它在C#中,但它不应该很难转换为Python。该字典用于清除重复项。

public Dictionary<string, Message> GetAllMessages(int pollSeconds) 
    { 
     var msgs = new Dictionary<string, Message>(); 
     var end = DateTime.Now.AddSeconds(pollSeconds); 

     while (DateTime.Now <= end) 
     { 
      var request = new ReceiveMessageRequest(Url); 
      request.MaxNumberOfMessages = 10; 

      var response = GetClient().ReceiveMessage(request); 

      foreach (var msg in response.Messages) 
      { 
       if (!msgs.ContainsKey(msg.MessageId)) 
       { 
        msgs.Add(msg.MessageId, msg); 
       } 
      } 
     } 

     return msgs; 
    } 
9

我一直在使用AWS SQS队列来提供即时通知,所以我需要实时处理所有消息。以下代码将帮助您有效地将所有消息出列并在处理时处理任何错误。

注意:要从队列中删除消息,您需要删除它们。我使用的是更新的boto3 AWS蟒蛇SDK,JSON库,和下面的默认值:

import boto3 
import json 

region_name = 'us-east-1' 
queue_name = 'example-queue-12345' 
max_queue_messages = 10 
message_bodies = [] 
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>' 
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>' 
sqs = boto3.resource('sqs', region_name=region_name, 
     aws_access_key_id=aws_access_key_id, 
     aws_secret_access_key=aws_secret_access_key) 
queue = sqs.get_queue_by_name(QueueName=queue_name) 
while True: 
    messages_to_delete = [] 
    for message in queue.receive_messages(
      MaxNumberOfMessages=max_queue_messages) 
     # process message body 
     body = json.loads(message.body) 
     message_bodies.append(body) 
     # add message to delete 
     messages_to_delete.append({ 
      'Id': message.message_id, 
      'ReceiptHandle': message.receipt_handle 
     }) 

    # if you don't receive any notifications the 
    # messages_to_delete list will be empty 
    if len(messages_to_delete) == 0: 
     break 
    # delete messages to remove them from SQS queue 
    # handle any errors 
    else: 
     delete_response = queue.delete_messages(
       Entries=messages_to_delete) 
+0

适用于v2'Boto'软件包以将'delete_messages'函数从'Boto3'“backport”为[here](http://stackoverflow.com/a/40638174/4228193)。内置的'Boto'(2)'delete_message_batch'具有10条消息的限制,并且需要完整的'Message'类对象,而不仅仅是对象中的'ID'和'ReceiptHandles'。 – mpag 2016-11-18 17:55:17

0

注意:这是不打算作为一个直接的问题的答案。 相反,它是对@TimothyLiu's answer的扩充,假设最终用户使用的是Boto包(又名Boto2)而不是Boto3。该代码是一个在his answer


“宝途-2-化”的 delete_messages呼叫称为的 Boto(2)调用用于 delete_message_batch(messages_to_delete)其中 messages_to_deletedict对象与关键:值对应于 idreceipt_handle双返回

AttributeError: 'dict' object has no attribute 'id'.

看来delete_message_batch需要一个Message类对象;如果您一次删除10个以上的“消息”,则复制Boto source for delete_message_batch并允许其使用非Message对象(ala boto3)也会失败。所以,我不得不使用以下解决方法。从here

from __future__ import print_function 
import sys 
from itertools import islice 

def eprint(*args, **kwargs): 
    print(*args, file=sys.stderr, **kwargs) 

@static_vars(counter=0) 
def take(n, iterable, reset=False): 
    "Return next n items of the iterable as same type" 
    if reset: take.counter = 0 
    take.counter += n 
    bob = islice(iterable, take.counter-n, take.counter) 
    if isinstance(iterable, dict): return dict(bob) 
    elif isinstance(iterable, list): return list(bob) 
    elif isinstance(iterable, tuple): return tuple(bob) 
    elif isinstance(iterable, set): return set(bob) 
    elif isinstance(iterable, file): return file(bob) 
    else: return bob 

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False 
    """ 
    Deletes a list of messages from a queue in a single request. 
    :param cx: A boto connection object. 
    :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted 
    :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects. 
    """ 
    listof10s = [] 
    asSuc, asErr, acS, acE = "","",0,0 
    res = [] 
    it = tuple(enumerate(messages)) 
    params = {} 
    tenmsg = take(10,it,True) 
    while len(tenmsg)>0: 
    listof10s.append(tenmsg) 
    tenmsg = take(10,it) 
    while len(listof10s)>0: 
    tenmsg = listof10s.pop() 
    params.clear() 
    for i, msg in tenmsg: #enumerate(tenmsg): 
     prefix = 'DeleteMessageBatchRequestEntry' 
     numb = (i%10)+1 
     p_name = '%s.%i.Id' % (prefix, numb) 
     params[p_name] = msg.get('id') 
     p_name = '%s.%i.ReceiptHandle' % (prefix, numb) 
     params[p_name] = msg.get('receipt_handle') 
    try: 
     go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST') 
     (sSuc,cS),(sErr,cE) = tup_result_messages(go) 
     if cS: 
     asSuc += ","+sSuc 
     acS += cS 
     if cE: 
     asErr += ","+sErr 
     acE += cE 
    except cx.ResponseError: 
     eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    except: 
     eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res 

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0): 
    if sSuc == "": sSuc="None" 
    if sErr == "": sErr="None" 
    if cS == expect: sSuc="All" 
    if cE == expect: sErr="All" 
    return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr) 
1

ePrint的代码,我在一个cronjob执行这个

from django.core.mail import EmailMessage 
from django.conf import settings 
import boto3 
import json 

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID, 
     aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, 
     region_name=settings.AWS_REGION) 

queue = sqs.get_queue_by_name(QueueName='email') 
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 

while len(messages) > 0: 
    for message in messages: 
     mail_body = json.loads(message.body) 
     print("E-mail sent to: %s" % mail_body['to']) 
     email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']]) 
     email.send() 
     message.delete() 

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 
相关问题