0

获取随机组行我有形式从PCollection

user_id, date, other_columns 
1, 2017-03-10, ... 
2, 2017-03-10, ... 
3, 2017-03-10, ... 
... 

的数据集,我需要做到以下几点:对于数据集中的每一行我要生成一个新行其中将包含当前列N行的同一天对应不同的用户如下随机子集:

row, other_rows 
{'user_id': 1, 'date': '2017-03-10', ...}, [{'user_id': 2,...},...] 
{'user_id': 2, 'date': '2017-03-10', ...}, [{'user_id': 1,...},...] 
... 

我已经实现了它的下面,但它对于大型数据集时速度很慢在云上执行。

dataset 
| 'map-to-date' >> beam.Map(lambda x: (x['date'], x)) 
| 'group-by-date' >> beam.GroupByKey() 
| 'generate-output' >> beam.ParDo(GenerateOutputRows()) 

其中GenerateOutputRows被定义为:

class GenerateOutputRows(beam.DoFn): 
    def process(self, element): 
     (date, rows) = element 
     for r in rows: 
      other_users_rows = list(filter(lambda x: x['user_id'] != r['user_id'], 
              rows)) 
      yield (r, random.sample(other_users_rows, N)) 

你能想到的用于获取所需的结果另一个更高性能的方法吗?

+0

你是否真的需要这种情况发生在每一行?或每个用户一天只有一次? – CasualT

+0

是的,我需要这个发生在每一行。我正在为ML模型生成一个训练数据集,每一行都将是一个训练样本 – pnezis

+0

数据集的大小是多少,哪个是较慢的操作?你在用多少工人?你有工作ID吗? – Pablo

回答

0

我没有立即看到算法更有效的方式这样做的,除非你能提供一些简化的假设,如:

  • 在1日,是USER_ID唯一的或可以有与同多行用户?
  • 如果是,那么同一用户的多个other_users_rows样本必须是统计独立的吗?如果不是,那么您可以对具有相同user_id的行进行多次缓存并重复使用相同的样本。
  • 不同用户的other_users_rows样本是否必须是统计独立的,或者如果说用户A的样本不包含用户B,那么可以为用户B使用完全相同的样本?

在一般情况下,这是一个算法的问题,而不是一个云数据流/阿帕奇梁的问题,因为你的代码的瓶颈是O(rows.size()^ 2)循环内GenerateOutputRows和光束不能使自动更快;我建议去http://cstheory.stackexchange.com以获得更有效算法的建议。

相关问题