(1). 概述

前面对debezium进行了一个大概的了解,在这一里,通过docker来运行一个简单的案例.

“参考文献一”
“参考文献二”

(2). Docker运行Debezium后架构图如下

"在Docker运行Debezium"

(3). 提前拉取镜像

lixin-macbook:~ lixin$ docker pull debezium/zookeeper:1.2
lixin-macbook:~ lixin$ docker pull debezium/kafka:1.2
lixin-macbook:~ lixin$ docker pull debezium/example-mysql:1.2

lixin-macbook:~ lixin$ docker images|grep debezium
debezium/example-mysql                   1.2                                              8eca9c821ad6   7 weeks ago     448MB
debezium/connect                         1.2                                              db770c03ced5   7 months ago    692MB
debezium/zookeeper                       1.2                                              94dc5bf38ce0   7 months ago    488MB
debezium/kafka                           1.2                                              c2829a011b30   7 months ago    657MB

(4). 启动zk

lixin-macbook:~ lixin$ docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.2

(5). 启动kakfa

lixin-macbook:~ lixin$ docker run -d -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.2

(6). 启动mysql

lixin-macbook:~ lixin$ docker run -d -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.2

(7). 启动kafka-connect

lixin-macbook:~ lixin$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.2

(8). 检查启动的容器

lixin-macbook:~ lixin$ docker ps
CONTAINER ID   IMAGE                        COMMAND                  CREATED         STATUS         PORTS                                                                                                                                                 NAMES
8a218a19dcbc   debezium/connect:1.2         "/docker-entrypoint.…"   3 minutes ago   Up 3 minutes   8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 9779/tcp                                                                               connect
8cecb93b3df6   debezium/example-mysql:1.2   "docker-entrypoint.s…"   3 minutes ago   Up 3 minutes   0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp                                                                                                  mysql
0a7816254c1f   debezium/kafka:1.2           "/docker-entrypoint.…"   3 minutes ago   Up 3 minutes   8778/tcp, 9779/tcp, 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                                                                                         kafka
8216c51e9051   debezium/zookeeper:1.2       "/docker-entrypoint.…"   3 minutes ago   Up 3 minutes   0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 0.0.0.0:2888->2888/tcp, :::2888->2888/tcp, 8778/tcp, 0.0.0.0:3888->3888/tcp, :::3888->3888/tcp, 9779/tcp   zookeeper

(9). 检查connect

# 查看connect信息
lixin-macbook:~ lixin$ curl -H "Accept:application/json" localhost:8083/
{
	"version":"2.5.0",
	"commit":"66563e712b0b9f84",
	"kafka_cluster_id":"Y7k-bwb6T6y_RkiZkphQjg"
}

# 查看所有的connectors
lixin-macbook:~ lixin$ curl -H "Accept:application/json" localhost:8083/connectors/
[]

(10). 注册一个MySQL connector

# *************************************************************************
# 1. 注册一个:MySQL connector,注意:此时:kafka-connect会有大量的日志输出
# *************************************************************************
lixin-macbook:~ lixin$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
HTTP/1.1 201 Created
Date: Sun, 12 Sep 2021 02:27:19 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 487
Server: Jetty(9.4.24.v20191120)
{
	"name": "inventory-connector",
	"config": {
		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
		"tasks.max": "1",
		"database.hostname": "mysql",
		"database.port": "3306",
		"database.user": "debezium",
		"database.password": "dbz",
		"database.server.id": "184054",
		"database.server.name": "dbserver1",
		"database.whitelist": "inventory",
		"database.history.kafka.bootstrap.servers": "kafka:9092",
		"database.history.kafka.topic": "dbhistory.inventory",
		"name": "inventory-connector"
	},
	"tasks": [],
	"type": "source"
}


