JDBC源连接器通过JDBC驱动程序将数据从关系数据库导入到Apache Kafka主题中。
数据定期加载,或者基于时间戳增量加载,或者批量加载。最初,无论模式是增量还是批量,当您创建 JDBC 连接器时,它都会将所有数据加载到主题中,然后仅加载时间戳列上的新行或修改行。
Bulk:这种模式是未经过滤的,因此根本不是增量的。它将在每次迭代时加载表中的所有行。如果您想定期转储整个表,其中条目最终被删除并且下游系统可以安全地处理重复项,那么这可能很有用。
这意味着您无法使用批量模式增量加载最近 7 天的数据
时间戳列:在此模式下,包含修改时间戳的单个列用于跟踪上次处理数据的时间,并仅查询自该时间以来已修改的行。在这里您可以加载增量数据。但是它是如何工作的,当您第一次创建时,它将加载数据库表中的所有可用数据,因为对于 JDBC 连接器来说,这些都是新数据。稍后它只会加载新的或修改的数据。
现在,根据您的要求,您似乎正在尝试以某个时间间隔加载所有数据,该时间间隔将配置为“poll.interval.ms”:10000。我看到您的 JDBC 连接设置是按照定义的,而查询可能无法正常工作,请尝试使用查询如下。似乎 JDBC 连接器将查询包装为一个表,如果您添加案例,则该表不起作用。
"query": "select * from (select * from test_table where modified > now() - interval '7' day) o",
尝试以下设置
{
"name": "application_name",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connection.url": "jdbc:mysql://mysql:3300/test_db",
"connection.user": "root",
"connection.password": "password",
"connection.attempts": "1",
"mode": "bulk",
"validate.non.null": false,
"query": "select * from (select * from test_table where modified > now() - interval '7' day) o",
"table.types": "TABLE",
"topic.prefix": "test-jdbc-",
"poll.interval.ms": 10000
"schema.ignore": true,
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}