Redis与Mysql | Master与Slave同步:canal教学

InterviewCoder

# 前言:

​ 作者最近在做自己的项目,使用到 Redis,需要热更新,修改 Mysql 后同步 Redis 缓存,出于对圈子的贡献,也较于当前的 canal 的博客大多数不是很详细,所以写下这篇文章,时间是 2022 年 6 月 29 日。目的是帮助更多的人,希望能为在祖国的经济发展作出小小的贡献。

​ end

# 学习 Canal 基本需要:

​ Linux 服务器,性能无大要求

​ Java 基础

​ Mysql,Redis 基础

# 俗话说,要了解一个东西,先了解他的由来:

# 一、Canal 起源

​ 阿里巴巴因为业务特性,买家集中在国外,衍生出了杭州美国异地数据同步需求,从 2010 年开始,阿里巴巴开始开发 canal,canal 是基于 Java 开发的数据库增量日志解析,提供增量数据订阅 & 消费的中间件。Canal 主要支持了 Mysql 和 Bilog 解析,解析完成后利用 canal Client 来处理获取相关数据。

了解完 canal 的起源,再来看看 canal 的核心业务依赖,也就是 mysql 的二进制日志:binary_log 简称:Binlog

# 二、Binlog

​ binlog 指二进制日志,它记录了数据库上的所有改变,并以二进制的形式保存在磁盘中,它可以用来查看数据库的变更历史、数据库增量备份和恢复、MySQL 的复制(主从数据库的复制)。

# binlog 有三种格式:

statement:基于 SQL 语句的复制(statement-based replication,SBR)
row:基于行的复制(row-based replication,RBR)
mixed:混合模式复制(mixed-based replication,MBR)

# statement:语句级别

每一条会修改数据的 sql 都会记录在 binlog 中。

​ 优点:不需要记录每一行的变化,减少了 binlog 日志量,节约了 IO,提高性能。但是注意 statement 相比于 row 能节约多少性能与日志量,取决于应用的 SQL 情况。正常同一条记录修改或者插入 row 格式所产生的日志量还小于 Statement 产生的日志量,但是考虑到如果带条件的 update 操作,以及整表删除,alter 表等操作,ROW 格式会产生大量日志,因此在考虑是否使用 ROW 格式日志时应该跟据应用的实际情况,其所产生的日志量会增加多少,以及带来的 IO 性能问题。

​ 缺点:由于记录的只是执行语句,为了这些语句在 slave 上正确运行,我们还必须记录每条语句在执行时候的一些相关信息,以保证所有语句能在 slave 得到和在 master 端执行时相同的结果。另外,一些特定的函数功能如果要在 slave 和 master 上保持一致会有很多相关问题。

# row:行数据级别

5.1.5 版本的 MySQL 才开始支持 row level 的复制,它不记录 sql 语句上下文相关信息,仅保存哪条记录被修改。

​ 优点:binlog 中可以不记录执行的 sql 语句的上下文相关的信息,仅需要记录那一条记录被修改成什么了。所以 row level 的日志会非常清楚的记下每一行数据修改的细节。而且不会出现某些特定情况下的存储过程,或 function,以及 trigger 的调用和触发无法被正确复制的问题。

​ 缺点:所有的执行的语句当记录到日志中的时候,都将以每行记录的修改来记录,这样可能会产生大量的日志内容。但是新版本的 MySQL 对 row level 模式进行了优化,并不是所有的修改都会以 row level 来记录,像遇到表结构变更的时候就会以 statement 模式来记录,如果 sql 语句确实就是 update 或者 delete 等修改数据的语句,那么还是会记录所有行的变更。

# mixed:混合级别

从 5.1.8 版本开始,MySQL 提供了 Mixed 格式,实际上就是 Statement 与 Row 的结合。

​ 在 Mixed 模式下,一般的语句修改使用 statment 格式保存 binlog,如果一些函数,statement 无法完成主从复制的操作,则采用 row 格式保存 binlog,MySQL 会根据执行的每一条具体的 sql 语句来区分对待记录的日志形式,也就是在 Statement 和 Row 之间选择一种。

# 由于 statement 和 mixed 的特殊性,通过 sql 来备份,总会有数据不一致的情况,比如:now () 函数。
# 所以绝大多数场景下使用 Row 级别,也就是行级别,这样保证我们备份的数据和出口的数据相一致。

# 三、下载和安装 Canal 工具

下载前,在 mysql 创建 canal 用户,因为 canal 服务端需要连接 mysql 数据库

1
2
3
4
5
-- 使用命令登录:mysql -u root -p
-- 创建用户 用户名:canal 密码:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';
-- 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';

# 改了配置文件之后,重启 MySQL,使用命令查看是否打开 binlog 模式:

在这里插入图片描述

# 查看 binlog 日志文件列表:

在这里插入图片描述