# *************************************************************************
# 2. 再次查看:inventory-connector
# *************************************************************************
lixin-macbook:~ lixin$ curl -H "Accept:application/json" localhost:8083/connectors/inventory-connector
{
	"name": "inventory-connector",
	"config": {
		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
		"database.user": "debezium",
		"database.server.id": "184054",
		"tasks.max": "1",
		"database.hostname": "mysql",
		"database.password": "dbz",
		"database.history.kafka.bootstrap.servers": "kafka:9092",
		"database.history.kafka.topic": "dbhistory.inventory",
		"name": "inventory-connector",
		"database.server.name": "dbserver1",
		"database.whitelist": "inventory",
		"database.port": "3306"
	},
	"tasks": [{
		"connector": "inventory-connector",
		"task": 0
	}],
	"type": "source"
}

(11). 查看下采集的binlog

lixin-macbook:~ lixin$ docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.2 watch-topic -a -k dbserver1.inventory.customers
WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.6:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic dbserver1.inventory.customers:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.2.5.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1631413644935,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1002}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"1.2.5.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1631413644935,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1003}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"1.2.5.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1631413644935,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"1.2.5.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1631413644935,"transaction":null}}

(12). 查看某一项具体信息

{
	"schema": {
		"type": "struct",
		"fields": [{
			"type": "int32",
			"optional": false,
			"field": "id"
		}],
		"optional": false,
		"name": "dbserver1.inventory.customers.Key"
	},
	"payload": {
		"id": 1001
	}
} {
	"schema": {
		"type": "struct",
		"fields": [{
			"type": "struct",
			"fields": [{
				"type": "int32",
				"optional": false,
				"field": "id"
			}, {
				"type": "string",
				"optional": false,
				"field": "first_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "last_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "email"
			}],
			"optional": true,
			"name": "dbserver1.inventory.customers.Value",
			"field": "before"
		}, {
			"type": "struct",
			"fields": [{
				"type": "int32",
				"optional": false,
				"field": "id"
			}, {
				"type": "string",
				"optional": false,
				"field": "first_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "last_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "email"
			}],
			"optional": true,
			"name": "dbserver1.inventory.customers.Value",
			"field": "after"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "version"
			}, {
				"type": "string",
				"optional": false,
				"field": "connector"
			}, {
				"type": "string",
				"optional": false,
				"field": "name"
			}, {
				"type": "int64",
				"optional": false,
				"field": "ts_ms"
			}, {
				"type": "string",
				"optional": true,
				"name": "io.debezium.data.Enum",
				"version": 1,
				"parameters": {
					"allowed": "true,last,false"
				},
				"default": "false",
				"field": "snapshot"
			}, {
				"type": "string",
				"optional": false,
				"field": "db"
			}, {
				"type": "string",
				"optional": true,
				"field": "table"
			}, {
				"type": "int64",
				"optional": false,
				"field": "server_id"
			}, {
				"type": "string",
				"optional": true,
				"field": "gtid"
			}, {
				"type": "string",
				"optional": false,
				"field": "file"
			}, {
				"type": "int64",
				"optional": false,
				"field": "pos"
			}, {
				"type": "int32",
				"optional": false,
				"field": "row"
			}, {
				"type": "int64",
				"optional": true,
				"field": "thread"
			}, {
				"type": "string",
				"optional": true,
				"field": "query"
			}],
			"optional": false,
			"name": "io.debezium.connector.mysql.Source",
			"field": "source"
		}, {
			"type": "string",
			"optional": false,
			"field": "op"
		}, {
			"type": "int64",
			"optional": true,
			"field": "ts_ms"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "id"
			}, {
				"type": "int64",
				"optional": false,
				"field": "total_order"
			}, {
				"type": "int64",
				"optional": false,
				"field": "data_collection_order"
			}],
			"optional": true,
			"field": "transaction"
		}],
		"optional": false,
		"name": "dbserver1.inventory.customers.Envelope"
	},
	"payload": {
		"before": null,                     # 因为是insert,所以before为空
		"after": {                          # 插入的数据
			"id": 1001,
			"first_name": "Sally",
			"last_name": "Thomas",
			"email": "sally.thomas@acme.com"
		},
		"source": {
			"version": "1.2.5.Final",
			"connector": "mysql",
			"name": "dbserver1",
			"ts_ms": 0,
			"snapshot": "true",
			"db": "inventory",
			"table": "customers",
			"server_id": 0,                                # 每条记录会对应server_id
			"gtid": null,                                  # 每条记录会寺应gtid(解决跨区域回环问题)
			"file": "mysql-bin.000003",
			"pos": 154,
			"row": 0,
			"thread": null,
			"query": null
		},
		"op": "c",
		"ts_ms": 1631413644935,
		"transaction": null
	}
}

