2017-09-26 66 views
0

我正在设计一个以读取的平面文件开始的数据管道。文件中的每一行都是单个记录。使用模糊匹配重复数据删除流处理的最佳做法

加载之后,每个记录都将被解析,转换和丰富。这发生与其他记录无关。

作为最后一步,我想基于几个记录的字段的模糊匹配来重复记录记录。要做到这一点,我想获得2记录的所有组合。

当前我使用sql表作为缓冲区。我的表中包含的所有记录,我加入了表本身,on即键不同的条件,并在名称模糊匹配与sounds like

CREATE TABLE temp_tblSoundsLikeName AS 
SELECT DISTINCT clients1.client_name client_name1, 
       clients1.client_id client_id1, 
       clients2.client_name client_name2, 
       clients2.client_id client_id2, 
FROM tblClients clients1 
    JOIN tblClients clients2 
    ON clients1.client_name != clients2.client_name 
     AND clients1.ban_id < clients2.ban_id 
     AND SUBSTRING_INDEX(clients2.client_name,' ',1) SOUNDS LIKE SUBSTRING_INDEX(clients1.client_name,' ',1) 

在temp_tblSoundsLikeName的记录代表的重复,我会在tblClients合并。

我正在考虑使用卡夫卡流,我以前没有用过。当消息M(代表记录R)到达重复数据删除主题时,我希望我的应用程序使用它并因此生成包含来自R和另一个消息R'的信息的消息,其中R'是到达的任何消息在过去5个小时内的重复数据删除阶段。这些包含2条消息组合的消息应发送到另一个主题,在这些主题中可以通过匹配和模糊匹配条件进行过滤,最后阶段是合并重复记录并将合并记录推送到具有kafka连接JDBC的RDBMS。

但我不确定如何为所有这样的RR'组合创建消息。 这可能吗? 这是Kafka Streams的一个很好的用例吗?

回答

2

使用Kafka的Streams API进行重复数据删除的起点为EventDeduplicationLambdaIntegrationTest.java,地址为https://github.com/confluentinc/kafka-streams-examples(Confluent Platform 3.3.0/Apache Kafka 0.11.0:EventDeduplicationLambdaIntegrationTest.java的直接链接)。

isDuplicate控制新的事件是否被认为是重复的方法:

private boolean isDuplicate(final E eventId) { 
    long eventTime = context.timestamp(); 
    WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
     eventId, 
     eventTime - leftDurationMs, 
     eventTime + rightDurationMs); 
    boolean isDuplicate = timeIterator.hasNext(); 
    timeIterator.close(); 
    return isDuplicate; 

eventIdStore是一个所谓的“状态存储”,它可以让你要记住,从过去的事件信息这样你就可以做出“重复是/否?”决定。

当一个消息M(代表记录R)到达的重复数据删除主题,我想我的应用程序使用,并作为结果,生成含有从R和从另一个消息R”,其中,所述信息的消息R'是过去5个小时内到达重复数据删除阶段的任何消息。这些包含2条消息组合的消息应发送到另一个主题,在这些主题中可以通过匹配和模糊匹配条件进行过滤,最后阶段是合并重复记录并将合并记录推送到具有kafka连接JDBC的RDBMS。你有

一种选择是做“赋予了新的R,让我们找到所有R'邮件,然后重复数据删除”一步到位,即在一个处理步骤(类似于上面的例子,使用所谓的Transformer)执行此操作,而不是创建一堆新的下游消息,这会导致写入放大(1 * R => N * "(R/R')"下游消息)。国家商店可用于追踪所有之前的消息,包括您在R抵达时感兴趣的各种R'

+0

@ michael-g-noll谢谢你的回答。我想重复匹配模糊匹配(例如字符串编辑距离)。如果我想要完全匹配,我可以看到州商店如何有用,但是如何使用州商店获得各种用于模糊匹配的“R”?例如,如果'R = Michael'我想要去掉'R'1 = michael,R'2 = Michale,R'3 = Michele' – polo

+0

您可以实现一个自定义状态存储,允许您传入“Michael “,并返回模糊匹配”迈克尔“(如迈克尔,迈克尔,米歇尔)的条目列表。见例如'ProbabilisticCountingScalaIntegrationTest'在https://github.com/confluentinc/kafka-streams-examples上演示如何实现自己的自定义状态存储(这个特定的例子与你的用例不同,但问题是你可以插入在你自己的国家商店,就像一个经过优化的模糊匹配)。 –