2011-05-19 65 views
9

我对GAE中mapreduce支持的当前状态有些困惑。根据文档http://code.google.com/p/appengine-mapreduce/还没有支持reduce阶段,但在I/O 2011(http://www.youtube.com/watch?v=EIxelKcyCC0)的会话描述中写道:“现在可以在App Engine上运行完整的Map Reduce作业”。我不知道如果我能在这个任务中使用的MapReduce:在Google App Engine中使用mapreduce的简单计数器示例

我想要做什么:

我有外地的颜色模型车:

class Car(db.Model): 
    color = db.StringProperty() 

我想运行MapReduce的过程(从时间,cron定义),它可以计算每种颜色中有多少辆车,并将此结果存储在数据存储区中。看起来像一个非常适合mapreduce的工作(但如果我错了我的话),阶段“map”将为每个Car实体生成对(,1),阶段“reduce”应该通过color_name合并这个数据,给我预期的结果。最后的结果我想是与存储在数据存储计算数据,这样的事情实体:

class CarsByColor(db.Model): 
    color_name = db.StringProperty() 
    cars_num = db.IntegerProperty() 

问题: 我不知道如何在AppEngine上实现这个...录像显示例子使用定义的映射和减少函数,但它们似乎是与数据存储无关的非常普遍的示例。我发现的所有其他示例都使用一个函数处理来自DatastoreInputReader的数据,但它们似乎只是“映射”阶段,没有示例说明如何执行“reduce”(以及如何将减少的结果存储在数据存储)。

回答

6

我在这里提供的解决方案我最终使用GAE中的mapreduce(没有缩小阶段)。如果我从头开始,我可能会使用由Drew Sears提供的解决方案。

它工作在GAE的Python 1.5.0

应用。YAML我添加了MapReduce的处理程序:

- url: /mapreduce(/.*)? 
    script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py 

和我的mapreduce的代码处理程序(我使用的URL/mapred_update收集由MapReduce的产生的结果):

- url: /mapred_.* 
    script: mapred.py 

创建mapreduce.yaml用于加工车实体:

mapreduce: 
- name: Color_Counter 
    params: 
    - name: done_callback 
    value: /mapred_update 
    mapper: 
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader 
    handler: mapred.process 
    params: 
    - name: entity_kind 
     default: models.Car 

说明:done_callback是mapreduce完成其操作后调用的url。 mapred.process是一个处理单个实体和更新计数器(它在mapred.py文件中定义)的函数。型号汽车在models.py

mapred.py定义:

from models import CarsByColor 
from google.appengine.ext import db 
from google.appengine.ext.mapreduce import operation as op 
from google.appengine.ext.mapreduce.model import MapreduceState 

from google.appengine.ext import webapp 
from google.appengine.ext.webapp.util import run_wsgi_app 

def process(entity): 
    """Process individual Car""" 
    color = entity.color 
    if color: 
     yield op.counters.Increment('car_color_%s' % color) 

class UpdateCounters(webapp.RequestHandler): 
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters""" 
    def post(self): 
     """Called after mapreduce operation are finished""" 
     # Finished mapreduce job id is passed in request headers 
     job_id = self.request.headers['Mapreduce-Id'] 
     state = MapreduceState.get_by_job_id(job_id) 
     to_put = [] 
     counters = state.counters_map.counters 
     # Remove counter not needed for stats 
     del counters['mapper_calls'] 
     for counter in counters.keys(): 
      stat = CarsByColor.get_by_key_name(counter) 
      if not stat: 
       stat = CarsByColor(key_name=counter, 
           name=counter) 
      stat.value = counters[counter] 
      to_put.append(stat) 
     db.put(to_put) 

     self.response.headers['Content-Type'] = 'text/plain' 
     self.response.out.write('Updated.') 


application = webapp.WSGIApplication(
            [('/mapred_update', UpdateCounters)], 
            debug=True) 
def main(): 
    run_wsgi_app(application) 

if __name__ == "__main__": 
    main()    

有CarsByColor模式的改变定义相比,略有问题的。

您可以从url:http://yourapp/mapreduce/手动启动mapreduce作业,并希望从cron(我还没有测试过cron)。

9

你并不需要减少阶段。你可以用线性任务链做到这一点,或多或少如下:

def count_colors(limit=100, totals={}, cursor=None): 
    query = Car.all() 
    if cursor: 
    query.with_cursor(cursor) 
    cars = query.fetch(limit) 
    for car in cars: 
    try: 
     totals[car.color] += 1 
    except KeyError: 
     totals[car.color] = 1 
    if len(cars) == limit: 
    cursor = query.cursor() 
    return deferred.defer(count_colors, limit, totals, cursor) 
    entities = [] 
    for color in totals: 
    entity = CarsByColor(key_name=color) 
    entity.cars_num = totals[color] 
    entities.append(entity) 
    db.put(entities) 

deferred.defer(count_colors) 

这应该遍历所有的汽车,通过一个查询光标和流水账了一系列即席任务,以及商店总数在最后。

如果您必须在单个模型中合并来自多个数据存储区,多个模型或多个索引的数据,那么缩小阶段可能有意义。因为我不认为它会给你买东西。

另一种选择:使用任务队列维护每种颜色的实时计数器。当您创建一辆汽车时,启动一项任务以增加该颜色的总量。当你更新一辆汽车时,启动一个任务来减少旧的颜色,而另一个任务则递增新的颜色。更新计数器以避免竞争条件。

相关问题