2017-08-25 60 views
0

我一般提出这个问题,因为它可能是一个通用的答案。但是一个具体的例子是将2个BigQuery表与相同的模式进行比较,但可能会有不同的数据。我想要一个差异,即相对于一个组合键,例如,添加,删除,修改的内容。前2列。如何使用Apache Beam Python SDK在两个给定密钥的源上执行“差异”?

Table A 
C1 C2 C3 
----------- 
a a 1 
a b 1 
a c 1 

Table B  
C1 C2 C3 # Notes if comparing B to A 
------------------------------------- 
a a 1 # No Change to the key a + a 
a b 2 # Key a + b Changed from 1 to 2 
      # Deleted key a + c with value 1 
a d 1 # Added key a + d 

我基本上希望能够做出/报告比较笔记。 或者从光束的角度来看,我可能只想输出最多4个标记的PCollections:不变,更改,添加,删除。我该怎么做,以及PCollections会是什么样子?

回答

0

你想在这里做什么,基本上是加入两张表并比较结果,对吧?您可以查看my answer to this question,查看您可以连接两个表的两种方式(侧面输入或CoGroupByKey)。

我还将使用CoGroupByKey为您的问题编写解决方案。我在Python中编写代码是因为我更熟悉Python SDK,但是您可以在Java中实现类似的逻辑:

def make_kv_pair(x): 
    """ Output the record with the x[0]+x[1] key added.""" 
    return ((x[0], x[1]), x) 

table_a = (p | 'ReadTableA' >> beam.Read(beam.io.BigQuerySource(....)) 
      | 'SetKeysA' >> beam.Map(make_kv_pair) 
table_b = (p | 'ReadTableB' >> beam.Read(beam.io.BigQuerySource(....)) 
      | 'SetKeysB' >> beam.Map(make_kv_pair)) 

joined_tables = ({'table_a': table_a, 'table_b': table_b} 
       | beam.CoGroupByKey()) 


output_types = ['changed', 'added', 'deleted', 'unchanged'] 
class FilterDoFn(beam.DoFn): 
    def process((key, values)): 
    table_a_value = list(values['table_a']) 
    table_b_value = list(values['table_b']) 
    if table_a_value == table_b_value: 
     yield pvalue.TaggedOutput('unchanged', key) 
    elif len(table_a_value) < len(table_b_value): 
     yield pvalue.TaggedOutput('added', key) 
    elif len(table_a_value) > len(table_b_value): 
     yield pvalue.TaggedOutput('removed', key) 
    elif table_a_value != table_b_value: 
     yield pvalue.TaggedOutput('changed', key) 

key_collections = (joined_tables 
        | beam.ParDo(FilterDoFn()).with_outputs(*output_types)) 

# Now you can handle each output 
key_collections.unchanged | WriteToText(...) 
key_collections.changed | WriteToText(...) 
key_collections.added | WriteToText(...) 
key_collections.removed | WriteToText(...)