2016-11-18 77 views
11

我正在使用Celery和RabbitMQ来处理来自API请求的数据。该过程如下:芹菜工作者内的多线程

请求 - > API - >的RabbitMQ - >芹菜工人 - >返回

理想我会催生更多的芹菜工人,但我存储器约束的限制。

目前,我的流程中的瓶颈是从传递给worker的URL中获取和下载数据。罗非鱼,过程是这样的:

celery_gets_job(url): 
    var data = fetches_url(url) # takes 0.1s to 1.0s (bottleneck) 
    var result = processes_data(data) # takes 0.1ss 
    return result 

当工人被锁定了一段时间,而抓取网址时这是不可接受的。我期待在通过穿线改善这一点,但我不能确定什么是最好的做法是:

  • 有没有一种方法,使芹菜工人下载传入的数据不同步,而在在同一时间处理数据不同的线程?

  • 我是否应该通过RabbitMQ让单独的工人抓取和处理某种形式的消息传递?

+1

通过创建两个多进程,您可以考虑在celery任务中使用[多处理管道](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Pipe)。当然你的多处理过程应该受到池的限制。通过rabbitmq/result后端共享获取的url的大数据并不是好主意,如果我没有错。芹菜低级别的API也可以有一些类似的功能。 –

+1

我不知道RabbitMQ,但我认为多处理会比多线程更适合你,因为'celery_gets_job'有多个非原子操作,这会在使用多线程时产生问题。您可以使用Queue,其中数据由运行'fetches_url(url)'的进程池填充,另一个进程(es)执行'processes_data(data)' – shrishinde

+0

这可能是您正在寻找的内容:http:// stackoverflow。 com/questions/28315657/celery-eventlet-non-blocking-requests – fpbhb

回答

1

使用eventlet库,可以修补标准库以使它们异步。

首次进口异步的urllib2:

from eventlet.green import urllib2 

所以,你将获得url体:

def fetch(url): 
    body = urllib2.urlopen(url).read() 
    return body 

查看更多eventlet例子here

+2

另外,直接使用eventlet执行池http://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html应该会自动为monkey patch io调用。 – dyeray

+0

但是,那么'process_data(data)'仍然会阻塞并且使得组合结果比以前更慢? – ostrokach

0

我会创建两个任务,一个用于下载数据,另一个用于处理下载的数据。这样你可以独立地扩展两个任务。参见:RoutingChains