(1). Overview

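The previous posts ran Debezium through Docker, but run that way Debezium feels like a black box. So, starting with this post, we build Debezium from source and run a Debezium demo on the local machine.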

(2). Preparation

(3). MySQL configuration

# my.cnf, [mysqld] section: enable the binlog in row format
log-bin=mysql-bin
binlog_format=row
server-id=1
# Confirm that the binlog is enabled
mysql> SHOW MASTER STATUS \G
*************************** 1. row ***************************
             File: mysql-bin.000393
         Position: 154
     Binlog_Do_DB:
 Binlog_Ignore_DB:
Executed_Gtid_Set:
1 row in set (0.00 sec)
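# Create the debezium user with the privileges the connector needs
# (this GRANT ... IDENTIFIED BY form works on MySQL 5.7; MySQL 8.0 requires a separate CREATE USER first)
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';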
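
As a quick sanity check of the three binlog settings above, here is a minimal Python sketch that parses a my.cnf-style fragment and reports which of them are missing or wrong. The function name `check_binlog_settings` and the sample text are illustrative assumptions, not part of Debezium or MySQL. The sketch only inspects text and never connects to a server.

```python
import configparser

# Settings from this section; None means "the option only has to be present".
REQUIRED = {"log_bin": None, "binlog_format": "row", "server_id": None}


def check_binlog_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section of cnf_text."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent, so normalize.
    options = {k.replace("-", "_"): v for k, v in parser["mysqld"].items()}
    problems = []
    for key, expected in REQUIRED.items():
        if key not in options:
            problems.append(f"{key} is not set")
        elif expected is not None and (options[key] or "").lower() != expected:
            problems.append(f"{key} should be {expected}, found {options[key]}")
    return problems


SAMPLE = """
[mysqld]
log-bin=mysql-bin
binlog_format=row
server-id=1
"""

if __name__ == "__main__":
    print(check_binlog_settings(SAMPLE) or "binlog settings look fine")
```

The check is deliberately narrow: it covers only the options listed in this section and says nothing about privileges or GTID settings.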

(4). Run ZooKeeper (omitted)

(5). Single-node Kafka setup (omitted)

(6). Configure the Kafka connector

# 1. Enter the Kafka installation directory
lixin-macbook:~ lixin$ cd ~/Developer/kafka-single/

# 2. List the directory contents
lixin-macbook:kafka-single lixin$ ll
drwxr-xr-x@  39 lixin  staff   1248  9 14 21:09 bin/
drwxr-xr-x@  18 lixin  staff    576  9 25 11:29 config/
drwxr-xr-x    2 lixin  staff     64  9 25 11:26 data/
drwxr-xr-x@ 105 lixin  staff   3360  9 14 21:09 libs/
drwxr-xr-x@  12 lixin  staff    384  9 14 21:09 licenses/
drwxr-xr-x@   3 lixin  staff     96  9 14 21:09 site-docs/

# 3. Create a directory for the Kafka Connect plugins
lixin-macbook:kafka-single lixin$ mkdir connect
lixin-macbook:kafka-single lixin$ cd connect/

# 4. Copy the debezium-connector-mysql plugin archive into the Kafka directory (connect)
lixin-macbook:connect lixin$ cp /Users/lixin/Developer/debezium/debezium-connector-mysql/target/debezium-connector-mysql-1.7.0-SNAPSHOT-plugin.tar.gz  ./
# 4.1 Extract the plugin
lixin-macbook:connect lixin$ tar -zxvf debezium-connector-mysql-1.7.0-SNAPSHOT-plugin.tar.gz
# 4.2 Delete the archive
lixin-macbook:connect lixin$ rm -rf debezium-connector-mysql-1.7.0-SNAPSHOT-plugin.tar.gz
# 4.3 List the contents of the connect directory
lixin-macbook:connect lixin$ ll
drwxr-xr-x  19 lixin  staff  608  9 25 11:36 debezium-connector-mysql/

# 5. View the structure of the connect directory
lixin-macbook:connect lixin$ pwd
/Users/lixin/Developer/kafka-single/connect

# *************************************************************************
# Note: my MySQL is version 8.0 or later
# *************************************************************************
lixin-macbook:connect lixin$ tree
.
└── debezium-connector-mysql
    ├── antlr4-runtime-4.8.jar
    ├── debezium-api-1.7.0-SNAPSHOT.jar
    ├── debezium-connector-mysql-1.7.0-SNAPSHOT.jar
    ├── debezium-core-1.7.0-SNAPSHOT.jar
    ├── debezium-ddl-parser-1.7.0-SNAPSHOT.jar
    ├── failureaccess-1.0.1.jar
    ├── guava-30.0-jre.jar
    ├── mysql-binlog-connector-java-0.25.1.jar
    └── mysql-connector-java-8.0.26.jar

(7). Start the Kafka broker

lixin-macbook:kafka-single lixin$ ./bin/kafka-server-start.sh -daemon ./config/server.properties

(8). Start Kafka Connect in distributed mode

# Add the following to connect-distributed.properties
rest.host.name=127.0.0.1
rest.port=8083
rest.advertised.host.name=127.0.0.1
rest.advertised.port=8083
plugin.path=/Users/lixin/Developer/kafka-single/connect
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
# 1. Start Kafka Connect (passing connect-distributed.properties)
lixin-macbook:kafka-single lixin$ ./bin/connect-distributed.sh  ./config/connect-distributed.properties

# 2. Check that port 8083 is listening
lixin-macbook:~ lixin$ lsof -i tcp:8083
COMMAND  PID  USER   FD   TYPE            DEVICE SIZE/OFF NODE NAME
java    9222 lixin  116u  IPv6 0x297c8de3ac9b05f      0t0  TCP localhost:us-srv (LISTEN)
# Query the Kafka Connect version information
lixin-macbook:~ lixin$ curl -H "Accept:application/json" localhost:8083/
{"version":"2.8.1","commit":"839b886f9b732b15","kafka_cluster_id":"EHsmpSxxRteUElGdbyDE9A"}

# List all connectors
lixin-macbook:~ lixin$ curl -H "Accept:application/json" localhost:8083/connectors/
[]
# Register the MySQL connector with Kafka Connect
# A pit I dug for myself: on my Mac I had changed the MySQL port to 3307, but the Debezium connection settings still used port 3306. The resulting error does not mention the port at all:
#   com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
#   The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
# Note: the MySQL 8.0.26 driver also supports MySQL 5.7 databases.

lixin-macbook:~ lixin$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "127.0.0.1",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "1",
    "database.server.name": "test2",
    "database.whitelist": "test2",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "dbhistory.test2"
  }
}'
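
The same registration request can be assembled programmatically, which makes a mismatch such as the 3306/3307 port mix-up easier to spot. The following is a minimal sketch using only the Python standard library. The helper names `build_mysql_connector` and `register` are hypothetical, and the defaults simply mirror the values used in the curl command above.

```python
import json
import urllib.request


def build_mysql_connector(name, server_name, database, host="127.0.0.1",
                          port=3306, user="debezium", password="dbz",
                          server_id=1, kafka="127.0.0.1:9092"):
    """Build the JSON body for POST /connectors, mirroring the curl command."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": host,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            "database.server.id": str(server_id),
            "database.server.name": server_name,
            "database.whitelist": database,
            "database.history.kafka.bootstrap.servers": kafka,
            "database.history.kafka.topic": f"dbhistory.{server_name}",
        },
    }


def register(connect_url, body):
    """POST the connector definition to Kafka Connect; return the HTTP status."""
    request = urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=json.dumps(body).encode("utf-8"),
        headers={"Accept": "application/json",
                 "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    # Only prints the payload; call register("http://localhost:8083", body)
    # once Kafka Connect is actually listening.
    body = build_mysql_connector("test-connector", "test2", "test2")
    print(json.dumps(body, indent=2))
```

Printing the payload before sending it gives one place to compare the host and port against the running MySQL instance.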

(9). Inspect the Kafka data directory

# 1. List the tables
mysql> show tables;
+-----------------+
| Tables_in_test2 |
+-----------------+
| test            |
| user            |
+-----------------+

# 2. List the Kafka data directory
lixin-macbook:~ lixin$ ll ~/Developer/kafka-single/data/

# 3. Database schema history (DDL statements recorded together with their binlog positions)
drwxr-xr-x   6 lixin  staff   192  9 26 21:44 dbhistory.test2-0/             # schema history (DDL)

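# 4. Schema change events and the row change events of each table
drwxr-xr-x   6 lixin  staff   192  9 26 21:44 test2-0/                       # schema change (DDL) events
drwxr-xr-x   6 lixin  staff   192  9 26 21:44 test2.test2.test-0/            # row change events of the table
drwxr-xr-x   6 lixin  staff   192  9 26 21:44 test2.test2.user-0/            # row change events of the table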

(10). List the Kafka topics

lixin-macbook:kafka-single lixin$ ./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
__consumer_offsets
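dbhistory.test2                               # database schema history (DDL with binlog positions)
test2                                         # schema change (DDL) events of the server
test2.test2.test                              # row change events of the table
test2.test2.user                              # row change events of the table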
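
The topic names listed above follow the MySQL connector's naming scheme: row change events go to `<server name>.<database>.<table>`, schema change events go to the topic named after the server, and the schema history goes to the topic configured in `database.history.kafka.topic`. A small Python sketch of that mapping follows. The function names `table_topic` and `classify_topic` are hypothetical and used only for illustration.

```python
def table_topic(server_name, database, table):
    """Topic that receives the row change events of one table."""
    return f"{server_name}.{database}.{table}"


def classify_topic(topic, server_name, history_topic):
    """Say what kind of data a topic from the listing holds."""
    if topic == history_topic:
        return "schema history"
    if topic == server_name:
        return "schema changes"
    parts = topic.split(".")
    if len(parts) == 3 and parts[0] == server_name:
        return "table changes"
    return "other"


if __name__ == "__main__":
    topics = ["__consumer_offsets", "dbhistory.test2", "test2",
              "test2.test2.test", "test2.test2.user"]
    for topic in topics:
        print(topic, "->", classify_topic(topic, "test2", "dbhistory.test2"))
```

The history topic is checked first because its name is arbitrary and could otherwise be mistaken for another category.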

(11). Consume the events from Kafka

# 1. Consume from the topic
lixin-macbook:kafka-single lixin$ ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --from-beginning --topic test2.test2.user

# 2. Modify data in MySQL
mysql> INSERT INTO user(id,nick,phone,password,email,account) VALUES(7,'小娜',999999,999999,'test@126.com',NULL);

# 3. Output of the Kafka consumer
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"nick"},{"type":"string","optional":true,"field":"phone"},{"type":"string","optional":true,"field":"password"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"account"}],"optional":true,"name":"test2.test2.user.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"nick"},{"type":"string","optional":true,"field":"phone"},{"type":"string","optional":true,"field":"password"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"account"}],"optional":true,"name":"test2.test2.user.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"test2.test2.user.Envelope"},"payload":{"before":null,"after":{"id":7,"nick":"小娜","phone":"999999","password":"999999","email":"test@126.com","account":null},"source":{"version":"1.6.1.Final","connector":"mysql","name":"test2","ts_ms":1632664785000,"snapshot":"false","db":"test2","sequence":null,"table":"user","server_id":1,"gtid":null,"file":"mysql-bin.000405","pos":923,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1632664785925,"transaction":null}}
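
The message above is long because the JSON converter embeds the full schema. The part that usually matters is `payload`, which holds the row state `before` and `after` the change, the `source` metadata (database, table, binlog file and position) and the operation code `op` (`c` create, `u` update, `d` delete, `r` snapshot read). Here is a minimal Python sketch that pulls those fields out of one message. The function name `summarize_change` is hypothetical, and the sample is a trimmed copy of the event above with the schema omitted.

```python
import json

# Debezium operation codes and a human-readable label for each.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize_change(message):
    """Extract the key fields from one Debezium change event (JSON string)."""
    payload = json.loads(message)["payload"]
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f'{source["db"]}.{source["table"]}',
        "before": payload["before"],
        "after": payload["after"],
        "binlog": (source["file"], source["pos"]),
    }


# Trimmed copy of the event shown above (schema part left out).
SAMPLE = json.dumps({
    "schema": None,
    "payload": {
        "before": None,
        "after": {"id": 7, "nick": "小娜", "phone": "999999",
                  "password": "999999", "email": "test@126.com",
                  "account": None},
        "source": {"db": "test2", "table": "user",
                   "file": "mysql-bin.000405", "pos": 923},
        "op": "c",
        "ts_ms": 1632664785925,
    },
})

if __name__ == "__main__":
    print(summarize_change(SAMPLE))
```

For the INSERT in step 2, `before` is null and `after` carries the new row, which is how an insert is told apart from an update or a delete.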

(12). Summary

With a local deployment up and running, the following posts will dig into the Debezium source code in depth.