使用 Kafka、MySQL 和 Debezium 设置数据流管道。我是这个版本的 Kafka - 3.4.0、MySQL - 8、Debezium - 2.2.1、Java - 11。目标:我想从 MySQL 捕获所有 CDC 并将数据流式传输到 Kafka 主题。
如果已执行以下步骤:
-
下载 Kafka - 3.4.0 并解压并保存在以下路径中:/home/divum/kafka-all/kafka_2.13-3.4.0
-
下载 Debezium - 2.2.1 并解压。这是它的路径:/usr/local/share/kafka/plugins/debezium-connector-mysql
-
添加了这一行plugin.path=/usr/local/share/kafka/plugins/debezium-connector-mysql在文件中连接分布式属性它存在于该路径中:/home/divum/kafka-all/kafka_2.13-3.4.0/config.
-
我有 MySql 和 Debezium 配置,如下所示。还启用了二进制日志。文件名为connector-config.json,位于此路径中/home/divum/kafka-all/kafka_2.13-3.4.0.
{ "name": "etl-connector",
"config": {
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"tasks.max": 1,
"database.hostname": "localhost",
"database.port": 3306,
"database.user": "root",
"database.password": "iphone21",
"topic.prefix": "stream",
"database.server.id": 4816,
"database.include.list": "etl",
"table.include.list": ".*",
"column.include.list": ".*",
"include.schema.changes": false,
"database.server.name": "mysql",
"tombstones.on.delete": false,
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "history",
"plugin.path":"/usr/local/share/kafka/plugins/debezium-connector-mysql"
}
}
- 我使用此命令来启动代理、动物园管理员和连接器。
Zookeeper - bin/zookeeper-server-start.sh config/zookeeper.properties
Kafka Broker - bin/kafka-server-start.sh config/server.properties
Connector - bin/connect-distributed.sh ./config/connect-distributed.properties
- 检查连接器插件卷曲 --location --request GET 'http://localhost:8083/connector-plugins'。得到以下输出。
[
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "3.4.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "3.4.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "3.4.0"
}
]
- 尝试使用此命令启动连接器 **curl -X POST -H "Content-Type: application/json" --data @connector-config.json http://localhost:8083/connectors** 并出现以下错误。是这样说的io.debezium.connector.mysql.MySqlConnector课程不可用。
{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.debezium.connector.mysql.MySqlConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='3.4.0', encodedVersion=3.4.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='3.4.0', encodedVersion=3.4.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='3.4.0', encodedVersion=3.4.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='3.4.0', encodedVersion=3.4.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='3.4.0', encodedVersion=3.4.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='3.4.0', encodedVersion=3.4.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='3.4.0', encodedVersion=3.4.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='3.4.0', encodedVersion=3.4.0, type=source, typeName='source', location='classpath'}"}
-
Command Used : **bin/connect-distributed.sh ./config/connect-distributed.properties **. Getting this logs.
![image-6](https://i.stack.imgur.com/CRFGk.png)
-
Plugin scan logs:
![6](https://i.stack.imgur.com/uGHjh.png)
我过去几天尝试设置但无法做到这一点。这个错误的原因是什么?知道如何解决吗?
Debezium 有开源替代品吗?