认识CDC工具go-mysql-transfer

1.认识CDC

Change Data Capture(简称CDC):其主要作用是用来捕获数据的变更,在数据发生变化之后,把变更后的数据同步到其他异构数据服务中。例如:通过订阅MySQL中binlog日志,将MySQL中发生变化的数据同步到Redis、kafka、Es等存储服务中。

在现实开发中有一些场景会用到CDC工具,例如:

  • 数据库数据发生变化,异步更新缓存

    业务中同时更新DB、cache,需要考虑边缘问题(面试问题:数据库、缓存不一致时如何解决?)

  • 数据存储到MySQL之后,需要同步到ES以便可以使用全文检索

    MySQL对全文搜索支持的不是很好,通常全站搜索都会使用ES来做搜索引擎

  • 大数据分析 Warehouse, 实时同步数据,离线分析等等

> 图片来自 DDIA

如果不使用CDC工具处理数据,需要在业务中耦合数据同步的逻辑。数据存储与数据同步到其他服务这两个操作,往往需要包含在同一个事务中,否则可能会造成一个成功、一个失败,从而导致数据的不一致性。而为了处理这些问题需要写很多的兼容代码,也就导致了非业务操作耦合到业务中,会对项目造成一定的负担。

所以,最好的解决办法就是使用CDC工具,把整个过程进行拆分解耦,最终分为两步:① 数据存储到DB ② 用CDC工具进行同步

目前,常用的CDC工具有:

  • Java技术栈的 alibaba canal项目(大部分公司会使用canal进行定制化)
  • go技术栈的 go-mysql项目。

它们的原理相似:把自己伪装成MySQL的slave节点,从master接收binlog日志,在解析之后把对应数据同步到其他异构数据服务中。

2.认识go-mysql-transfer

go-mysql-transfer 是基于go-mysql项目进行二次封装的一款CDC工具,官方称其为一款MySQL数据库实时增量同步工具。

支持全量数据初始化同步

文档手册:https://www.kancloud.cn/wj596/go-mysql-transfer/2064425
github地址:https://github.com/wj596/go-mysql-transfer

go-mysql项目地址:https://github.com/go-mysql-org/go-mysql

特性:
1.简单,不依赖其它组件,一键部署
2.集成多种接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,无需编写客户端,开箱即用
3.内置丰富的数据解析、消息生成规则,支持模板语法
4.支持Lua脚本扩展,可处理复杂逻辑,如:数据的转换、清洗、打宽
5.集成Prometheus客户端,支持监控、告警
6.集成Web Admin监控页面
7.支持高可用集群部署
8.数据同步失败重试
9.支持全量数据初始化

同类型工具对比:

实现过程:
1.go-mysql-transfer将自己伪装成MySQL的Slave,
2.向Master发送dump协议获取binlog,解析binlog并生成消息
3.将生成的消息实时、批量发送给接收端

安装、搭建过程可以参考文档。

3.go-mysql-transfer的高可用

在分布式系统中,单节点往往意味着会存在单点故障,如果节点出现问题会导致整个系统不可用。go-mysql-transfer中支持高可用,基于etcd/zookeeper进行选主,即:多个节点同时存在,只有一个节点是master,其他节点为follower在master出现问题之后替补上去,实现秒级故障切换(代码依赖 etcd/zookeeper提供的API接口)。

以 etcd 为例,使用 "go.etcd.io/etcd/clientv3/concurrency" 包中提供的 选主API接口方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Campaign puts a value as eligible for the election on the prefix
// key.
// Multiple sessions can participate in the election for the
// same prefix, but only one can be the leader at a time.
//
// If the context is 'context.TODO()/context.Background()', the Campaign
// will continue to be blocked for other keys to be deleted, unless server
// returns a non-recoverable error (e.g. ErrCompacted).
// Otherwise, until the context is not cancelled or timed-out, Campaign will
// continue to be blocked until it becomes the leader.
//
// * 类似于zookeeper的临时有序节点,etcd的选举也是在相应的prefix path下面创建key,该key绑定了lease并根据lease id进行命名,
// * key创建后就有revision号,这样使得在prefix path下的key也都是按revision有序
// * https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/election.go
//
// 选主,如果没有选为 master,则会一直阻塞在该方法上
func (e *Election) Campaign(ctx context.Context, val string) error


// Resign lets a leader start a new election.
//
// 释放 leader,开始一轮新的选主
func (e *Election) Resign(ctx context.Context) (err error)

例如:有三个 go-mysql-transfer 节点基于 etcd 实现高可用,每个节点在启动时都会调用 Campaign() 方法进行选主,其中也仅仅有一个节点会被选为 master,并从该方法返回。其他节点则会阻塞在 Campaign() 方法上,一直等待 master 节点下线,进行下一轮选主,从而自己变为 master 节点并返回。

在成为 master 之后会通过 go-mysql 库与指定 DB 进行数据同步,每当成功进行一次数据同步,会把数据偏移量更新为最新的值。

4.数据同步原理

在数据同步过程中,需要记录已消费数据的偏移量,在go-mysql-transfer中则是通过 Position.pos 来记录已经处理的数据偏移。

1
2
3
4
5
// For binlog filename + position based replication
type Position struct {
Name string
Pos uint32
}

go-mysql-transfer 提供单机版本、高可用版本,其存储 Position 的方式也不一样。

单机版本使用 boltDB 存储到本地数据库文件 data.db 中。
高可用版本,则存储在 etcd/zookeeper 指定的 key 下面。

由于 go-mysql-transfer 依赖 go-mysql 库,go-mysql 库预留了相关的回调方法,在检测到DB数据发行变更之后回调对应方法。

go-mysql 库预留了相关的回调方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type EventHandler interface {
OnRotate(roateEvent *replication.RotateEvent) error
// OnTableChanged is called when the table is created, altered, renamed or dropped.
// You need to clear the associated data like cache with the table.
// It will be called before OnDDL.
OnTableChanged(schema string, table string) error
OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
OnRow(e *RowsEvent) error
OnXID(nextPos mysql.Position) error
OnGTID(gtid mysql.GTIDSet) error
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error
String() string
}

go-mysql-transfer 在启动时会把对 EventHandler 接口的实现注册到 go-mysql 库的对象 canal.Canal 中。

go-mysql-transfer 支持数据同步到不同的异构服务中,例如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ 中,对于每一种类型都实现同一个 Endpoint 接口。

1
2
3
4
5
6
7
type Endpoint interface {
Connect() error
Ping() error
Consume(mysql.Position, []*model.RowRequest) error
Stock([]*model.RowRequest) int64
Close()
}

在有数据从 canal.Canal 对象到达时,会调用不用类型实现的 Consume 方法把数据同步到对应服务。在同步成功之后,更新偏移量。

5.拓展阅读