上一篇 Redis与MySQL的数据同步解决方案
下一篇 >>>阿里云的Canal框架配置
git地址:https://github.com/alibaba/canal
Canal目前支持三种监听通讯模式:TCP、Kafka、RocketMQ
核心原理
a.canal会启动一个Server端,作为mysql的一个从节点,拉取mysql主节点最新的binlog文件
b.只要mysql主节点的binlog文件发生变化,都会以增量的形式通知给我们的canalServer端
c.canalServer再通知给canalClient,由canalClient自己手动配置刷新Redis、kafka等各种客户端。

Canal通过TCP实现同步
Mysql配置:
server_id=177 ###服务器id
log-bin=mysql-bin ###开启日志文件
binlog-format=ROW #选择row模式
Canal服务端配置: canal.properties
#通讯端口
canal.port = 11111
#目标支持:client、rocketmq、kafka
#可以使用单独的账号信息
-- Create a dedicated MySQL account for Canal to connect with.
-- IF EXISTS (MySQL 5.7.8+) keeps the script re-runnable when the user is absent.
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- Grant only what a binlog-reading replica needs instead of ALL PRIVILEGES.
-- The password is set by CREATE USER above; GRANT ... IDENTIFIED BY is rejected by MySQL 8.0.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
主数据源配置:example/instance.properties
canal.instance.master.address=10.211.55.26:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=root
/**
 * Minimal Canal TCP client: polls a Canal server for binlog entries and mirrors
 * the row changes it receives into Redis through {@code RedisUtil}.
 *
 * <p>Each row is stored as a JSON string keyed by the value of the row's first
 * column. NOTE(review): this assumes the first column is the primary key, and
 * the key is not namespaced by schema/table, so equal ids from different tables
 * would overwrite each other — confirm this is acceptable for your tables.
 */
public class CanalClient {

    /** Maximum number of entries requested from the Canal server per fetch. */
    private static final int BATCH_SIZE = 100;

    /** Pause between polls when the server returned nothing, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    /**
     * Connects to the Canal server, subscribes to {@code test.*} and processes
     * batches until the thread is interrupted or an exception is thrown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("10.211.55.16", 11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe("test.*");
            // Presumably rewinds to the last acknowledged position so that batches
            // fetched but never acked by a previous run are redelivered — confirm
            // against the Canal client documentation.
            connector.rollback();
            while (true) {
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // Restore the flag and stop polling; swallowing the interrupt
                        // would make the loop impossible to shut down cleanly.
                        Thread.currentThread().interrupt();
                        return;
                    }
                } else {
                    printEntry(message.getEntries());
                }
                // Acknowledge the batch. If processing threw above, this line is
                // never reached, so the batch stays un-acked.
                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Logs every row-change entry and applies it to Redis.
     *
     * <p>Transaction begin/end markers are skipped. DELETE rows remove the key,
     * INSERT rows write the after-image, and every other event type prints the
     * before-image and writes the after-image.
     *
     * @param entrys entries of one fetched batch
     * @throws RuntimeException if an entry's store value cannot be parsed
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints each column's name, value and updated flag on its own line.
     *
     * @param columns columns of one row image
     */
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

    /**
     * Writes a newly inserted row to Redis.
     *
     * @param columns after-image of the inserted row
     */
    private static void redisInsert(List<Column> columns) {
        cacheRow(columns);
    }

    /**
     * Overwrites the Redis copy of a row with its new column values.
     *
     * @param columns after-image of the changed row
     */
    private static void redisUpdate(List<Column> columns) {
        cacheRow(columns);
    }

    /**
     * Removes a deleted row from Redis, using the first column's value as the key.
     *
     * @param columns before-image of the deleted row; nothing happens if empty
     */
    private static void redisDelete(List<Column> columns) {
        if (!columns.isEmpty()) {
            RedisUtil.delKey(columns.get(0).getValue());
        }
    }

    /**
     * Serializes a row to a JSON object (column name to column value) and stores
     * it under the first column's value. Does nothing for an empty column list.
     */
    private static void cacheRow(List<Column> columns) {
        if (columns.isEmpty()) {
            return;
        }
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        RedisUtil.stringSet(columns.get(0).getValue(), json.toJSONString());
    }
}
结果打印:
================> binlog[mysql-bin.000015:457] , name[test,my_tb] , eventType : CREATE
================> binlog[mysql-bin.000015:324] , name[test,ttt] , eventType : INSERT
================> binlog[mysql-bin.000015:757] , name[test,ttt] , eventType : UPDATE
================> binlog[mysql-bin.000015:1025] , name[test,ttt] , eventType : DELETE
Canal通过kafka实现同步
配置信息修改
1.修改 example/instance.properties
canal.mq.topic=maikt-topic
2.修改 canal.properties
# tcp, kafka, RocketMQ
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092
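<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>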
# kafka
spring:
  kafka:
    # kafka服务器地址(可以多个)
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: kafka2
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
  redis:
    host: 47.96.105.42
    # password:
    port: 6369
    database: 10
@KafkaListener(topics = "jarye-topic")
public void receive(ConsumerRecord, ?> consumer) {
System.out.println("topic名称:" + consumer.topic() + ",key:" +
consumer.key() + "," +
"分区位置:" + consumer.partition()
+ ", 下标" + consumer.offset()+","+consumer.value());
}