1

我想弄清楚如何从查询中初始获取所有数据,然后增量只使用kafka连接器进行更改。原因是我想将所有数据加载到弹性搜索中,然后保持与我的kafka流同步。 目前,我通过首先使用mode = bulk的连接器来完成此操作,然后将其更改为时间戳。这工作正常。但是,如果我们想要将所有数据重新加载到Streams和ES,这意味着我们必须编写一些脚本来清除或删除kafka流和es索引数据,修改连接ini以将模式设置为批量,重新启动一切,给它时间加载所有数据,然后再次将脚本修改为时间戳模式,然后再次重新启动所有内容(原因是偶尔需要这样的脚本,批量更新通过etl进程纠正历史数据,我们不但有控制权,并且此过程不更新时间戳)卡夫卡JDBC连接器加载所有数据,然后增量

是否有人正在做类似的事情,并且找到了更优雅的解决方案?

回答

0

很久以后又回来了。该方法能够解决这一点,从来没有使用散装模式

  1. 站连接器
  2. 擦拭偏移文件为每个连接器JVM
  3. (可选)如果你想要做一个完整的擦拭和负荷情况,想要可能还删除您的主题使用kafka/connect utils/rest api(并且不要忘记状态主题)
  4. 重新启动连接。
0

如何首先从查询中获取所有数据,然后增量式地仅使用kafka连接器进行更改。

也许这可能对你有帮助。例如,我有一个表:

╔════╦═════════════╦═══════════╗ 
║ Id ║ Name  ║ Surname ║ 
╠════╬═════════════╬═══════════╣ 
║ 1 ║ Martin  ║ Scorsese ║ 
║ 2 ║ Steven  ║ Spielberg ║ 
║ 3 ║ Christopher ║ Nolan  ║ 
╚════╩═════════════╩═══════════╝ 

在这种情况下,我将创建一个视图:

CREATE OR REPLACE VIEW EDGE_DIRECTORS AS 
SELECT 0 AS EXID, ID, NAME, SURNAME 
FROM DIRECTORS WHERE ID =< 2 
UNION ALL 
SELECT ID AS EXID, ID, NAME, SURNAME 
FROM DIRECTORS WHERE ID > 2; 

属性文件中对卡夫卡的JDBC连接器,你可以使用:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector 
mode=incrementing 
incrementing.column.name=EXID 
topic.prefix= 
tasks.max=1 
name=gv-jdbc-source-connector 
connection.url= 
table.types=VIEW 
table.whitelist=EDGE_DIRECTORS 

所以kafka jdbc连接器将采取以下步骤:

  1. 起初EXID = 0的所有数据;
  2. 它将在connector.offsets文件中存储偏移值= 0;
  3. 新行将被插入到DIRECTORS表中。
  4. 卡夫卡JDBC连接器将执行:Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS并将 注意到EXID已增加。
  5. 数据将在Kafka Streams中更新。
+0

不完全是我问的。目前即时通讯使用时间戳列。我必须将模式更改为批量才能重新加载所有内容,然后更改回时间戳以让卡夫卡逐渐加载已更改的数据或新数据(它会将查询附加到来回时间戳以执行此操作)。我希望避免每次我想从“清洁”石板开始切换模式。 – mike01010

相关问题