当前位置:网站首页>[玩转ES] ES批量/全量导入数据
[玩转ES] ES批量/全量导入数据
2022-07-16 13:43:00 【0xYGC】
方法 / 步骤
一:Logstash实现
1.1 安装插件
# 从Logstash的bin目录下安装输入输出ES和MySQL插件
./logstash-plugin install logstash-output-elasticsearch
./logstash-plugin install logstash-input-jdbc

将mysql-connector-java-8.0.11.jar copy到logstash/bin/mysql目录下
1.2 配置
1.2.1 pipelines配置
- logstash主目录下\config\pipelines.yml 放开下面的注释
- pipeline.id: test
pipeline.workers: 1
pipeline.batch.size: 1
config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
queue.type: persisted
path.config: "/tmp/logstash/*.config"
1.2.2 JDBC 同步配置
1.2.2.1 全量更新
文件名称: init-mysql2es.conf
input {
stdin {
}
jdbc{
type => "classify"
jdbc_default_timezone => "Asia/Shanghai"
# mysql 数据库链接,school-edu为数据库名
jdbc_connection_string => "jdbc:mysql://192.168.11.10:3306/ldd_saas?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "useradmin"
# 驱动
jdbc_driver_library => "D:/foundation-server/dev-pkg/elk/logstash-7.0.0/bin/mysql-connector-java-8.0.19.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 是否开启分页
jdbc_paging_enabled => true
#指定每页显示50000条
jdbc_page_size => "50000"
#是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
use_column_value => false
#执行的sql 文件路径+名称
#statement_filepath => "D:/foundation-server/dev-pkg/elk/logstash-7.0.0/bin/mysql/jdbc.sql"
#执行的sql语句
statement => "SELECT `member_id` AS id, `club_id`, `name`, `sex`, `mobile`, `avatar`, `birthday`, `face_img`, `credit_type`, `credit_no`, `wechat_no`, `nick_name`, `stature`, `weight` FROM member_basics"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
#schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称
index => "member_basics"
#user => "elastic"
#password => "changeme"
#document_type => "_doc"
# 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
1.2.2.2 增量更新
文件名称: sync-mysql2es.conf
#logstash输入配置
input {
#jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
jdbc {
jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
jdbc_user => <my username>
jdbc_password => <my password>
# 是否开启分页
jdbc_paging_enabled => true
# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
use_column_value => true
# 用来控制增量更新的字段,一般是自增id或者创建、更新时间,注意这里要采用sql语句中select采用的字段别名
tracking_column => "unix_ts_in_secs"
# tracking_column 对应字段的类型
tracking_column_type => "numeric"
# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
schedule => "*/5 * * * * *"
# 同步数据的查询sql语句
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
}
#logstash输入数据的字段匹配和数据过滤
filter {
mutate {
copy => {
"id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
#logstash输出配置
output {
# 采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
stdout {
codec => "rubydebug"}
# 指定输出到ES的具体索引
elasticsearch {
index => "rdbms_sync_idx"
document_id => "%{[@metadata][_id]}"
}
}
1.2.3 运行脚本
# 全量构建 run-init2es.bat
logstash -f init-mysql2es.conf
#增量构建 文件名称: run-sync2es.bat
logstash -f sync-mysql2es.conf
- 导入成功

- kibana 查看已经导入成功

参考资料 & 致谢
[1] mysql (全量)数据导入到 elasticsearch
[2] 通过Logstash实现mysql数据定时增量同步到ES
边栏推荐
- [dynamic programming]dp20 calculation string editing distance - medium
- 【Leetcode】225. 用队列实现栈
- 阿里妈妈展示广告引擎新探索:迈向全局最优算力分配
- NoSQLAttack工具安装与使用问题解决
- [dynamic programming]dp19 longest common subsequence (I) - medium
- Nature Microbiology | 枯草芽孢杆菌生物膜促进甜瓜生长并抗病
- 质量与效率并重,测试左移助力块存储技术研发
- (Note)七彩虹30系列显卡——《一键超频》按键
- 云原生:Docker 实践经验(四)docker上部署 redis 三主三从集群
- Today's sleep quality record 85 points
猜你喜欢

Xray安装使用

随心玩玩(八)jenkins学习(待更新)

Unity游戏文件大,如何缩小游戏文件

发论文!AI,机器学习,CV大佬科研项目招生!

module ‘sklearn.datasets‘ has no attribute ‘fetch_mldata‘

(pytorch进阶之路二)transformer学习与难点代码实现

暑期学习Matlab笔记

Practice of online problem feedback module (4): encapsulating general field classes

北京华联BHG Mall持续发力,BHG DAY引领城市消费新热潮

HALCON联合C#检测表面缺陷——ROI交互(二)(和图片同步缩放裁剪等功能)
随机推荐
[PaddleSeg源码阅读] 关于PaddleSeg模型返回的都是list这件小事
使用plt.savefig()方法保存绘图时出现图片全白或全黑的问题
Today's sleep quality record 85 points
北京华联BHG Mall持续发力,BHG DAY引领城市消费新热潮
[CVPR2019] On Stabilizing Generative Adversarial Training with Noise
JD finance, are you bad, or are you cutting too much??
Web crawler realizes sending SMS verification code
20220715 国内Conda不fq安装Pytorch最新版本的方法
26 top open source projects, 87 open tasks, Alibaba programming summer 2022 student registration channel opened
网络爬虫爬取三国演义所有章节的标题和内容(BeautifulSoup解析)
【产品人卫朋】2022年产品人必备的13个设计类网站(1.0版)
What is packet loss and why
墨者学院刷题笔记——SQL手工注入漏洞测试(MongoDB数据库)
Xray installation and use
协作进程【简单总结】
Boost create folder
移掉K位数字[贪心思想 & 单调栈实现]
C#网络应用编程,实验七: 异步编程练习
NoSQLAttack工具下载与使用
Digital pyramid (PTA)