您的位置 首页 java

canal解析mysql的binlog实时推送到kafka

今天整理一下以前写的一个kafka消费canal的demo,实现实时推送数据到kafka.首先先介绍一下canal,官网是这么说的:

 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x  
 1.mysql前期准备(开启binlog):
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld] log-bin=mysql-bin 
# 开启 binlog binlog-format=ROW 
# 选择 ROW 模式 server_id=1 
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;  
 2.下载canal,下载1.4版本的即可,1.4版本增加了admin webUI这个比较不错。
  

 3.window下启动:
  # 通过执行bin/startup.bat 启动
启动时遇到如下问题:
最后查看issue找到解决方式:
  

 最后查看issue找到解决方式:

之后再次启动成功:  

canal解析mysql的binlog实时推送到kafka

 4.启动成功之后启动canal  

canal解析mysql的binlog实时推送到kafka

canal解析mysql的binlog实时推送到kafka

  • 启动成功之后修改我们配置的数据库中的任意一个表分别触发增加、修改、删除操作,然后就会看到对应的输出:
 插入操作触发:  
canal解析mysql的binlog实时推送到kafka

insert

 修改操作触发:  
canal解析mysql的binlog实时推送到kafka

update

 删除操作触发:  

delete

  • 在整合 canal-example项目 中的kafka例子时发现如下问题:
 example中缺少:
<dependency> 
  <groupId>org.apache.kafka</groupId> 
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.0</version> 
</dependency>
需要单独添加,否则会报错。  
  • 配置好canal相关的kafka配置之后,打开设置的topic,然后修改任意的数据库表,打开kafka消费者看到对数据库的操作如下:

canal解析mysql的binlog实时推送到kafka

文章来源:智云一二三科技

文章标题:canal解析mysql的binlog实时推送到kafka

文章地址:https://www.zhihuclub.com/190914.shtml

关于作者: 智云科技

热门文章

网站地图