(13). 再次查看docker容器运行

# 其中有一个kafka是消费者监听来着的.
lixin-macbook:~ lixin$ docker ps
CONTAINER ID   IMAGE                        COMMAND                  CREATED          STATUS          PORTS                                                                                                                                                 NAMES
75cbd9048d54   debezium/kafka:1.2           "/docker-entrypoint.…"   7 minutes ago    Up 7 minutes    8778/tcp, 9092/tcp, 9779/tcp                                                                                                                          watcher
8a218a19dcbc   debezium/connect:1.2         "/docker-entrypoint.…"   21 minutes ago   Up 21 minutes   8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 9779/tcp                                                                               connect
8cecb93b3df6   debezium/example-mysql:1.2   "docker-entrypoint.s…"   21 minutes ago   Up 21 minutes   0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp                                                                                                  mysql
0a7816254c1f   debezium/kafka:1.2           "/docker-entrypoint.…"   21 minutes ago   Up 21 minutes   8778/tcp, 9779/tcp, 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                                                                                         kafka
8216c51e9051   debezium/zookeeper:1.2       "/docker-entrypoint.…"   22 minutes ago   Up 22 minutes   0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 0.0.0.0:2888->2888/tcp, :::2888->2888/tcp, 8778/tcp, 0.0.0.0:3888->3888/tcp, :::3888->3888/tcp, 9779/tcp   zookeeper

(14). 进入mysql,创建数据,查看kafka消费者是否能实时监听

# 1. 查看mysql在docker下的容器id
lixin-macbook:~ lixin$ docker ps|grep mysql
8cecb93b3df6   debezium/example-mysql:1.2   "docker-entrypoint.s…"   44 minutes ago   Up 44 minutes   0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp                                                                                                  mysql

# 2. 进入容器内部
lixin-macbook:~ lixin$ docker exec -it 8cecb93b3df6 /bin/bash

# 3. 运行mysql客户端(mysqluser/mysqlpw)
root@8cecb93b3df6:/# mysql -u mysqluser -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 6
Server version: 5.7.35-log MySQL Community Server (GPL)
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

# 4. 查看有哪些库
mysql> SHOW DATABASES;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| inventory          |
+--------------------+
2 rows in set (0.01 sec)

# 5. 进入:inventory库
mysql> USE inventory;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

# 6. 查看有哪些库
mysql> SHOW TABLES;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)

# 7. 查看表结构信息
mysql> DESC customers;
+------------+--------------+------+-----+---------+----------------+
| Field      | Type         | Null | Key | Default | Extra          |
+------------+--------------+------+-----+---------+----------------+
| id         | int(11)      | NO   | PRI | NULL    | auto_increment |
| first_name | varchar(255) | NO   |     | NULL    |                |
| last_name  | varchar(255) | NO   |     | NULL    |                |
| email      | varchar(255) | NO   | UNI | NULL    |                |
+------------+--------------+------+-----+---------+----------------+

# 8. 插入数据,注意:观察kafka consumer
mysql> INSERT INTO customers(first_name,last_name,email) VALUES("xin","lixin","test@126.com");
Query OK, 1 row affected (0.01 sec)

# 9. 更新刚插入的数据
mysql> UPDATE customers SET first_name = "xin--" WHERE id = 1005;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

