我正在设计一个以读取的平面文件开始的数据管道。文件中的每一行都是单个记录。使用模糊匹配重复数据删除流处理的最佳做法
加载之后,每个记录都将被解析,转换和丰富。这发生与其他记录无关。
作为最后一步,我想基于几个记录的字段的模糊匹配来重复记录记录。要做到这一点,我想获得2记录的所有组合。
当前我使用sql表作为缓冲区。我的表中包含的所有记录,我加入了表本身,on
即键不同的条件,并在名称模糊匹配与sounds like
:
CREATE TABLE temp_tblSoundsLikeName AS
SELECT DISTINCT clients1.client_name client_name1,
clients1.client_id client_id1,
clients2.client_name client_name2,
clients2.client_id client_id2,
FROM tblClients clients1
JOIN tblClients clients2
ON clients1.client_name != clients2.client_name
AND clients1.ban_id < clients2.ban_id
AND SUBSTRING_INDEX(clients2.client_name,' ',1) SOUNDS LIKE SUBSTRING_INDEX(clients1.client_name,' ',1)
在temp_tblSoundsLikeName的记录代表的重复,我会在tblClients合并。
我正在考虑使用卡夫卡流,我以前没有用过。当消息M
(代表记录R
)到达重复数据删除主题时,我希望我的应用程序使用它并因此生成包含来自R
和另一个消息R'
的信息的消息,其中R'
是到达的任何消息在过去5个小时内的重复数据删除阶段。这些包含2条消息组合的消息应发送到另一个主题,在这些主题中可以通过匹配和模糊匹配条件进行过滤,最后阶段是合并重复记录并将合并记录推送到具有kafka连接JDBC的RDBMS。
但我不确定如何为所有这样的R
和R'
组合创建消息。 这可能吗? 这是Kafka Streams的一个很好的用例吗?
@ michael-g-noll谢谢你的回答。我想重复匹配模糊匹配(例如字符串编辑距离)。如果我想要完全匹配,我可以看到州商店如何有用,但是如何使用州商店获得各种用于模糊匹配的“R”?例如,如果'R = Michael'我想要去掉'R'1 = michael,R'2 = Michale,R'3 = Michele' – polo
您可以实现一个自定义状态存储,允许您传入“Michael “,并返回模糊匹配”迈克尔“(如迈克尔,迈克尔,米歇尔)的条目列表。见例如'ProbabilisticCountingScalaIntegrationTest'在https://github.com/confluentinc/kafka-streams-examples上演示如何实现自己的自定义状态存储(这个特定的例子与你的用例不同,但问题是你可以插入在你自己的国家商店,就像一个经过优化的模糊匹配)。 –