欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

如何利用Java实现MySQL的数据变化监听,

来源: javaer 分享于  点击 27032 次 点评:27

如何利用Java实现MySQL的数据变化监听,


目录
  • 1.binlog 简介
    • 1.1 什么是 binlog
    • 1.2 binlog 的三种格式
  • 2. 开启 binlog 并配置 MySQL
    • 2.1 检查 binlog 是否开启
    • 2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)
    • 2.3 验证 binlog 配置
  • 3. 使用 Java 监听 binlog
    • 3.1 选择工具:Canal
    • 3.2 Java 代码监听 binlog
  • 4. 代码解析
    • 总结

      在高并发和大数据环境下,实时获取 MySQL 数据库的增量变化对数据同步、数据分析、缓存更新等场景至关重要。MySQL 的 binlog(Binary Log) 记录了数据库的所有变更,可以用来实现 增量数据监听。本文将介绍如何利用 binlog 监听 MySQL 数据增量,并提供基于 Java 的 Canal 实现示例。

      1.binlog 简介

      1.1 什么是 binlog

      binlog(Binary Log) 是 MySQL 记录 DDL(数据定义语言,如 CREATEALTER)和 DML(数据操作语言,如 INSERTUPDATEDELETE)的日志文件,它用于:

      • 主从复制:MySQL 主库将 binlog 传输到从库,实现数据同步。
      • 数据恢复:通过 mysqlbinlog 工具解析 binlog 恢复数据。
      • 数据同步:第三方工具(如 Canal)解析 binlog,进行数据同步。

      1.2 binlog 的三种格式

      binlog 格式说明
      STATEMENT记录 SQL 语句本身
      ROW记录行数据变更(推荐)
      MIXED结合前两者,MySQL 自动判断

      由于 ROW 格式能提供精确的行级别变更信息,因此推荐使用它。

      2. 开启 binlog 并配置 MySQL

      2.1 检查 binlog 是否开启

      SHOW VARIABLES LIKE 'log_bin';
      

      如果 log_bin 值为 OFF,说明 binlog 未开启。

      2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)

      [mysqld] 部分添加以下内容:

      server-id=1
      log-bin=mysql-bin
      binlog-format=ROW
      binlog-row-image=FULL
      expire_logs_days=7
      

      重启 MySQL:

      systemctl restart mysql  # Linux
      net stop mysql && net start mysql  # Windows
      

      2.3 验证 binlog 配置

      执行:

      SHOW BINARY LOGS;
      

      如果有 binlog 文件,如 mysql-bin.000001,说明已开启。

      3. 使用 Java 监听 binlog

      3.1 选择工具:Canal

      阿里巴巴开源的 Canal 可以模拟 MySQL 从库协议,解析 binlog 并实时推送增量数据。

      3.2 Java 代码监听 binlog

      引入 Maven 依赖

      <dependencies>
          <dependency>
              <groupId>com.alibaba.otter</groupId>
              <artifactId>canal.client</artifactId>
              <version>1.1.6</version>
          </dependency>
      </dependencies>

      编写 Java 代码

      import com.alibaba.otter.canal.client.CanalConnector;
      import com.alibaba.otter.canal.client.CanalConnectors;
      import com.alibaba.otter.canal.protocol.CanalEntry;
      import com.alibaba.otter.canal.protocol.Message;
      
      import java.net.InetSocketAddress;
      import java.util.List;
      
      public class BinlogListener {
          public static void main(String[] args) {
              // 连接 Canal
              CanalConnector connector = CanalConnectors.newSingleConnector(
                      new InetSocketAddress("127.0.0.1", 11111), 
                      "example", "canal", "canal");
              
      
              try {
                  connector.connect();
                  connector.subscribe(".*\\..*"); // 监听所有库表
                  connector.rollback();
          
                  while (true) {
                      Message message = connector.getWithoutAck(100); // 获取数据
                      long batchId = message.getId();
                      List<CanalEntry.Entry> entries = message.getEntries();
          
                      if (batchId != -1 && !entries.isEmpty()) {
                          for (CanalEntry.Entry entry : entries) {
                              if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                                  processEntry(entry);
                              }
                          }
                      }
                      connector.ack(batchId); // 确认消息
                  }
              } finally {
                  connector.disconnect();
              }
          }
          
          private static void processEntry(CanalEntry.Entry entry) {
              try {
                  CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                  CanalEntry.EventType eventType = rowChange.getEventType();
          
                  System.out.println("变更表:" + entry.getHeader().getTableName());
                  System.out.println("变更类型:" + eventType);
          
                  for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                      if (eventType == CanalEntry.EventType.DELETE) {
                          System.out.println("删除数据:" + rowData.getBeforeColumnsList());
                      } else if (eventType == CanalEntry.EventType.INSERT) {
                          System.out.println("新增数据:" + rowData.getAfterColumnsList());
                      } else {
                          System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
                          System.out.println("更新后数据:" + rowData.getAfterColumnsList());
                      }
                  }
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      
      }
      

      4. 代码解析

      1.创建 Canal 连接

      CanalConnector connector = CanalConnectors.newSingleConnector(
          new InetSocketAddress("127.0.0.1", 11111), 
          "example", "canal", "canal");
      
      • 127.0.0.1:Canal 服务器地址
      • 11111:Canal 端口
      • example:Canal 实例
      • canal/canal:默认账号密码

      2.获取 binlog 变更数据

      Message message = connector.getWithoutAck(100);
      

      getWithoutAck(100):拉取 100 条 binlog 事件。

      3.解析 binlog

      for (CanalEntry.Entry entry : entries) {
          if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
              processEntry(entry);
          }
      }
      

      仅处理 ROWDATA 类型的变更,忽略事务等其他信息。

      4.分类处理 INSERT、UPDATE、DELETE

      if (eventType == CanalEntry.EventType.DELETE) {
          System.out.println("删除数据:" + rowData.getBeforeColumnsList());
      } else if (eventType == CanalEntry.EventType.INSERT) {
          System.out.println("新增数据:" + rowData.getAfterColumnsList());
      } else {
          System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
          System.out.println("更新后数据:" + rowData.getAfterColumnsList());
      }
      

      总结

      • MySQL binlog 记录数据库变更,可用于监听增量数据。
      • Canal 作为 MySQL 从库解析 binlog,实现数据同步。
      • Java 代码示例 展示如何用 Canal 监听 INSERTUPDATEDELETE 操作,并解析变更数据。

      这种方案适用于 分布式数据同步缓存一致性数据变更通知,是实时数据处理的重要手段。

      到此这篇关于如何利用Java实现MySQL的数据变化监听的文章就介绍到这了,更多相关Java监听MySQL数据变化内容请搜索3672js教程以前的文章或继续浏览下面的相关文章希望大家以后多多支持3672js教程!

      您可能感兴趣的文章:
      • MySQL数据变化监听的实现方案
      • 使用Canal监听MySQL Binlog日志的实现方案
      • 如何通过Java监听MySQL数据的变化
      • Canal监听MySQL的实现步骤
      相关栏目:

      用户点评