In the previous post we finished installing Elasticsearch and Logstash; now we can sync data from a MySQL database into an ES index.

1. Download the MySQL JDBC driver

[root@localhost home]# cd /home
[root@localhost home]# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.13/mysql-connector-java-8.0.13.jar

2. Install logstash-input-jdbc (if, as described in the previous post, you installed Logstash 7.1 or later, skip this step, since the jdbc input is already bundled)

Run the plugin install command directly from Logstash's bin directory, as shown below.

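The jdbc input is installed through the logstash-plugin tool that ships with Logstash; assuming the package install path /usr/share/logstash used in the previous post, the command looks like this:

[root@localhost home]# /usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc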

3. Add the Logstash configuration

[root@localhost home]# vi /etc/logstash/conf.d/users.conf

Copy the following into the file and save it:


input {
  jdbc {
    jdbc_driver_library => "/home/mysql-connector-java-8.0.13.jar" # absolute path to the mysql-connector-java jar downloaded above
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.8.42.10:3306/school?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai" # database IP: 10.8.42.10, port: 3306, database name: school
    jdbc_user => "root" # database user
    jdbc_password => "dfc52f" # database password
    schedule => "* * * * *" # cron expression controlling how often the query runs; five * means once per minute
    jdbc_paging_enabled => "true" # enable paged queries when there is a lot of data
    jdbc_page_size => "200" # rows fetched per page
    record_last_run => true
    clean_run => false
    statement => "SELECT * FROM users WHERE updated_at >= :sql_last_value" # the query; its result set is pushed to the services configured in output
    use_column_value => true # when true, the tracking_column value is used as :sql_last_value; when false, :sql_last_value is the time the query last ran
    tracking_column_type => "timestamp" # type of the tracking column: numeric or timestamp
    tracking_column => "updated_at" # column used to detect updated rows
    last_run_metadata_path => "/data/meta/users2_offset.txt" # stores the last-run offset; the directory must be created in advance, users2_offset.txt is created automatically if it does not exist
  }
}
output {
  elasticsearch {
    # ES address and port
    hosts => ["127.0.0.1:9200"]
    # index name, can be customized
    index => "users"
    # the table has an id column, used as the document id
    document_id => "%{id}"
  }
  stdout {
    # output in JSON lines format
    codec => json_lines
  }
}
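The pipeline above assumes the school database has a users table with an id column (used as document_id) and an updated_at timestamp column (the tracking column), and that /data/meta exists for the offset file. A quick sanity check before the first run might look like this (assuming the mysql client is installed on this host):

[root@localhost home]# mkdir -p /data/meta
[root@localhost home]# mysql -h 10.8.42.10 -u root -p school -e "SHOW COLUMNS FROM users WHERE Field IN ('id','updated_at')"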


Run Logstash to push the MySQL data to ES:

/usr/share/logstash/bin/logstash --path.settings /etc/logstash -f /etc/logstash/conf.d/users.conf

If everything is working you should see no errors, and the executed SQL statements along with the fetched rows will appear in the output.
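To only validate the configuration file without actually starting the pipeline, add -t (--config.test_and_exit); a sketch using the same paths as above:

[root@localhost home]# /usr/share/logstash/bin/logstash --path.settings /etc/logstash -f /etc/logstash/conf.d/users.conf -t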

 

Check the indices in ES:


[root@localhost conf.d]# curl 'localhost:9200/_cat/indices?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .geoip_databases 4NHJwd3OSsmTu4rXj7Wmtw 1 0 42 45 48.2mb 48.2mb
yellow open users-fr-java8 IZfD2tbcRcirSga5KTjQSg 1 1 4 32 18.3kb 18.3kb
yellow open order_shopify iTj8aUqhRl2ulUXLM5-1fQ 1 1 9639 0 40.8mb 40.8mb
yellow open users _hYfruXyQUulZ5-TefybHg 1 1 125 49 653.9kb 653.9kb
yellow open user-6 7MxDLCMUQfe2pzrCCoDJHw 1 1 4 3339 172.1kb 172.1kb

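The users index shows up with a docs.count that grows as rows are synced. To check the documents themselves, the standard Elasticsearch search API can be queried, for example:

[root@localhost conf.d]# curl 'localhost:9200/users/_search?size=1&pretty'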

--------------------END--------------------