(15). 查看kafka-consumer插入的数据

# 1. 插入数据的数据结构
{
	"schema": {
		"type": "struct",
		"fields": [{
			"type": "int32",
			"optional": false,
			"field": "id"
		}],
		"optional": false,
		"name": "dbserver1.inventory.customers.Key"
	},
	"payload": {
		"id": 1005
	}
} {
	"schema": {
		"type": "struct",
		"fields": [{
			"type": "struct",
			"fields": [{
				"type": "int32",
				"optional": false,
				"field": "id"
			}, {
				"type": "string",
				"optional": false,
				"field": "first_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "last_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "email"
			}],
			"optional": true,
			"name": "dbserver1.inventory.customers.Value",
			"field": "before"
		}, {
			"type": "struct",
			"fields": [{
				"type": "int32",
				"optional": false,
				"field": "id"
			}, {
				"type": "string",
				"optional": false,
				"field": "first_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "last_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "email"
			}],
			"optional": true,
			"name": "dbserver1.inventory.customers.Value",
			"field": "after"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "version"
			}, {
				"type": "string",
				"optional": false,
				"field": "connector"
			}, {
				"type": "string",
				"optional": false,
				"field": "name"
			}, {
				"type": "int64",
				"optional": false,
				"field": "ts_ms"
			}, {
				"type": "string",
				"optional": true,
				"name": "io.debezium.data.Enum",
				"version": 1,
				"parameters": {
					"allowed": "true,last,false"
				},
				"default": "false",
				"field": "snapshot"
			}, {
				"type": "string",
				"optional": false,
				"field": "db"
			}, {
				"type": "string",
				"optional": true,
				"field": "table"
			}, {
				"type": "int64",
				"optional": false,
				"field": "server_id"
			}, {
				"type": "string",
				"optional": true,
				"field": "gtid"
			}, {
				"type": "string",
				"optional": false,
				"field": "file"
			}, {
				"type": "int64",
				"optional": false,
				"field": "pos"
			}, {
				"type": "int32",
				"optional": false,
				"field": "row"
			}, {
				"type": "int64",
				"optional": true,
				"field": "thread"
			}, {
				"type": "string",
				"optional": true,
				"field": "query"
			}],
			"optional": false,
			"name": "io.debezium.connector.mysql.Source",
			"field": "source"
		}, {
			"type": "string",
			"optional": false,
			"field": "op"
		}, {
			"type": "int64",
			"optional": true,
			"field": "ts_ms"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "id"
			}, {
				"type": "int64",
				"optional": false,
				"field": "total_order"
			}, {
				"type": "int64",
				"optional": false,
				"field": "data_collection_order"
			}],
			"optional": true,
			"field": "transaction"
		}],
		"optional": false,
		"name": "dbserver1.inventory.customers.Envelope"
	},
	"payload": {
		"before": null,
		"after": {                                          # 
			"id": 1005,
			"first_name": "xin",
			"last_name": "lixin",
			"email": "test@126.com"
		},
		"source": {
			"version": "1.2.5.Final",
			"connector": "mysql",
			"name": "dbserver1",
			"ts_ms": 1631416116000,
			"snapshot": "false",
			"db": "inventory",
			"table": "customers",
			"server_id": 223344,
			"gtid": null,
			"file": "mysql-bin.000003",
			"pos": 364,
			"row": 0,
			"thread": 6,
			"query": null
		},
		"op": "c",                                         # 插入操作
		"ts_ms": 1631416116417,
		"transaction": null
	}
}

(16). 查看kafka-consumer更新的数据

