你想在这里做什么,基本上是加入两张表并比较结果,对吧?您可以查看my answer to this question,查看您可以连接两个表的两种方式(侧面输入或CoGroupByKey)。
我还将使用CoGroupByKey
为您的问题编写解决方案。我在Python中编写代码是因为我更熟悉Python SDK,但是您可以在Java中实现类似的逻辑:
def make_kv_pair(x):
""" Output the record with the x[0]+x[1] key added."""
return ((x[0], x[1]), x)
table_a = (p | 'ReadTableA' >> beam.Read(beam.io.BigQuerySource(....))
| 'SetKeysA' >> beam.Map(make_kv_pair)
table_b = (p | 'ReadTableB' >> beam.Read(beam.io.BigQuerySource(....))
| 'SetKeysB' >> beam.Map(make_kv_pair))
joined_tables = ({'table_a': table_a, 'table_b': table_b}
| beam.CoGroupByKey())
output_types = ['changed', 'added', 'deleted', 'unchanged']
class FilterDoFn(beam.DoFn):
def process((key, values)):
table_a_value = list(values['table_a'])
table_b_value = list(values['table_b'])
if table_a_value == table_b_value:
yield pvalue.TaggedOutput('unchanged', key)
elif len(table_a_value) < len(table_b_value):
yield pvalue.TaggedOutput('added', key)
elif len(table_a_value) > len(table_b_value):
yield pvalue.TaggedOutput('removed', key)
elif table_a_value != table_b_value:
yield pvalue.TaggedOutput('changed', key)
key_collections = (joined_tables
| beam.ParDo(FilterDoFn()).with_outputs(*output_types))
# Now you can handle each output
key_collections.unchanged | WriteToText(...)
key_collections.changed | WriteToText(...)
key_collections.added | WriteToText(...)
key_collections.removed | WriteToText(...)