How Alibaba's Canal Framework Syncs Redis with MySQL: Principles and Code Examples

Date: 2021-6-4  Author: qvyue



GitHub: https://github.com/alibaba/canal
Canal currently supports three communication modes: TCP, Kafka, and RocketMQ.

Core principle

a. Canal starts a server that registers with MySQL as a replica and pulls the latest binlog from the MySQL master.
b. Whenever the master's binlog changes, the changes are delivered incrementally to the Canal server.
c. The Canal server then forwards the changes to the Canal client, where you implement your own logic to refresh Redis, Kafka, or other downstream systems (see the full client example below).

[Figure: Canal-based architecture for syncing MySQL data to Redis]

Canal synchronization over TCP

MySQL configuration (my.cnf):
server_id=177        ### server id
log-bin=mysql-bin    ### enable binlog
binlog-format=ROW    ### use ROW format
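
As a quick sanity check (not part of the original post), you can confirm that the binlog settings took effect before wiring up Canal. The JDBC URL and credentials below are placeholders; adjust them to your own environment:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        // placeholder connection settings; replace with your own MySQL instance
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://10.211.55.26:3306/test", "root", "root");
             Statement stmt = conn.createStatement()) {
            try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'log_bin'")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " = " + rs.getString(2)); // expect ON
                }
            }
            try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'binlog_format'")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " = " + rs.getString(2)); // expect ROW
                }
            }
        }
    }
}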

Canal server configuration: canal.properties
# communication port
canal.port = 11111
# supported targets: client (TCP), rocketmq, kafka

# a dedicated MySQL account can be created for Canal
drop user 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
grant all privileges on *.* to 'canal'@'%' identified by 'canal'; 
flush privileges;

Master data source configuration: example/instance.properties
canal.instance.master.address=10.211.55.26:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=root

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class CanalClient {

    public static void main(String[] args) {
        // connect to the Canal server (TCP mode), instance "example"
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.211.55.16",
                11111), "example", "", "");
        int batchSize = 100;
        try {
            connector.connect();
            // listen to all tables under the "test" database
            connector.subscribe("test.*");
            // roll back to the last acknowledged position before consuming
            connector.rollback();
            while (true) {
                // fetch up to batchSize entries without acknowledging them
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    printEntry(message.getEntries());
                }
                // acknowledge the batch
                connector.ack(batchId);
                // connector.rollback(batchId); // on failure, roll the batch back
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // skip transaction begin/end markers; only row changes matter here
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange = null;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // row deleted: remove the corresponding key from Redis
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // row inserted: cache the new row in Redis
                    redisInsert(rowData.getAfterColumnsList());
                } else {
                    // row updated: log the change and refresh the cached row
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

    // cache the inserted row as JSON, keyed by the first column's value (assumes the PK comes first)
    private static void redisInsert(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.stringSet(columns.get(0).getValue(), json.toJSONString());
        }
    }

    // refresh the cached row after an update, keyed by the first column's value
    private static void redisUpdate(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.stringSet(columns.get(0).getValue(), json.toJSONString());
        }
    }

    // drop the cached row after a delete, keyed by the first column's value
    private static void redisDelete(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.delKey(columns.get(0).getValue());
        }
    }
}
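
The RedisUtil helper called above is not shown in the original post. Below is a minimal Jedis-based sketch of the two methods the client uses; stringSet and delKey are the names assumed from the code above, and the host/port are placeholders:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class RedisUtil {

    // placeholder connection settings; point these at your own Redis instance
    private static final JedisPool POOL = new JedisPool("47.96.105.42", 6379);

    // store the row JSON under the given key (the client uses the first column's value, i.e. the PK)
    public static void stringSet(String key, String value) {
        try (Jedis jedis = POOL.getResource()) {
            jedis.set(key, value);
        }
    }

    // remove the key when the corresponding row is deleted
    public static void delKey(String key) {
        try (Jedis jedis = POOL.getResource()) {
            jedis.del(key);
        }
    }
}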

Sample output:
================> binlog[mysql-bin.000015:457] , name[test,my_tb] , eventType : CREATE
================> binlog[mysql-bin.000015:324] , name[test,ttt] , eventType : INSERT
================> binlog[mysql-bin.000015:757] , name[test,ttt] , eventType : UPDATE
================> binlog[mysql-bin.000015:1025] , name[test,ttt] , eventType : DELETE

Canal synchronization over Kafka

Configuration changes
1. Edit example/instance.properties:
canal.mq.topic=maikt-topic
2. Edit canal.properties:
# tcp, kafka, RocketMQ
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092


Maven dependencies for the Spring Boot consumer:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Spring Boot application.yml:
spring:
  kafka:
    # Kafka server address (multiple addresses allowed)
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # default consumer group
      group-id: kafka2
      # earliest: if a committed offset exists, resume from it; otherwise consume from the beginning
      # latest:   if a committed offset exists, resume from it; otherwise consume only newly produced records
      # none:     if every partition has a committed offset, resume from it; otherwise throw an exception
      auto-offset-reset: earliest
      # key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # producer batch size
      batch-size: 65536
      # producer buffer memory
      buffer-memory: 524288
  redis:
    host: 47.96.105.42
#    password:
    port: 6369
    database: 10

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class CanalKafkaConsumer {

    @KafkaListener(topics = "maikt-topic") // must match canal.mq.topic configured above
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("topic: " + consumer.topic() + ", key: " + consumer.key()
                + ", partition: " + consumer.partition()
                + ", offset: " + consumer.offset() + ", value: " + consumer.value());
    }
}
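
The listener above only prints the record. The sketch below (not from the original post) shows one way the consumed message could be turned into Redis writes; it assumes canal.mq.flatMessage=true (Canal's JSON message format for MQ, with fields such as "type", "pkNames" and "data") and reuses the hypothetical RedisUtil helper from the TCP example:

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class CanalMessageHandler {

    // parse Canal's flat JSON message and mirror the change into Redis, keyed by the first PK column
    public static void handle(String value) {
        JSONObject message = JSONObject.parseObject(value);
        String type = message.getString("type");           // INSERT / UPDATE / DELETE / DDL
        JSONArray pkNames = message.getJSONArray("pkNames");
        JSONArray rows = message.getJSONArray("data");
        if (rows == null || pkNames == null || pkNames.isEmpty()) {
            return; // e.g. DDL statements carry no row data
        }
        String pk = pkNames.getString(0);
        for (int i = 0; i < rows.size(); i++) {
            JSONObject row = rows.getJSONObject(i);
            String key = row.getString(pk);
            if ("DELETE".equals(type)) {
                RedisUtil.delKey(key);
            } else if ("INSERT".equals(type) || "UPDATE".equals(type)) {
                RedisUtil.stringSet(key, row.toJSONString());
            }
        }
    }
}

In receive(), you could then call CanalMessageHandler.handle(String.valueOf(consumer.value())) instead of just printing the record.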