2013-02-22 62 views
2

我正在使用rabbitmq的最新pika库(0.9.9+)。我对rabbitmq和pika的用法如下:Amazon ec2中pika-rabbitmq的心跳间隔很好

  1. 作为工人,我有很长时间的运行任务(约5分钟)。这些任务从rabbitmq收到请求。请求很少发生,即请求之间有很长的空闲时间。
  2. 我以前面临的问题与空闲连接(由于空闲连接引起的连接关闭)有关。所以,我已经启用了pika的心跳。
  3. 现在选择心跳是一个问题。 Pika似乎是一个单线程库,心跳接收和确认恰好在请求时间段之间完成。
  4. 因此,如果心跳间隔小于回调函数用于执行其长时间运算的时间,服务器将不会收到任何心跳确认并关闭连接。
  5. 因此,我假设最小心跳间隔应该是阻塞连接中回调函数的最大计算时间。

什么可以很好的心跳值Amazon EC2上,以防止它关闭空闲连接?

此外,一些建议使用rabbitmq keepalive(或libkeepalive)来维护tcp连接。我认为在tcp层管理心跳会好得多,因为应用程序不需要管理它们。这是真的吗?与RMQ心跳相比,Keepalive是一种好方法吗?

我已经看到一些建议使用多个线程和队列进行长时间运行的任务。但是这是长期运行任务的唯一选择吗?在这种情况下必须使用另一个队列是非常令人失望的。

预先感谢您。我想我已经详细解释了这个问题。让我知道我是否可以提供更多细节。

回答

3

如果你不依赖于使用鼠兔,这thread帮助我达到你想要做的海带用什么:

#!/usr/bin/env python 
import time, logging, weakref, eventlet 
from kombu import Connection, Exchange, Queue 
from kombu.utils.debug import setup_logging 
from kombu.common import eventloop 
from eventlet import spawn_after 

eventlet.monkey_patch() 

log_format = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' 
       '-35s %(lineno) -5d: %(message)s') 
logging.basicConfig(level=logging.INFO, format=log_format) 
logger = logging.getLogger('job_worker') 
logger.setLevel(logging.INFO) 


def long_running_function(body): 
    time.sleep(300) 

def job_worker(body, message): 
    long_running_function(body) 
    message.ack() 

def monitor_heartbeats(connection, rate=2): 
    """Function to send heartbeat checks to RabbitMQ. This keeps the 
     connection alive over long-running processes.""" 
    if not connection.heartbeat: 
     logger.info("No heartbeat set for connection: %s" % connection.heartbeat) 
     return 
    interval = connection.heartbeat 
    cref = weakref.ref(connection) 
    logger.info("Starting heartbeat monitor.") 

    def heartbeat_check(): 
     conn = cref() 
     if conn is not None and conn.connected: 
      conn.heartbeat_check(rate=rate) 
      logger.info("Ran heartbeat check.") 
      spawn_after(interval, heartbeat_check) 
    return spawn_after(interval, heartbeat_check) 

def main(): 
    setup_logging(loglevel='INFO') 

    # process for heartbeat monitor 
    p = None 

    try: 
     with Connection('amqp://guest:[email protected]:5672//', heartbeat=300) as conn: 
      conn.ensure_connection() 
      monitor_heartbeats(conn) 
      queue = Queue('job_queue', 
          Exchange('job_queue', type='direct'), 
          routing_key='job_queue') 
      logger.info("Starting worker.") 
      with conn.Consumer(queue, callbacks=[job_worker]) as consumer: 
       consumer.qos(prefetch_count=1) 
       for _ in eventloop(conn, timeout=1, ignore_timeouts=True): 
        pass 
    except KeyboardInterrupt: 
     logger.info("Worker was shut down.") 

if __name__ == "__main__": 
    main() 

我剥夺了我的域特定的代码,但本质上,这是我的框架使用。