欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

Springboot2.3.x整合Canal的示例代码,

来源: javaer 分享于  点击 2789 次 点评:11

Springboot2.3.x整合Canal的示例代码,


目录
  • 一、故事背景
  • 二、什么是Canal
  • 三、Canal安装
    • (1)事前准备
      • (1)数据库开启binlog
      • (2)数据库新建账号,开启MySQLslav权限
    • (2)CanalAdmin安装
      • (3)CanalServer安装
        • (4)springbootdemo示例

        一、故事背景

        前言…

        最近工作中遇到了一个数据同步的问题

        我们这边系统的一个子业务需要依赖另一个系统的数据,当另一个系统数据变更时,我们这边的数据库要对数据进行同步…

        那么我自己想到的同步方式呢就两种:

        1、MQ订阅,另一个系统数据变更后将变更数据方式到MQ 我们这边订阅接受

        2、数据库的触发器

        但是呢,两者都被组长paas了!

        1、MQ呢,会造成代码侵入,但是另一个系统暂时不会做任何代码更改…

        2、数据库的触发器会直接跟生产数据库强关联,会抢占资源,甚至有可能造成生产数据库的不稳定…

        对此很是苦恼…

        于是啊,只能借由强大的google、百度,看看能不能解决我这个问题!一番搜索,有学习了一个很有趣的东西…

        Canal

        二、什么是Canal

        canal:阿里开源mysql binlog 数据组件

        官网解释的相当详细了(国产牛逼)…下边我也是照搬过来的…

        官网地址如下:https://github.com/alibaba/canal/wiki

        早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。ps. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析

        image-20200926161534433

        canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

        工作原理

        • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
        • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
        • canal 解析 binary log 对象(原始为 byte 流)

        canal呢,实际是就是运用了Mysql的主从复制原理…

        MySQL主从复制实现

        image-20200926161859141

        复制遵循三步过程:

        • 主服务器将更改记录到binlog中(这些记录称为binlog事件,可以通过来查看show binary events
        • 从服务器将主服务器的二进制日志事件复制到其中继日志。
        • 中继日志中的从服务器重做事件随后将更新其旧数据。

        如何运作

        image-20200926161913848

        原理很简单:

        • Canal模拟MySQL从站的交互协议,伪装成MySQL从站,然后将转储协议发送到MySQL主服务器。
        • MySQL Master接收到转储请求,并开始将二进制日志推送到slave(即运河)。
        • 运河将二进制日志对象解析为其自己的数据类型(最初为字节流)

        通过官网的介绍,让我们了解到,canal实际上就是伪装为了一个从库,我们只需要订阅到数据变更的主库,那么canal就会以从库的身份读取到其主库的binlog日志!我们拿到canal解析好的binlog日志信息,就等于拿到了变更的数据啦!…

        这样的话呢,我们即保证了不影响其系统数据库正常使用,又不会侵入他的项目代码,一举两得

        ok,接下来开始实战篇…

        三、Canal安装

        (1)事前准备

        (1)数据库开启binlog

        使用canal呢,有一个前提条件,即被订阅的数据库需要开启binlog

        如何查看是否开启binlog呢?

        登录服务器上数据库或在可视化工具中 执行查询语句: 如果出现 log_bin ON 表示已开启Binlog

        show variables like 'log_bin'; 

        image-20200927230024401

        如果服务器上的数据库为自己安装的,则找到配置文件my.conf 添加以下内容,如果买的云实例,则询问厂商开启即可

        image-20200926164518333

        在my.conf文件中的 [mysqld] 下添加以下三行内容

        log-bin=mysql-bin # 开启 binlog
        binlog-format=ROW # 选择 ROW 模式 读行
        server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

        (2)数据库新建账号,开启MySQL slav权限

        canaltest:作为slave 角色的账户 Canal123…:为密码

        CREATE USER canaltest IDENTIFIED BY 'Canal123..';  
        GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canaltest'@'%';
        GRANT ALL PRIVILEGES ON *.* TO 'canaltest'@'%' ;
        FLUSH PRIVILEGES;

        image-20200926165912070

        连接测试

        image-20200926170124796

        那么到这里,准备工作就好了!

        可能呢,有的小伙伴有点懵,你这是在干啥?那么咱们就来理那么一理! 敲黑板了哈!

        image-20200926170334603

        1、事前准备,是针对于订阅数据库的(即主库)

        2、实际步骤也就两步 1:更改配置,开启binlog 2:设置新账号,赋予slave权限,供canal读取Binlog桥梁使用

        3、以上操作与canal本身没啥关系,仅仅是使用canal的前提条件罢辽…

        (2)Canal Admin 安装

        canal admin 是 一个可视化的 canal web管理运维工程,脱离以往服务器运维,面向web…

        canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

        canal-admin的限定依赖:

        • MySQL,用于存储配置和节点等相关数据
        • canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)
        • 需要JRE 环境 (安装JDK)

        下载

        wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
        

        解压

        mkdir /usr/local/canal-admin
        tar zxvf canal.admin-1.1.4.tar.gz  -C /usr/local/canal-admin

        进入canal-admin目录下查看

        cd /usr/local/canal-admin

        修改配置

        vim conf/application.yml

        里边的配置 按照自己的实际情况更改…

        server:
          port: 8089
        spring:
          jackson:
            date-format: yyyy-MM-dd HH:mm:ss
            time-zone: GMT+8
        #这里是配置canal-admin 所依赖的数据库,,,存放web管理中设置的配置等,,,
        spring.datasource:
          address: 127.0.0.1:3306
          database: canal_manager
          username: root	
          password: 123456
          driver-class-name: com.mysql.jdbc.Driver
          url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
          hikari:
            maximum-pool-size: 30
            minimum-idle: 1
        # 连接所用的账户密码 
        canal:
          adminUser: admin
          adminPasswd: leitest

        导入canaladmin 所需要的数据库文件

        这里需要注意了,要和 application.yml中的数据库名对应,你可以选择命令导入,也可以Navicat 可视化拖sql文件导入…一切…看你喜欢.

        我这个玩canal的服务器呢,是新安装的,mysql直接用docker安装即可,具体可查看我的博客:

        Docker在CentOS7下不能下载镜像timeout的解决办法(图解)

        CentOS 7安装Docker

        需要注意的是,使用docker 安装的mysql 是无法直接使用 mysql -uroot -p 命令的哦,需要先将脚本复制到容器中,docker不熟练或觉得麻烦的同鞋,请直接使用Navicat可视化工具…

        导入canal-admin服务所必需的sql文件

        如果是服务器软件软件安装的mysql 则直接执行以下命令即可

        mysql -uroot -p
        #.........
        # 导入初始化SQL
        > source conf/canal_manager.sql

        image-20200926215220008

        启动

        直接执行启动脚本即可

        cd bin
        ./startup.sh

        image-20200926220024950

        默认账户密码:

        admin:123456

        image-20200926220129110

        (3)Canal Server 安装

        canal-server 才是canal的核心我们前边所讲的canal的功能,实际上讲述的就是canal-server的功能…admin 仅仅只是一个web管理而已,不要搞混主次关系…

        下载

        wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

        解压

        mkdir /usr/local/canal-server
        tar zxvf canal.deployer-1.1.4.tar.gz  -C /usr/local/canal-server

        启动,并连接到canal-admin web端

        首先,我们需要修改配置文件

        cd /usr/local/canal-server
        
        vim /conf/canal_local.properties

        image-20200926221410324

        image-20200926221344585

        注意了,密码如何加密!!!

        要记得,前边 canal-admin 的 aplication.yml 中设置了账户密码为 admin:leitest

        # 连接所用的账户密码 
        canal:
          adminUser: admin
          adminPasswd: leitest

        所以,我们这里需要对明文 leitest 加密并替换即可

        使用数据库函数 PASSWORD 加密即可

        SELECT PASSWORD(‘要加密的明文’),然后去掉前边的* 号就行

        image-20200926221859106

        启动并连接到admin

        sh bin/startup.sh local

        查看端口看是否有 11110 、11111、11112

        netstat -untlp 看了一下,发现没有,说明server 没有启动成功

        image-20200926222317087

        看下日志

        vim logs/canal/canal.log

        image-20200926222442734

        解决办法:

        1、canal-admin 先停止后从起

        2、canal server 先以之前的形式运行,不输入后边 local 命令

        3、关闭canal server

        4、再以canal server 连接 admin 形式启动

        image-20200926230725865

        admin页面上新建server

        image-20200926230834367

        修改配置,注释 (instance连接信息,我们还是以前边设置的 admin:leitest 为准,所有这里需要注释掉,如果不注释,那么我们代码中连接则需要使用此账号以及密码)

        image-20200926234156162

        接下来咱们创建instance

        如何理解server 和instance 呢,我认为,可以把它当做 java 中的 class 和 bean 即 类和对象

        server 为类 instance 为其具体的实例对象 ,可创建多个不同的实例…

        而我们这边监听到主库变化的呢,则是根据业务,对不同的实例即(instance )做不同配置即可…

        image-20200926231123562

        image-20200926231135593

        image-20200926231429803

        image-20200926231516786

        根据自己情况进行过滤数据

        canal.instance.filter.regexmysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 常见例子:1. 所有表:.* or .\… 2. canal schema下所有表: canal\…* 3. canal下的以canal打头的表:canal\.canal.* 4. canal schema下的一张表:canal\.test15. 多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)  
        canal.instance.filter.druid.ddl是否使用druid处理所有的ddl解析来获取库和表名true 
        canal.instance.filter.query.dcl是否忽略dcl语句false 
        canal.instance.filter.query.dml是否忽略dml语句 (mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档)false 
        canal.instance.filter.query.ddl是否忽略ddl语句false 

        更多设置请见官网:https://github.com/alibaba/canal/wiki/AdminGuide

        如此一来,一个简单的canal环境就搭建好了,接下来,咱们开始测试吧!

        (4)springboot demo示例

        引入canal所需依赖

        <dependency>
                    <groupId>com.alibaba.otter</groupId>
                    <artifactId>canal.client</artifactId>
                    <version>1.1.4</version>
                </dependency>

        配置

        canal:
          # instance 实例所在ip
          host: 192.168.96.129
          # tcp通信端口
          port: 11111
          # 账号  canal-admin application.yml 设置的
          username: admin
          # 密码
          password: leitest
          #实例名称
          instance: test

        代码

        package com.leilei;
        import com.alibaba.otter.canal.client.CanalConnector;
        import com.alibaba.otter.canal.client.CanalConnectors;
        import com.alibaba.otter.canal.protocol.CanalEntry;
        import com.alibaba.otter.canal.protocol.Message;
        import org.springframework.beans.factory.annotation.Value;
        import org.springframework.boot.ApplicationArguments;
        import org.springframework.boot.ApplicationRunner;
        import org.springframework.stereotype.Component;
        import java.net.InetSocketAddress;
        import java.util.List;
        /**
         * @author lei
         * @version 1.0
         * @date 2020/9/27 22:23
         * @desc 读取binlog日志
         */
        @Component
        public class ReadBinLogService implements ApplicationRunner {
            @Value("${canal.host}")
            private String host;
            @Value("${canal.port}")
            private int port;
            @Value("${canal.username}")
            private String username;
            @Value("${canal.password}")
            private String password;
            @Value("${canal.instance}")
            private String instance;
            @Override
            public void run(ApplicationArguments args) throws Exception {
                CanalConnector conn = getConn();
                while (true) {
                    conn.connect();
                    //订阅实例中所有的数据库和表
                    conn.subscribe(".*\\..*");
                    // 回滚到未进行ack的地方
                    conn.rollback();
                    // 获取数据 每次获取一百条改变数据
                    Message message = conn.getWithoutAck(100);
                    long id = message.getId();
                    int size = message.getEntries().size();
                    if (id != -1 && size > 0) {
                        // 数据解析
                        analysis(message.getEntries());
                    }else {
                        Thread.sleep(1000);
                    }
                    // 确认消息
                    conn.ack(message.getId());
                    // 关闭连接
                    conn.disconnect();
                }
            }
            /**
             * 数据解析
             */
            private void analysis(List<CanalEntry.Entry> entries) {
                for (CanalEntry.Entry entry : entries) {
                    // 只解析mysql事务的操作,其他的不解析
                    if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN) {
                        continue;
                    if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    // 解析binlog
                    CanalEntry.RowChange rowChange = null;
                    try {
                        rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    } catch (Exception e) {
                        throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
                    if (rowChange != null) {
                        // 获取操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 获取当前操作所属的数据库
                        String dbName = entry.getHeader().getSchemaName();
                        // 获取当前操作所属的表
                        String tableName = entry.getHeader().getTableName();
                        // 事务提交时间
                        long timestamp = entry.getHeader().getExecuteTime();
                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                            dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp);
                            System.out.println("-------------------------------------------------------------");
                        }
             * 解析具体一条Binlog消息的数据
             *
             * @param dbName    当前操作所属数据库名称
             * @param tableName 当前操作所属表名称
             * @param eventType 当前操作类型(新增、修改、删除)
            private static void dataDetails(List<CanalEntry.Column> beforeColumns,
                                            List<CanalEntry.Column> afterColumns,
                                            String dbName,
                                            String tableName,
                                            CanalEntry.EventType eventType,
                                            long timestamp) {
                System.out.println("数据库:" + dbName);
                System.out.println("表名:" + tableName);
                System.out.println("操作类型:" + eventType);
                if (CanalEntry.EventType.INSERT.equals(eventType)) {
                    System.out.println("新增数据:");
                    printColumn(afterColumns);
                } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
                    System.out.println("删除数据:");
                    printColumn(beforeColumns);
                } else {
                    System.out.println("更新数据:更新前数据--");
                    System.out.println("更新数据:更新后数据--");
                System.out.println("操作时间:" + timestamp);
            private static void printColumn(List<CanalEntry.Column> columns) {
                for (CanalEntry.Column column : columns) {
                    System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
             * 获取连接
            public CanalConnector getConn() {
                return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
        }

        测试查看

        数据库修改数据库时

        image-20200927222635688

        数据新增数据时

        image-20200927222734833

        删除数据(把我们才添加的小明删掉)

        image-20200927222834002

        当我们操作监控的数据库DM L操作的时候呢,会被canal监听到…我们呢,通过canal监听,拿到修改的库,修改的表,修改的字段,便可以根据自己业务进行数据处理了!

        哎,这个时候啊,可能有小伙伴就要问了,那么,我能不能直接获取其操作的sql语句呢?

        目前,我是自己解析其列来手动拼接的sql语句实现了

        话不多说,先上效果:

        canal 监听到主库sql变化----> update students set  id = '2', age = '999', name = '小三', city = '11', date = '2020-09-27 17:41:44', birth = '2020-09-27 18:00:48' where id=2
        canal 监听到主库sql变化----> delete from students where id=6
        canal 监听到主库sql变化----> insert into students (id,age,name,city,date,birth) VALUES ('89','98','测试新增','深圳','2020-09-27 22:46:53','')
        canal 监听到主库sql变化----> update students set  id = '89', age = '98', name = '测试新增', city = '深圳', date = '2020-09-27 22:46:53', birth = '2020-09-27 22:46:56' where id=89

        image-20200927224716304

        实际上呢,我们也就是拿到其执行前列数据变化 执行后列数据变化,自己拼接了一个sql罢了…附上代码

        package com.leilei;
        import com.alibaba.otter.canal.client.CanalConnector;
        import com.alibaba.otter.canal.client.CanalConnectors;
        import com.alibaba.otter.canal.protocol.CanalEntry.*;
        import com.alibaba.otter.canal.protocol.Message;
        import com.alibaba.otter.canal.protocol.exception.CanalClientException;
        import com.google.protobuf.InvalidProtocolBufferException;
        import org.springframework.beans.factory.annotation.Value;
        import org.springframework.boot.ApplicationArguments;
        import org.springframework.boot.ApplicationRunner;
        import org.springframework.stereotype.Component;
        import java.net.InetSocketAddress;
        import java.util.List;
        import java.util.Queue;
        import java.util.concurrent.ConcurrentLinkedQueue;
        /**
         * @author lei
         * @version 1.0
         * @date 2020/9/27 22:33
         * @desc 读取binlog日志
         */
        @Component
        public class ReadBinLogToSql implements ApplicationRunner {
            //读取的binlog sql 队列缓存 一边Push 一边poll
            private Queue<String> canalQueue = new ConcurrentLinkedQueue<>();
            @Value("${canal.host}")
            private String host;
            @Value("${canal.port}")
            private int port;
            @Value("${canal.username}")
            private String username;
            @Value("${canal.password}")
            private String password;
            @Value("${canal.instance}")
            private String instance;
            @Override
            public void run(ApplicationArguments args) throws Exception {
                CanalConnector conn = getConn();
                while (true) {
                    try {
                        conn.connect();
                        //订阅实例中所有的数据库和表
                        conn.subscribe(".*\\..*");
                        // 回滚到未进行ack的地方
                        conn.rollback();
                        // 获取数据 每次获取一百条改变数据
                        Message message = conn.getWithoutAck(100);
                        long id = message.getId();
                        int size = message.getEntries().size();
                        if (id != -1 && size > 0) {
                            // 数据解析
                            analysis(message.getEntries());
                        } else {
                            Thread.sleep(1000);
                        }
                        // 确认消息
                        conn.ack(message.getId());
                    } catch (CanalClientException | InvalidProtocolBufferException | InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // 关闭连接
                        conn.disconnect();
                    }
                }
            }
            private void analysis(List<Entry> entries) throws InvalidProtocolBufferException {
                for (Entry entry : entries) {
                    if (EntryType.ROWDATA == entry.getEntryType()) {
                        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                        EventType eventType = rowChange.getEventType();
                        if (eventType == EventType.DELETE) {
                            saveDeleteSql(entry);
                        } else if (eventType == EventType.UPDATE) {
                            saveUpdateSql(entry);
                        } else if (eventType == EventType.INSERT) {
                            saveInsertSql(entry);
                        }
                    }
                }
            }
            /**
             * 保存更新语句
             *
             * @param entry
             */
            private void saveUpdateSql(Entry entry) {
                try {
                    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                    List<RowData> dataList = rowChange.getRowDatasList();
                    for (RowData rowData : dataList) {
                        List<Column> afterColumnsList = rowData.getAfterColumnsList();
                        StringBuffer sql = new StringBuffer("update " +
                                entry.getHeader().getTableName() + " set ");
                        for (int i = 0; i < afterColumnsList.size(); i++) {
                            sql.append(" ")
                                    .append(afterColumnsList.get(i).getName())
                                    .append(" = '").append(afterColumnsList.get(i).getValue())
                                    .append("'");
                            if (i != afterColumnsList.size() - 1) {
                                sql.append(",");
                            }
                        }
                        sql.append(" where ");
                        List<Column> oldColumnList = rowData.getBeforeColumnsList();
                        for (Column column : oldColumnList) {
                            if (column.getIsKey()) {
                                sql.append(column.getName()).append("=").append(column.getValue());
                                break;
                            }
                        }
                        canalQueue.add(sql.toString());
                    }
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }
            }
            /**
             * 保存删除语句
             *
             * @param entry
             */
            private void saveDeleteSql(Entry entry) {
                try {
                    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                    List<RowData> rowDatasList = rowChange.getRowDatasList();
                    for (RowData rowData : rowDatasList) {
                        List<Column> columnList = rowData.getBeforeColumnsList();
                        StringBuffer sql = new StringBuffer("delete from " +
                                entry.getHeader().getTableName() + " where ");
                        for (Column column : columnList) {
                            if (column.getIsKey()) {
                                sql.append(column.getName()).append("=").append(column.getValue());
                                break;
                            }
                        }
                        canalQueue.add(sql.toString());
                    }
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }
            }
            /**
             * 保存插入语句
             *
             * @param entry
             */
            private void saveInsertSql(Entry entry) {
                try {
                    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                    List<RowData> datasList = rowChange.getRowDatasList();
                    for (RowData rowData : datasList) {
                        List<Column> columnList = rowData.getAfterColumnsList();
                        StringBuffer sql = new StringBuffer("insert into " +
                                entry.getHeader().getTableName() + " (");
                        for (int i = 0; i < columnList.size(); i++) {
                            sql.append(columnList.get(i).getName());
                            if (i != columnList.size() - 1) {
                                sql.append(",");
                            }
                        }
                        sql.append(") VALUES (");
                        for (int i = 0; i < columnList.size(); i++) {
                            sql.append("'" + columnList.get(i).getValue() + "'");
                            if (i != columnList.size() - 1) {
                                sql.append(",");
                            }
                        }
                        sql.append(")");
                        canalQueue.add(sql.toString());
                    }
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }
            }
            /**
             * 获取连接
             */
            public CanalConnector getConn() {
                return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
            }
            /**
             * 模拟消费canal转换的sql语句
             */
            public void executeQueueSql() {
                int size = canalQueue.size();
                for (int i = 0; i < size; i++) {
                    String sql = canalQueue.poll();
                    System.out.println("canal 监听到主库sql变化----> " + sql);
                }
            }
        }

        当然了,这只是简单的demo 演示,您可根据自己的业务进行修改完善即可…

        上边的安装步骤呢,我也是不断的测试过,没有问题,当然可能或多或少有些坑没有踩到,但是如果您按照我的步骤来,大概率是一马平川的…

        附上项目源码:springboot-canal

        到此这篇关于Springboot2.3.x整合Canal的文章就介绍到这了,更多相关Springboot2.3.x整合Canal内容请搜索3672js教程以前的文章或继续浏览下面的相关文章希望大家以后多多支持3672js教程!

        您可能感兴趣的文章:
        • springboot 整合canal实现示例解析
        相关栏目:

        用户点评