{
	"schema": {
		"type": "struct",
		"fields": [{
			"type": "int32",
			"optional": false,
			"field": "id"
		}],
		"optional": false,
		"name": "dbserver1.inventory.customers.Key"
	},
	"payload": {
		"id": 1005
	}
} {
	"schema": {
		"type": "struct",
		"fields": [{
			"type": "struct",
			"fields": [{
				"type": "int32",
				"optional": false,
				"field": "id"
			}, {
				"type": "string",
				"optional": false,
				"field": "first_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "last_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "email"
			}],
			"optional": true,
			"name": "dbserver1.inventory.customers.Value",
			"field": "before"
		}, {
			"type": "struct",
			"fields": [{
				"type": "int32",
				"optional": false,
				"field": "id"
			}, {
				"type": "string",
				"optional": false,
				"field": "first_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "last_name"
			}, {
				"type": "string",
				"optional": false,
				"field": "email"
			}],
			"optional": true,
			"name": "dbserver1.inventory.customers.Value",
			"field": "after"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "version"
			}, {
				"type": "string",
				"optional": false,
				"field": "connector"
			}, {
				"type": "string",
				"optional": false,
				"field": "name"
			}, {
				"type": "int64",
				"optional": false,
				"field": "ts_ms"
			}, {
				"type": "string",
				"optional": true,
				"name": "io.debezium.data.Enum",
				"version": 1,
				"parameters": {
					"allowed": "true,last,false"
				},
				"default": "false",
				"field": "snapshot"
			}, {
				"type": "string",
				"optional": false,
				"field": "db"
			}, {
				"type": "string",
				"optional": true,
				"field": "table"
			}, {
				"type": "int64",
				"optional": false,
				"field": "server_id"
			}, {
				"type": "string",
				"optional": true,
				"field": "gtid"
			}, {
				"type": "string",
				"optional": false,
				"field": "file"
			}, {
				"type": "int64",
				"optional": false,
				"field": "pos"
			}, {
				"type": "int32",
				"optional": false,
				"field": "row"
			}, {
				"type": "int64",
				"optional": true,
				"field": "thread"
			}, {
				"type": "string",
				"optional": true,
				"field": "query"
			}],
			"optional": false,
			"name": "io.debezium.connector.mysql.Source",
			"field": "source"
		}, {
			"type": "string",
			"optional": false,
			"field": "op"
		}, {
			"type": "int64",
			"optional": true,
			"field": "ts_ms"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "id"
			}, {
				"type": "int64",
				"optional": false,
				"field": "total_order"
			}, {
				"type": "int64",
				"optional": false,
				"field": "data_collection_order"
			}],
			"optional": true,
			"field": "transaction"
		}],
		"optional": false,
		"name": "dbserver1.inventory.customers.Envelope"
	},
	"payload": {
		"before": {       
			"id": 1005,                            # 修改之前的数据
			"first_name": "xin",                   
			"last_name": "lixin",
			"email": "test@126.com"
		},
		"after": {
			"id": 1005,                            # 修改之后的数据
			"first_name": "xin--",
			"last_name": "lixin",
			"email": "test@126.com"
		},
		"source": {
			"version": "1.2.5.Final",
			"connector": "mysql",
			"name": "dbserver1",
			"ts_ms": 1631416216000,
			"snapshot": "false",
			"db": "inventory",
			"table": "customers",
			"server_id": 223344,
			"gtid": null,
			"file": "mysql-bin.000003",
			"pos": 668,
			"row": 0,
			"thread": 6,
			"query": null
		},
		"op": "u",                                       # 更新操作
		"ts_ms": 1631416216727,
		"transaction": null
	}
}

(17). 验证zk里只存储了kafka的信息

# 1. 查看zookeeper对应的容器id
lixin-macbook:~ lixin$ docker ps|grep zookeeper
8216c51e9051   debezium/zookeeper:1.2       "/docker-entrypoint.…"   About an hour ago   Up About an hour   0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 0.0.0.0:2888->2888/tcp, :::2888->2888/tcp, 8778/tcp, 0.0.0.0:3888->3888/tcp, :::3888->3888/tcp, 9779/tcp   zookeeper

# 2. 进入容器内部
lixin-macbook:~ lixin$ docker exec -it  8216c51e9051 /bin/bash
[zookeeper@8216c51e9051 ~]$ cd bin/

