如何将MySQL数据同步到Elasticsearch -- 知识铺
上一篇我们已经完成Elasticsearch 和logstash安装,现在可以进行把数据从mysql 数据库同步es索引上
1、下载java 数据库连接池
[root@localhost home]# cd /<span>home
[root@localhost home]# </span><span>wget</span> https:<span>//</span><span>repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.13/mysql-connector-java-8.0.13.jar</span>
2、安装logstash-input-jdb (如果你是按上一篇说的安装的是7.1以上的logstash 版本,调过这一步,因为它已经集成了)
直接在logstash的安装目录bin下运行
View Code
3、增加logstash配置
[root@localhost home]# <span>vi</span> /etc/logstash/conf.d/users.conf
把以下代码复制进去并保存
<span>input {
jdbc {
jdbc_driver_library </span>=> <span>"</span><span>/home/mysql-connector-java-8.0.13.jar</span><span>"</span> <span>#</span><span> 刚刚下载的 mysql-connector-java的绝对路径</span>
jdbc_driver_class => <span>"</span><span>com.mysql.jdbc.Driver</span><span>"</span><span>
jdbc_connection_string </span>=> <span>"</span><span>jdbc:mysql://10.8.42.10:3306/school?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai</span><span>"</span> <span>#</span><span>数据库IP:10.8.42.10 ,端口:3306,库名:school</span>
jdbc_user => <span>"</span><span>root</span><span>"</span> <span>#</span><span>数据库用户名</span>
jdbc_password => <span>"</span><span>dfc52f</span><span>"</span> <span>#</span><span>数据库密码</span>
schedule => <span>"</span><span>* * * * *</span><span>"</span> <span>#</span><span>cron表达式,设置多久跑一次,5个*代表每分钟跑一次</span>
jdbc_paging_enabled => <span>"</span><span>true</span><span>"</span> <span>#</span><span>如果数据较多,可以设置分页查询</span>
jdbc_page_size => <span>"</span><span>200</span><span>"</span> <span>#</span><span>每页查询的行数</span>
record_last_run =><span> true
clean_run </span>=><span> false
statement </span>=> <span>"</span><span>SELECT * FROM users WHERE updated_at >= :sql_last_value</span><span>"</span> <span>#</span><span>查询语句,会把该语句查询出的数据推给output设置的服务</span>
use_column_value => true <span>#</span><span>When set to true, uses the defined tracking_column value as the :sql_last_value. When set to false, :sql_last_value reflects the last time the query was executed.</span>
tracking_column_type => <span>"</span><span>timestamp</span><span>"</span> <span>#</span><span>用来跟踪数据变化的列类型:numeric 或者 timestamp</span>
tracking_column => <span>"</span><span>updated_at</span><span>"</span> <span>#</span><span>通过哪个列来跟踪是否有更新</span>
last_run_metadata_path => <span>"</span><span>/data/meta/users2_offset.txt</span><span>"</span> <span>#</span><span>指定上次运行的偏移量值 目录需提前建,users2_offset.txt 文件如果不存在,系统会自动建</span>
<span> }
}
output {
elasticsearch {
</span><span>#</span><span> ES的IP地址及端口</span>
hosts => [<span>"</span><span>127.0.0.1:9200</span><span>"</span><span>]
</span><span>#</span><span> 索引名称 可自定义</span>
index => <span>"users</span><span>"</span>
<span>#</span><span> 需要关联的数据库中有有一个id字段,对应类型中的id</span>
document_id => <span>"</span><span>%{id}</span><span>"</span><span>
}
stdout {
</span><span>#</span><span> JSON格式输出</span>
codec =><span> json_lines
}
}</span>
运行logstah来把mysql数据推给es
/usr/share/logstash/bin/logstash -f --path.settings=/etc/logstash/conf.d/users.conf -t
正常的话看到没有报错,并有sql语句数据
查看es上的索引
[root@localhost conf.d]# curl ’localhost:9200/_cat/indices?v’
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .geoip_databases 4NHJwd3OSsmTu4rXj7Wmtw 1 0 42 45 48.2mb 48.2mb
yellow open users-fr-java8 IZfD2tbcRcirSga5KTjQSg 1 1 4 32 18.3kb 18.3kb
yellow open order_shopify iTj8aUqhRl2ulUXLM5-1fQ 1 1 9639 0 40.8mb 40.8mb
yellow open users _hYfruXyQUulZ5-TefybHg 1 1 125 49 653.9kb 653.9kb
yellow open user-6 7MxDLCMUQfe2pzrCCoDJHw 1 1 4 3339 172.1kb 172.1kb
-———–END—————————
- 原文作者:知识铺
- 原文链接:https://index.zshipu.com/geek001/post/20240507/%E5%A6%82%E4%BD%95%E5%B0%86MySQL%E6%95%B0%E6%8D%AE%E5%90%8C%E6%AD%A5%E5%88%B0Elasticsearch--%E7%9F%A5%E8%AF%86%E9%93%BA/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。
- 免责声明:本页面内容均来源于站内编辑发布,部分信息来源互联网,并不意味着本站赞同其观点或者证实其内容的真实性,如涉及版权等问题,请立即联系客服进行更改或删除,保证您的合法权益。转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。也可以邮件至 sblig@126.com