如何利用Java实现MySQL的数据变化监听,
如何利用Java实现MySQL的数据变化监听,
目录
- 1.binlog 简介
- 1.1 什么是 binlog
- 1.2 binlog 的三种格式
- 2. 开启 binlog 并配置 MySQL
- 2.1 检查 binlog 是否开启
- 2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)
- 2.3 验证 binlog 配置
- 3. 使用 Java 监听 binlog
- 3.1 选择工具:Canal
- 3.2 Java 代码监听 binlog
- 4. 代码解析
- 总结
在高并发和大数据环境下,实时获取 MySQL 数据库的增量变化对数据同步、数据分析、缓存更新等场景至关重要。MySQL 的 binlog(Binary Log) 记录了数据库的所有变更,可以用来实现 增量数据监听。本文将介绍如何利用 binlog 监听 MySQL 数据增量,并提供基于 Java 的 Canal 实现示例。
1.binlog 简介
1.1 什么是 binlog
binlog(Binary Log) 是 MySQL 记录 DDL(数据定义语言,如 CREATE
、ALTER
)和 DML(数据操作语言,如 INSERT
、UPDATE
、DELETE
)的日志文件,它用于:
- 主从复制:MySQL 主库将 binlog 传输到从库,实现数据同步。
- 数据恢复:通过
mysqlbinlog
工具解析 binlog 恢复数据。 - 数据同步:第三方工具(如 Canal)解析 binlog,进行数据同步。
1.2 binlog 的三种格式
binlog 格式 | 说明 |
---|---|
STATEMENT | 记录 SQL 语句本身 |
ROW | 记录行数据变更(推荐) |
MIXED | 结合前两者,MySQL 自动判断 |
由于 ROW 格式能提供精确的行级别变更信息,因此推荐使用它。
2. 开启 binlog 并配置 MySQL
2.1 检查 binlog 是否开启
SHOW VARIABLES LIKE 'log_bin';
如果 log_bin
值为 OFF
,说明 binlog 未开启。
2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)
在 [mysqld]
部分添加以下内容:
server-id=1 log-bin=mysql-bin binlog-format=ROW binlog-row-image=FULL expire_logs_days=7
重启 MySQL:
systemctl restart mysql # Linux net stop mysql && net start mysql # Windows
2.3 验证 binlog 配置
执行:
SHOW BINARY LOGS;
如果有 binlog 文件,如 mysql-bin.000001
,说明已开启。
3. 使用 Java 监听 binlog
3.1 选择工具:Canal
阿里巴巴开源的 Canal 可以模拟 MySQL 从库协议,解析 binlog 并实时推送增量数据。
3.2 Java 代码监听 binlog
引入 Maven 依赖
<dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.6</version> </dependency> </dependencies>
编写 Java 代码
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class BinlogListener { public static void main(String[] args) { // 连接 Canal CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal"); try { connector.connect(); connector.subscribe(".*\\..*"); // 监听所有库表 connector.rollback(); while (true) { Message message = connector.getWithoutAck(100); // 获取数据 long batchId = message.getId(); List<CanalEntry.Entry> entries = message.getEntries(); if (batchId != -1 && !entries.isEmpty()) { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { processEntry(entry); } } } connector.ack(batchId); // 确认消息 } } finally { connector.disconnect(); } } private static void processEntry(CanalEntry.Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); System.out.println("变更表:" + entry.getHeader().getTableName()); System.out.println("变更类型:" + eventType); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { System.out.println("删除数据:" + rowData.getBeforeColumnsList()); } else if (eventType == CanalEntry.EventType.INSERT) { System.out.println("新增数据:" + rowData.getAfterColumnsList()); } else { System.out.println("更新前数据:" + rowData.getBeforeColumnsList()); System.out.println("更新后数据:" + rowData.getAfterColumnsList()); } } } catch (Exception e) { e.printStackTrace(); } } }
4. 代码解析
1.创建 Canal 连接
CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal");
127.0.0.1
:Canal 服务器地址11111
:Canal 端口example
:Canal 实例canal/canal
:默认账号密码
2.获取 binlog 变更数据
Message message = connector.getWithoutAck(100);
getWithoutAck(100)
:拉取 100 条 binlog 事件。
3.解析 binlog
for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { processEntry(entry); } }
仅处理 ROWDATA
类型的变更,忽略事务等其他信息。
4.分类处理 INSERT、UPDATE、DELETE
if (eventType == CanalEntry.EventType.DELETE) { System.out.println("删除数据:" + rowData.getBeforeColumnsList()); } else if (eventType == CanalEntry.EventType.INSERT) { System.out.println("新增数据:" + rowData.getAfterColumnsList()); } else { System.out.println("更新前数据:" + rowData.getBeforeColumnsList()); System.out.println("更新后数据:" + rowData.getAfterColumnsList()); }
总结
- MySQL binlog 记录数据库变更,可用于监听增量数据。
- Canal 作为 MySQL 从库解析 binlog,实现数据同步。
- Java 代码示例 展示如何用 Canal 监听
INSERT
、UPDATE
、DELETE
操作,并解析变更数据。
这种方案适用于 分布式数据同步、缓存一致性 和 数据变更通知,是实时数据处理的重要手段。
到此这篇关于如何利用Java实现MySQL的数据变化监听的文章就介绍到这了,更多相关Java监听MySQL数据变化内容请搜索3672js教程以前的文章或继续浏览下面的相关文章希望大家以后多多支持3672js教程!
您可能感兴趣的文章:- MySQL数据变化监听的实现方案
- 使用Canal监听MySQL Binlog日志的实现方案
- 如何通过Java监听MySQL数据的变化
- Canal监听MySQL的实现步骤
用户点评