0
获取随机组行我有形式从PCollection
user_id, date, other_columns
1, 2017-03-10, ...
2, 2017-03-10, ...
3, 2017-03-10, ...
...
的数据集,我需要做到以下几点:对于数据集中的每一行我要生成一个新行其中将包含当前列N行的同一天对应不同的用户如下随机子集:
row, other_rows
{'user_id': 1, 'date': '2017-03-10', ...}, [{'user_id': 2,...},...]
{'user_id': 2, 'date': '2017-03-10', ...}, [{'user_id': 1,...},...]
...
我已经实现了它的下面,但它对于大型数据集时速度很慢在云上执行。
dataset
| 'map-to-date' >> beam.Map(lambda x: (x['date'], x))
| 'group-by-date' >> beam.GroupByKey()
| 'generate-output' >> beam.ParDo(GenerateOutputRows())
其中GenerateOutputRows
被定义为:
class GenerateOutputRows(beam.DoFn):
def process(self, element):
(date, rows) = element
for r in rows:
other_users_rows = list(filter(lambda x: x['user_id'] != r['user_id'],
rows))
yield (r, random.sample(other_users_rows, N))
你能想到的用于获取所需的结果另一个更高性能的方法吗?
你是否真的需要这种情况发生在每一行?或每个用户一天只有一次? – CasualT
是的,我需要这个发生在每一行。我正在为ML模型生成一个训练数据集,每一行都将是一个训练样本 – pnezis
数据集的大小是多少,哪个是较慢的操作?你在用多少工人?你有工作ID吗? – Pablo