2013-05-04 49 views
2

我有两个版本(旧/新)的数据库表大约100,000,000条记录。他们在文件:Mapreduce表Diff

trx-old 
trx-new 

结构为:

id date amount memo 
1 5/1  100 slacks 
2 5/1  50 wine 

id是简单的主键,其他领域都非关键。我想生成三个文件:

trx-removed (ids of records present in trx-old but not in trx-new) 
trx-added (records from trx-new whose ids are not present in trx-old) 
trx-changed (records from trx-new whose non-key values have changed since trx-old) 

我需要在一个短的批处理窗口中每天都执行此操作。实际上,我需要为多个表和跨多个模式执行此操作(为每个模式生成三个文件),所以实际应用程序会涉及更多一点。但我认为这个例子抓住了问题的症结所在。

这感觉就像是mapreduce的一个明显的应用。从未写过mapreduce应用程序我的问题是:

  1. 是否有一些EMR应用程序已经这样做?
  2. 有没有一个明显的猪或可能是级联解决方案?
  3. 是否还有其他一些开源示例与此非常接近?

PS我看到了diff between tables的问题,但在那里的解决方案看起来不可扩展。

PPS这里是一个小的Ruby玩具演示算法:Ruby dbdiff

+0

2.是的,猪解决方案至少为添加和删除的部分是明显的'LEFT OUTER JOIN'和'FILTER'基于加入的列是否为'null'。至于“改变”,我最好的猜测是一个内部的'JOIN'和根据字段是否不同的过滤器。 – TC1 2013-05-05 20:07:41

回答

2

我认为这将是最简单的只写自己的工作,主要是因为你需要使用MultipleOutputs写入到三个单独的文件从典型的减速器只写入一个文件的单个缩减步骤开始。您需要使用MultipleInputs为每个表指定一个映射器。

+0

从查看所有解决方案,我想知道使用罐装CoGroup还是一些冗余过滤更好,还是手动制作可立即写入MultipleOutput的解决方案更好。 – 2013-05-06 00:16:25

1

什么,我想起的是:

考虑你的表是这样的:

Table_old 
1 other_columns1 
2 other_columns2 
3 other_columns3 

Table_new 
2 other_columns2 
3 other_columns3 
4 other_columns4 

追加table_old的元素 “a” 和table_new的元素 “B”。

当合并这两个文件,如果一个元素上的第一个文件,而不是在第二个文件中存在该被删除

table_merged 
1a other_columns1 
2a other_columns2 
2b other_columns2 
3a other_columns3 
3b other_columns3 
4a other_columns4 

从这个文件,你可以轻松地做你的作业。

另外,假设你的ID是n位,并且你有10个集群+ 1个主。您的密钥将是ID的第一位,因此,您可以将数据平均分配给群集。你会做分组+分区,所以你的数据将被排序。

例,

table_old 
1...0 data 
1...1 data 
2...2 data 

table_new 
1...0 data 
2...2 data 
3...2 data 

你的关键是第一位和你按照那个数字你的分区是按照ID的休息分组,和。那么你的数据将会进入你的集群

worker1 
1...0b data 
1...0a data 
1...1a data 

worker2 
2...2a data 
2...2b data and so on. 

请注意,a,b不必排序。

编辑 合并将是这样的:

FileInputFormat.addInputPath(job, new Path("trx-old")); 
FileInputFormat.addInputPath(job, new Path("trx-new")); 

MR将获得两个输入和两个文件将被合并,

对于附加部件,你应该创建两个主MR之前的工作,这将只有Map。第一个Mapappend "a"到第一个列表中的每个元素,第二个将append "b"到第二个列表的元素。第三份工作(我们现在使用的工作/主图)只会减少收集工作。所以你将有Map-Map-Reduce

追加可以像

//you have key:Text 
new Text(String.valueOf(key.toString()+"a")) 

做,但我觉得有可能追加,其中一些可能是 (text hadoop)更有效

希望这将是有益的方式不同,

+0

这是有帮助的,我想我得到的主旨。但是你承担了一些我不具备的磁共振知识。特别是,我不知道如何早日提出“追加”,然后再“合并”。 – 2013-05-06 00:09:19

+0

我编辑了我的文章。我可以说我的代码变得更加困难,如果有更简单的现成代码或与您更好地使用它相关的现成库。 – smttsp 2013-05-06 08:15:43