# 3. 运行zkCli.sh命令
[zookeeper@8216c51e9051 bin]$ ./zkCli.sh
Connecting to localhost:2181
......

# 4. 查看path(这里的所有path是否一定都是kafka的呢?后面自己搭一个kafka,再回来验证)
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

(18). 查看kafka容器信息

# 注意:
# 容器ID:75cbd9048d54为消费者
# 容器ID:0a7816254c1f为kakfa服务器

# 1. 查看运行的kafka容器
lixin-macbook:~ lixin$ docker ps |grep kafka
75cbd9048d54   debezium/kafka:1.2           "/docker-entrypoint.…"   59 minutes ago      Up 59 minutes      8778/tcp, 9092/tcp, 9779/tcp                                                                                                                          watcher
0a7816254c1f   debezium/kafka:1.2           "/docker-entrypoint.…"   About an hour ago   Up About an hour   8778/tcp, 9779/tcp, 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                                                                                         kafka

# 2. 进入容器内部
lixin-macbook:~ lixin$ docker exec -it 0a7816254c1f /bin/bash

# 3. 查看当前所在目录
[kafka@0a7816254c1f ~]$ pwd
/kafka

# 4. 查看容器内部的文件信息
[kafka@0a7816254c1f ~]$ ll
drwxrwxrwx 1 root  root   4096 Apr  8  2020 bin                  # kafka二进制目录
drwxrwxrwx 2 root  root   4096 Sep 12 02:18 config               # kafak配置文件目录
drwxrwxrwx 1 root  root   4096 Feb  6  2021 config.orig
drwxrwxrwx 3 kafka kafka  4096 Sep 12 02:18 data                 # kafka数据目录
drwxrwxrwx 1 root  root   4096 Feb  6  2021 libs                 # kafka依赖lib
drwxrwxrwx 2 kafka kafka  4096 Sep 12 03:08 logs                 # kafka日志目录

(19). 查看kafka-connect容器信息

kafka-connect估计是在kafka容器的基础上封装的一层镜像.

# 1. 查看connect运行的容器id
lixin-macbook:~ lixin$ docker ps|grep connect
8a218a19dcbc   debezium/connect:1.2         "/docker-entrypoint.…"   About an hour ago   Up About an hour   8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 9779/tcp                                                                               connect

# 2. 进入容器内部
lixin-macbook:~ lixin$ docker exec -it 8a218a19dcbc /bin/bash

# 3. 查看当前所在的目录
[kafka@8a218a19dcbc ~]$ pwd
/kafka

# 4. 查看运行的java进程.
[kafka@8a218a19dcbc ~]$ jps -l
1 org.apache.kafka.connect.cli.ConnectDistributed

# 5. 查看目录信息.
[kafka@8a218a19dcbc ~]$ ll
drwxrwxrwx 1 root  root   4096 Apr  8  2020 bin
drwxrwxrwx 2 root  root   4096 Sep 12 02:18 config
drwxrwxrwx 1 root  root   4096 Feb  6  2021 config.orig
drwxr-xr-x 1 kafka kafka  4096 Feb  6  2021 connect                          # connect新增的内容
drwxrwxrwx 2 kafka kafka  4096 Feb  6  2021 data 
drwxr-xr-x 1 kafka kafka  4096 Feb  6  2021 external_libs                    # connect新增的内容
drwxrwxrwx 1 root  root   4096 Feb  6  2021 libs
drwxrwxrwx 2 kafka kafka  4096 Sep 12 03:00 logs

