Real-time data synchronization from MySQL to Elasticsearch with canal
2022-07-15 16:23:00 【浅唱~幸福】
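1. Environment:
1.1 mysql:5.7
1.2 elasticsearch:7.4.2
1.3 kibana:7.4.2
1.4 Server: canal-deployer 1.1.5
1.5 Client: canal-adapter 1.1.5
2. Download and install MySQL. (Note: I install everything with Docker, so the installation steps are not shown here; look them up online or see my other articles.) Edit the MySQL configuration file to enable the binlog in ROW format, turn on master-slave (replication) mode, and set the binlog file name, as follows: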
server_id=101
binlog-ignore-db=mysql
log-bin=mall-mysql-bin
binlog_cache_size=1M
binlog_format=row
expire_logs_days=7
slave_skip_errors=1062
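
The settings above can be sanity-checked before MySQL is restarted. The following is a minimal Python sketch and not part of canal or MySQL: the helper names `parse_options` and `check_binlog_settings`, and the choice of which keys to check, are assumptions made for illustration. Option names are normalized so that `-` and `_` are treated the same, as MySQL does in option files.

```python
def parse_options(text):
    """Parse key=value lines from a my.cnf fragment into a dict.

    Option names are normalized so that '-' and '_' are equivalent.
    """
    options = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blanks, comments and section headers such as [mysqld].
        if not line or line.startswith(("#", ";", "[")):
            continue
        key, _, value = line.partition("=")
        options[key.strip().replace("-", "_")] = value.strip()
    return options


def check_binlog_settings(text):
    """Return a list of problems that would keep canal from reading the binlog."""
    options = parse_options(text)
    problems = []
    if "log_bin" not in options:
        problems.append("log-bin is not set, so the binlog is disabled")
    if options.get("binlog_format", "").lower() != "row":
        problems.append("binlog_format must be ROW")
    if not options.get("server_id"):
        problems.append("server_id must be set")
    return problems


# The configuration fragment from this article.
MY_CNF = """
server_id=101
binlog-ignore-db=mysql
log-bin=mall-mysql-bin
binlog_cache_size=1M
binlog_format=row
expire_logs_days=7
slave_skip_errors=1062
"""

print(check_binlog_settings(MY_CNF))  # prints []
```

An empty list means the three settings checked here are present; it does not replace the SHOW VARIABLES check against the running server.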

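2.1 Remember to restart MySQL, then check that the settings took effect:
SHOW VARIABLES LIKE 'binlog_format'; -- should be ROW
SHOW VARIABLES LIKE 'log_bin'; -- should be ON
SHOW VARIABLES LIKE '%log%'; -- all log-related variables, including the binlog ones
2.2 Then create a canal account for canal with the following statements: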
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2.3 Create a canal-test database and a sys_log test table:
DROP TABLE IF EXISTS `sys_log`;
CREATE TABLE `sys_log` (
  `id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'ID',
  `type` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '1' COMMENT 'Log type',
  `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT 'Log title',
  `create_by` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'Created by',
  `create_date` datetime NULL DEFAULT NULL COMMENT 'Creation time',
  `remote_addr` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'Client IP address',
  `user_agent` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'User agent',
  `request_uri` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'Request URI',
  `method` varchar(5) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'Operation method',
  `params` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT 'Submitted data',
  `exception` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT 'Exception info',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'Log table' ROW_FORMAT = Dynamic;
3. Download and install ES and Kibana. Note that the two versions must match. Installation is not covered here.

4. Download the canal server, canal-deployer 1.1.5, and the client, canal-adapter 1.1.5, from https://github.com/alibaba/canal/releases

5. Extract both archives:
tar -zxvf canal.adapter-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz

6. On the server side, rename the example directory under conf in canal.deployer-1.1.5 to orgdeer-cui with: mv example orgdeer-cui. Then edit conf/orgdeer-cui/instance.properties, mainly the database-related settings; three entries need to change.
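
Which three entries were changed is not listed in the text. As a hedged sketch, assuming they are the usual connection settings found in canal's instance.properties (`canal.instance.master.address`, `canal.instance.dbUsername`, `canal.instance.dbPassword`), the edit could be scripted in Python roughly as follows. The helper `set_properties`, the sample file content, and the values (a local MySQL and the canal account from step 2.2) are illustrative and not taken from the author's setup.

```python
def set_properties(text, updates):
    """Return properties text with the given keys set to new values.

    Existing 'key=value' lines are rewritten in place; keys that are
    missing are appended at the end. Comments and other lines are kept.
    """
    remaining = dict(updates)
    out = []
    for line in text.splitlines():
        stripped = line.strip()
        key = stripped.partition("=")[0].strip()
        if stripped and not stripped.startswith("#") and key in remaining:
            out.append(f"{key}={remaining.pop(key)}")
        else:
            out.append(line)
    out.extend(f"{key}={value}" for key, value in remaining.items())
    return "\n".join(out) + "\n"


# Assumed values: a local MySQL and the canal account created in step 2.2.
UPDATES = {
    "canal.instance.master.address": "127.0.0.1:3306",
    "canal.instance.dbUsername": "canal",
    "canal.instance.dbPassword": "canal",
}

# Hypothetical excerpt of an instance.properties file.
sample = (
    "# position info\n"
    "canal.instance.master.address=\n"
    "canal.instance.dbUsername=root\n"
)
print(set_properties(sample, UPDATES))
```

To apply it to the real file, read conf/orgdeer-cui/instance.properties, pass its content through `set_properties`, and write the result back.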

6.1 Start the server, canal.deployer-1.1.5
Change directory: cd /home/canal/canal.deployer-1.1.5/bin
Start: ./startup.sh

7. Check the log to confirm the server started successfully:
tail -f /home/canal/canal.deployer-1.1.5/logs/canal/canal.log
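
Besides reading the log, it can help to confirm that the server is actually accepting connections. The sketch below is a generic TCP check in Python, not a canal API; `is_port_open` is a hypothetical helper, and port 11111 is the one this article's adapter configuration uses for `canal.tcp.server.host`.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


# Example call on the canal host:
#   is_port_open("127.0.0.1", 11111)
```

A True result only shows that something is listening on the port; the log remains the place to look for startup errors.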

8. Edit the client configuration, application.yml under canal.adapter-1.1.5. The main changes are the canal-server settings, the data source settings, and the adapter settings; the lines to change are the ones with comments:
server:
  port: 8081
logging:
  level:
    com.alibaba.otter.canal.client.adapter.es: DEBUG
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp # tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  # IMPORTANT!!! KEEP ADAPTER SYNC CONSISTENT
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 # canal server address
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false&useInformationSchema=false&allowPublicKeyRetrieval=true # MySQL connection URL
      username: root # user name
      password: hzp123456 # password
  canalAdapters:
  - instance: orgdeer-cui # must match the instance (or MQ topic) name configured on the server side
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger # logging adapter
      - name: es7 # ES sync adapter
        hosts: 127.0.0.1:9300 # ES connection address
        properties:
          mode: transport # transport (port 9300) or rest (port 9200)
          cluster.name: elasticsearch # ES cluster name
9. Start the client, canal.adapter-1.1.5
Change directory: cd /home/canal/canal.adapter-1.1.5/bin
Start: ./startup.sh

10. Check the client log to confirm it started successfully:
tail -f /home/canal/canal.adapter-1.1.5/logs/adapter/adapter.log

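11. Add the configuration file canal-adapter/conf/es7/sys_log.yml, which maps the MySQL table to the Elasticsearch index. (Note: the destination name orgdeer-cui must match the instance, or MQ topic, name configured on the server side.) Then place sys_log.yml in the /home/canal/canal.adapter-1.1.5/conf/es7 directory.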

dataSourceKey: defaultDS
destination: orgdeer-cui
groupId: g1
esMapping:
  _index: sys_log
  _id: _id
  sql: "select
        id as _id, type,
        title,
        create_by as createBy,
        UNIX_TIMESTAMP(create_date) as createDate,
        remote_addr as remoteAddr,
        user_agent as userAgent,
        request_uri as requestUri,method,
        params,exception
        from sys_log"
  etlCondition: "where create_date>={}"
  commitBatch: 3000
12. Create the sys_log index in the Kibana console:
PUT sys_log
{
  "mappings": {
    "properties": {
      "type": {
        "type": "keyword"
      },
      "title": {
        "type": "text"
      },
      "createBy": {
        "type": "keyword"
      },
      "remoteAddr": {
        "type": "text"
      },
      "userAgent": {
        "type": "text"
      },
      "requestUri": {
        "type": "text"
      },
      "method": {
        "type": "keyword"
      },
      "params": {
        "type": "text"
      },
      "exception": {
        "type": "text"
      },
      "createDate": {
        "type": "long"
      }
    }
  }
}
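
To make the relationship between the SQL aliases in sys_log.yml and the index fields concrete, here is a Python sketch of the document a `sys_log` row would roughly turn into. It only mimics the SELECT for illustration and is not how canal-adapter is implemented. `row_to_doc` is a hypothetical helper, the sample row is made up, and the session time zone is assumed to be GMT+8, since the result of `UNIX_TIMESTAMP` depends on the MySQL session time zone.

```python
from datetime import datetime, timedelta, timezone

# Column -> field aliases, copied from the SELECT in sys_log.yml.
ALIASES = {
    "create_by": "createBy",
    "remote_addr": "remoteAddr",
    "user_agent": "userAgent",
    "request_uri": "requestUri",
}

# Assumption: the MySQL session time zone is GMT+8.
SESSION_TZ = timezone(timedelta(hours=8))


def row_to_doc(row):
    """Return (_id, source) for one sys_log row given as a dict."""
    source = {}
    for column, value in row.items():
        if column == "id":
            continue  # id is used as the document _id, not as a field
        if column == "create_date":
            # UNIX_TIMESTAMP(create_date): seconds since the epoch,
            # which is why createDate is mapped as "long".
            if value is not None:
                value = int(value.replace(tzinfo=SESSION_TZ).timestamp())
            source["createDate"] = value
        else:
            source[ALIASES.get(column, column)] = value
    return row["id"], source


# Made-up sample row for illustration.
row = {
    "id": "1",
    "type": "1",
    "title": "login",
    "create_by": "admin",
    "create_date": datetime(2022, 7, 15, 16, 23, 0),
    "remote_addr": "127.0.0.1",
}
doc_id, source = row_to_doc(row)
print(doc_id, source)
```

Every field name produced here has a matching entry under "properties" in the index mapping; a field missing from the mapping would instead be typed dynamically by Elasticsearch.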

13. Trigger a manual ETL with the following command, so that canal-adapter synchronizes the data into the ES index:
curl -X POST http://127.0.0.1:8081/etl/es7/sys_log.yml
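
The same request can be built from Python with only the standard library. This is a sketch under assumptions: `build_etl_request` and `send` are hypothetical helpers, and the optional `params` form field (which, as I understand the canal adapter's REST interface, fills the `{}` placeholder of etlCondition) should be checked against the canal documentation for your version. The date is only an example value.

```python
from urllib import parse, request


def build_etl_request(base_url, adapter, task, params=None):
    """Build the POST request for canal-adapter's manual ETL endpoint.

    params, if given, is sent as the form field 'params'.
    """
    url = f"{base_url.rstrip('/')}/etl/{adapter}/{task}"
    data = parse.urlencode({"params": params}).encode() if params else None
    return request.Request(url, data=data, method="POST")


def send(req):
    """Send the request and return the response body (needs a running adapter)."""
    with request.urlopen(req) as response:
        return response.read().decode()


# Full import, equivalent to the curl command above.
full = build_etl_request("http://127.0.0.1:8081", "es7", "sys_log.yml")

# Conditional import; the value is substituted into "where create_date>={}".
partial = build_etl_request(
    "http://127.0.0.1:8081", "es7", "sys_log.yml", params="2022-07-01 00:00:00"
)
```

Calling `send(full)` performs the import; nothing is sent until then, so the requests can be inspected first.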
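Then check in ES that the data has been synchronized.

After that, rows manually added to or deleted from MySQL are synchronized to ES as well.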
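
One way to check the result without Kibana is to query the index over the REST port. The sketch below uses only the Python standard library; the helper names are hypothetical, and the URL assumes ES is reachable on 127.0.0.1:9200 (the rest port mentioned in the adapter configuration).

```python
import json
from urllib import request


def build_search_request(es_url, index, size=5):
    """Build a match_all search request against the given index."""
    body = json.dumps({"query": {"match_all": {}}, "size": size}).encode()
    return request.Request(
        f"{es_url.rstrip('/')}/{index}/_search",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def total_hits(response_body):
    """Extract the hit count from an Elasticsearch search response."""
    total = json.loads(response_body)["hits"]["total"]
    # ES 7 returns {"value": N, "relation": "eq"}; older versions a plain int.
    return total["value"] if isinstance(total, dict) else total


def count_docs(es_url="http://127.0.0.1:9200", index="sys_log"):
    """Send the search and return the number of hits (needs a running ES)."""
    with request.urlopen(build_search_request(es_url, index)) as response:
        return total_hits(response.read())
```

Running `count_docs()` before and after inserting or deleting a row in sys_log should show the count change once the adapter has processed the binlog event.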