# 点此下载 Canal👇

https://ghproxy.com/https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz

此链接为 github 代理提供连接,仅供参考,此处无广告意义。

image-20220629152616754

下载好后上传至 linux 服务器,创建 canal 文件夹并解压到 canal 文件夹中

image-20220629153150741

完成后会得到以上四个核心文件:bin,conf,lib,logs

需要修改一处配置文件:

​ /canal/conf/example 下的 instance.properties

image-20220629153511220

修改完成后保存退出

接下来进入 bin 目录 sh startUp.sh 启动 canal 服务端

# 至此服务端的操作基本完成

Java 客户端操作
首先引入 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>

然后创建一个 canal 项目,使用 SpringBoot 构建,如图所示,创建 canal 包:

image-20220629153956493

canal 工具类,仅供参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package cn.brath.canal;
import java.awt.print.Printable;
import java.time.LocalDateTime;

import cn.brath.common.redis.service.TokenService;
import cn.brath.common.redis.util.RedisKeys;
import cn.brath.common.utils.AssertUtil;
import cn.brath.common.utils.UserTokenManager;
import cn.brath.entity.IvUser;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.print.attribute.standard.MediaPrintableArea;
import java.net.InetSocketAddress;
import java.time.ZoneId;
import java.util.List;

@Component
@Data
public class CanalClient {

/**
* SLF4J日志
*/
private static Logger logger = LoggerFactory.getLogger(CanalClient.class);

private String host = "***.***.***.***";

private String port = "11111";

private String destination = "example";

/**
* 用户令牌业务接口
*/
private static TokenService tokenService;

@Autowired
public void TokenServiceIn(TokenService tokenService) {
CanalClient.tokenService = tokenService;
}

/**
* canal启动方法
*/
public void run() {
if (!AssertUtil.isEmptys(host, port, destination)) {
logger.error("canal客户端连接失败,当前服务端host:{},port:{},destination:{}", host, port, destination);
return;
}
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(host, Integer.valueOf(port)), destination, "", ""
);
int batchSize = 1000;
try {
//建立连接
connector.connect();
//目标为全部表
connector.subscribe(".*\\..*");
connector.rollback();
logger.info("canal客户端连接完成,当前服务端host:{},port:{},destination:{}", host, port, destination);
try {
while (true) {
//尝试从master那边拉去数据batchSize条记录,有多少取多少
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
} else {
logger.info("同步任务进行中,检测到修改数据,执行同步Redis");
dataHandle(message.getEntries());
}
connector.ack(batchId);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} finally {
connector.disconnect();
}
}

/**
* 数据处理
*
* @param entrys
*/
private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
JSONObject beforeData = null;
JSONObject afterData = null;
for (Entry entry : entrys) {
if (EntryType.ROWDATA.equals(entry.getEntryType())) {
//反序列化rowdata
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
//获取数据集
List<RowData> rowDataList = rowChange.getRowDatasList();
//获取数据遍历
for (RowData rowData : rowDataList) {
afterData = new JSONObject();
List<Column> afterColumnsList = rowData.getAfterColumnsList();
for (Column column : afterColumnsList) {
afterData.put(column.getName(), column.getValue());
}
}

//因为作者这里只做同步Redis,不考虑到操作类型,只需要覆盖相同键值数据

//写入Redis
executeRedisWarehousing(afterData);
}
}
}

/**
* 执行Redis用户数据入库
*
* @param afterData
*/
public static void executeRedisWarehousing(JSONObject afterData) {
logger.info("开始执行Redis热更新入库同步Mysql -- ");

do...

logger.info("入库完成");
}

}

# 启动类使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
@Slf4j
public class Application {
public static void main(String[] args) {
SpringApplication.run(InterviewUserServiceApplication.class, args);
//项目启动,执行canal客户端监听
try {
new CanalClient().run();
} catch (Exception e) {
e.printStackTrace();
log.error(" canal客户端监听 启动失败,原因可能是:{}", e.getMessage());
}
}
}

接下来启动项目运行,成功连接 canal 后我们尝试修改一个 mysql 的数据,发现在客户端成功完成了与 Redis 的同步操作

image-20220629154454409

# 相关异常:

Canal 异常:

dump address /124.222.106.122:3306 has an error, retrying. caused by java.la

解决办法:重启 Mysql,删除 example 下的 dat 后缀文件后重启 canal

其他:

​ 是否开放端口 11111

​ mysql 是否连接成功,查看 logs/example/example.log

​ 服务端与客户端是否连接成功,查看当前项目日志即可

# 关于我

Brath 是一个热爱技术的 Java 程序猿,公众号「InterviewCoder」定期分享有趣有料的精品原创文章!

InterviewCoder

非常感谢各位人才能看到这里,原创不易,文章如果有帮助可以关注、点赞、分享或评论,这都是对我的莫大支持!

评论