# 6. 查看目录connect下内容
[kafka@8a218a19dcbc ~]$ ll connect/
drwxr-xr-x 2 kafka kafka 4096 Feb  6  2021 debezium-connector-db2
drwxr-xr-x 2 kafka kafka 4096 Feb  6  2021 debezium-connector-mongodb
drwxr-xr-x 2 kafka kafka 4096 Feb  6  2021 debezium-connector-mysql
drwxr-xr-x 2 kafka kafka 4096 Feb  6  2021 debezium-connector-oracle
drwxr-xr-x 2 kafka kafka 4096 Feb  6  2021 debezium-connector-postgres
drwxr-xr-x 2 kafka kafka 4096 Feb  6  2021 debezium-connector-sqlserver

# 7. 查看目录connect/debezium-connector-mysql/下内容
[kafka@8a218a19dcbc ~]$ ll connect/debezium-connector-mysql/
-rw-r--r-- 1 kafka kafka  337904 Sep 24  2020 antlr4-runtime-4.7.2.jar
-rw-r--r-- 1 kafka kafka   19653 Sep 24  2020 debezium-api-1.2.5.Final.jar
-rw-r--r-- 1 kafka kafka  255757 Sep 24  2020 debezium-connector-mysql-1.2.5.Final.jar
-rw-r--r-- 1 kafka kafka  827663 Sep 24  2020 debezium-core-1.2.5.Final.jar
-rw-r--r-- 1 kafka kafka 2703943 Sep 24  2020 debezium-ddl-parser-1.2.5.Final.jar
-rw-r--r-- 1 kafka kafka  173228 Sep 24  2020 mysql-binlog-connector-java-0.20.1.jar
-rw-r--r-- 1 kafka kafka 2293144 Sep 24  2020 mysql-connector-java-8.0.16.jar

# ********************************************************************************
# 8. 查看kafka-connector配置(config/connect-distributed.properties)
# ********************************************************************************
[kafka@8a218a19dcbc ~]$  cat config/connect-distributed.properties  | grep -Ev '^$|#'
bootstrap.servers=172.17.0.3:9092
group.id=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=my_connect_offsets
offset.storage.replication.factor=1
config.storage.topic=my_connect_configs
config.storage.replication.factor=1
status.storage.topic=my_connect_statuses
status.storage.replication.factor=1
offset.flush.interval.ms=60000
rest.host.name=172.17.0.5
rest.port=8083
rest.advertised.host.name=172.17.0.5
rest.advertised.port=8083
plugin.path=/kafka/connect
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter

# 9. 查看目录external_libs下内容
[kafka@8a218a19dcbc ~]$ ll external_libs/
drwxr-xr-x 2 kafka kafka 4096 Feb  6  2021 apicurio
drwxr-xr-x 2 kafka kafka 4096 Feb  6  2021 debezium-scripting

(20). Kafka Connect API

# 查看有哪些插件
lixin-macbook:~ lixin$ curl -H "Accept:application/json" localhost:8083/connector-plugins
[{
	"class": "io.debezium.connector.db2.Db2Connector",
	"type": "source",
	"version": "1.2.5.Final"
}, {
	"class": "io.debezium.connector.mongodb.MongoDbConnector",
	"type": "source",
	"version": "1.2.5.Final"
}, {
	"class": "io.debezium.connector.mysql.MySqlConnector",
	"type": "source",
	"version": "1.2.5.Final"
}, {
	"class": "io.debezium.connector.oracle.OracleConnector",
	"type": "source",
	"version": "1.2.5.Final"
}, {
	"class": "io.debezium.connector.postgresql.PostgresConnector",
	"type": "source",
	"version": "1.2.5.Final"
}, {
	"class": "io.debezium.connector.sqlserver.SqlServerConnector",
	"type": "source",
	"version": "1.2.5.Final"
}, {
	"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
	"type": "sink",
	"version": "2.5.0"
}, {
	"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
	"type": "source",
	"version": "2.5.0"
}, {
	"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
	"type": "source",
	"version": "1"
}, {
	"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
	"type": "source",
	"version": "1"
}, {
	"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
	"type": "source",
	"version": "1"
}]

(21). 总结

因为,自己有把源码拉取下来,并编译通过,所以,后面会脱离容器,在本地部署,并进行源码分析.