1

这似乎是在级联中解决的完美问题。你已经提到你从来没有写过MR应用程序,如果意图是快速入门(假设你熟悉Java),那么Cascading是迈向IMHO的方式。我会在第二时间更多地谈谈这个。

可以使用Pig或Hive,但如果您想要对这些列执行附加分析或更改模式,这些可能不那么灵活,因为您可以通过从列标题中读取或在列标题中进行读取来在Cascading中实时构建Schema从您创建的映射文件来表示架构。

Cascading你会:

  1. 设置您的来电Taps:点击trxOld和Tap trxNew(这点对您的源文件)
  2. 你的水龙头连接到Pipes:管oldPipe和管newPipe
  3. 设置你的传出Taps:点击trxRemoved,点击trxAdded和点击trxChanged
  4. 建立你的管道分析(这是发生乐趣(伤害)的地方)

TRX-删除: TRX-添加

Pipe trxOld = new Pipe ("old-stuff"); 
Pipe trxNew = new Pipe ("new-stuff"); 
//smallest size Pipe on the right in CoGroup 
Pipe oldNnew = new CoGroup("old-N-new", trxOld, new Fields("id1"), 
             trxNew, new Fields("id2"), 
             new OuterJoin()); 

外加入使我们在那里ID缺失在其它管道(源数据)空值,所以我们可以在使用FilterNotNullFilterNull接下来的逻辑是让我们得到最终的管道,然后我们连接到Tap trxRemoved并点击trxAdded相应地。

TRX-改变

在这里我首先串联您在使用FieldJoiner寻找变化中的字段,然后使用ExpressionFilter给我们僵尸(因为他们改变了),这样的:

Pipe valueChange = new Pipe("changed"); 
valueChange = new Pipe(oldNnew, new Fields("oldValues", "newValues"), 
      new ExpressionFilter("oldValues.equals(newValues)", String.class), 
      Fields.All); 

这是做什么过滤掉具有相同值的字段并保持差异。而且,如果上面的表达是真实的,那么它就会摆脱那条记录。最后,将你的valueChange管道连接到你的Tap trxChanged,你将有三个输出,你正在寻找的所有数据的代码,允许一些额外的分析蠕变。

+0

这看起来非常接近实际解决方案。 CoGroup只是我需要的魔术外部连接thingie!它让我想知道CoGroup的性能是什么,以及它是否可以从我的表预先分类的事实中受益。 – 2013-05-06 00:11:15

+0

MapReduce中的连接性能应始终预期为慢速(技术术语)。该框架并不擅长加速Joins(只要问PIG Hive),一般来说Hadoop应该被认为是一辆18轮车,体重减轻了一吨,但比使用1辆勇敢的车更好。现在,关于预排序数据的第二点,我不知道它对优化有什么影响。我知道CoGroup按照它们的自然顺序对组密钥进行排序,这会告诉我它是内置的。我不得不相信预分类将比GroupBy管道中的未排序更快地运行。 – Engineiro 2013-05-06 13:47:48

1

正如@ChrisGerken建议,你将不得不使用MultipleOutputsMultipleInputs,以便生成多个输出文件并将自定义映射器关联到每个输入文件类型(旧/新)。

映射器将输出:

  • 键:主键(ID)
  • 值:从与附加标志输入文件记录(新/旧取决于输入)

的减速器将遍历所有记录R为每个键和输出:

  • 删除文件:如果只有一个带有旧标志的记录存在。
  • 添加文件:如果只有一个标记为new的记录存在。
  • 要更改的文件:如果R中的记录不同。

由于该算法随缩减器的数量而变化,所以很可能需要第二份工作,它会将结果合并到单个文件中作为最终输出。

+0

有没有答案与HOW?相关。您刚刚说过,如果记录不存在于第二个文件中,则将其视为已删除。这存在问题。 – smttsp 2013-05-05 20:18:01

+0

@smttsp:我希望这可以解决问题。减速器将得到以下输入并将其标记为已删除:'(1,{old1})',添加:'(2,{new2})',更改为:'(3,{old3,new3})'。 – harpun 2013-05-05 20:24:53

+0

我想知道@ Engineiro的解决方案(使用CoGroup)将如何与性能明智的解决方案进行比较。 – 2013-05-06 00:14:57