如何首先从查询中获取所有数据,然后增量式地仅使用kafka连接器进行更改。
也许这可能对你有帮助。例如,我有一个表:
╔════╦═════════════╦═══════════╗
║ Id ║ Name ║ Surname ║
╠════╬═════════════╬═══════════╣
║ 1 ║ Martin ║ Scorsese ║
║ 2 ║ Steven ║ Spielberg ║
║ 3 ║ Christopher ║ Nolan ║
╚════╩═════════════╩═══════════╝
在这种情况下,我将创建一个视图:
CREATE OR REPLACE VIEW EDGE_DIRECTORS AS
SELECT 0 AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID =< 2
UNION ALL
SELECT ID AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID > 2;
属性文件中对卡夫卡的JDBC连接器,你可以使用:
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=incrementing
incrementing.column.name=EXID
topic.prefix=
tasks.max=1
name=gv-jdbc-source-connector
connection.url=
table.types=VIEW
table.whitelist=EDGE_DIRECTORS
所以kafka jdbc连接器将采取以下步骤:
- 起初EXID = 0的所有数据;
- 它将在connector.offsets文件中存储偏移值= 0;
- 新行将被插入到DIRECTORS表中。
- 卡夫卡JDBC连接器将执行:
Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS
并将 注意到EXID已增加。
- 数据将在Kafka Streams中更新。
不完全是我问的。目前即时通讯使用时间戳列。我必须将模式更改为批量才能重新加载所有内容,然后更改回时间戳以让卡夫卡逐渐加载已更改的数据或新数据(它会将查询附加到来回时间戳以执行此操作)。我希望避免每次我想从“清洁”石板开始切换模式。 – mike01010