Kafka 连接可以使用批量模式的自定义查询吗?

2024-02-07

我正在尝试发送 7 天前的每行记录。这是我正在研究的配置,但它 即使查询在数据库服务器上生成记录也不起作用。

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 1,
    "mode": "bulk",
    "connection.url": "jdbc:mysql://mysql:3300/test_db?user=root&password=password",
    "query": "SELECT * FROM test_table WHERE DATEDIFF(CURDATE(), test_table.modified) = 7;",
    "topic.prefix": "test-jdbc-",
    "poll.interval.ms": 10000
}

JDBC源连接器通过JDBC驱动程序将数据从关系数据库导入到Apache Kafka主题中。 数据定期加载,或者基于时间戳增量加载,或者批量加载。最初,无论模式是增量还是批量,当您创建 JDBC 连接器时,它都会将所有数据加载到主题中,然后仅加载时间戳列上的新行或修改行。

Bulk:这种模式是未经过滤的,因此根本不是增量的。它将在每次迭代时加载表中的所有行。如果您想定期转储整个表,其中条目最终被删除并且下游系统可以安全地处理重复项,那么这可能很有用。 这意味着您无法使用批量模式增量加载最近 7 天的数据

时间戳列:在此模式下,包含修改时间戳的单个列用于跟踪上次处理数据的时间,并仅查询自该时间以来已修改的行。在这里您可以加载增量数据。但是它是如何工作的,当您第一次创建时,它将加载数据库表中的所有可用数据,因为对于 JDBC 连接器来说,这些都是新数据。稍后它只会加载新的或修改的数据。

现在,根据您的要求,您似乎正在尝试以某个时间间隔加载所有数据,该时间间隔将配置为“poll.interval.ms”:10000。我看到您的 JDBC 连接设置是按照定义的,而查询可能无法正常工作,请尝试使用查询如下。似乎 JDBC 连接器将查询包装为一个表,如果您添加案例,则该表不起作用。

"query": "select * from (select * from test_table where  modified > now() - interval '7' day) o",

尝试以下设置

{
  "name": "application_name",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "connection.url": "jdbc:mysql://mysql:3300/test_db",
  "connection.user": "root",
  "connection.password": "password",
  "connection.attempts": "1",
  "mode": "bulk",
  "validate.non.null": false,
  "query": "select * from (select * from test_table where  modified > now() - interval '7' day) o",
  "table.types": "TABLE",
  "topic.prefix": "test-jdbc-",
 "poll.interval.ms": 10000
  "schema.ignore": true,
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false"
  
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 连接可以使用批量模式的自定义查询吗? 的相关文章

随机推荐