Syncing MySQL data to Elasticsearch

Posted by 张映 on 2021-02-20

Category: elasticsearch


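When the data volume is large, single-table queries and full-text search are much faster in Elasticsearch than in MySQL. I tried Alibaba's canal to keep the two in sync, and it works quite well.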

I. Install Java

# yum install java-1.8.0-openjdk-devel.x86_64

II. Download canal

Download URL: https://github.com/alibaba/canal/releases/tag/canal-1.1.5-alpha-2

# mkdir adapter deployer 

# tar zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C adapter
# tar zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C deployer

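Do not extract the archives in place. Extract each one into its own directory with -C, because both unpack their bin, conf and lib directories directly and would otherwise get mixed together.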

III. Create a test MySQL database and table

1. Create the database and table

CREATE DATABASE `result_lianshan_saas` default CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

CREATE TABLE `result_lianshan_saas`.`test` (
 `id` int(4) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
 `ids` int(4) NOT NULL DEFAULT 0 COMMENT 'ID',
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=1 COMMENT='tank test';

2. Create a user and grant privileges

mysql> CREATE USER canal IDENTIFIED BY 'Canal-123';
Query OK, 0 rows affected (0.06 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.04 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.10 sec)

3. Enable the binlog

# vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin   # enable the binlog
binlog-format=ROW   # use ROW format
server_id=1      # required for MySQL replication; must not be the same as canal's slaveId

# systemctl restart mysqld
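
Before restarting, it can help to double-check the three settings above. The following is a minimal sketch in Python, assuming the relevant part of my.cnf is passed in as text; `check_binlog_settings` and `SAMPLE_MY_CNF` are hypothetical names of mine, not part of MySQL or canal. It will not cope with `!includedir` lines placed before the first section header.

```python
import configparser

# Sample text mirroring the settings shown above (illustrative only).
SAMPLE_MY_CNF = """
[mysqld]
log-bin=mysql-bin   # enable the binlog
binlog-format=ROW   # use ROW format
server_id=1
"""


def check_binlog_settings(my_cnf_text):
    """Return a list of problems found in the [mysqld] section."""
    parser = configparser.ConfigParser(
        allow_no_value=True,             # my.cnf allows bare flags
        inline_comment_prefixes=("#",),  # strip trailing "# ..." comments
        strict=False,
        interpolation=None,
    )
    parser.read_string(my_cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]

    # MySQL treats '-' and '_' in option names as equivalent.
    opts = {
        key.replace("-", "_"): (value or "").strip()
        for key, value in parser.items("mysqld")
    }

    problems = []
    if "log_bin" not in opts:
        problems.append("log-bin is not set")
    if opts.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog-format is not ROW")
    server_id = opts.get("server_id", "")
    if not server_id.isdigit() or int(server_id) == 0:
        problems.append("server_id is missing or zero")
    return problems


print(check_binlog_settings(SAMPLE_MY_CNF))
```

An empty list means the three options canal depends on are present in the text that was checked; it says nothing about whether the running server has picked them up yet.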

IV. Create the index

# curl -XPUT "http://10.0.40.200:9200/result_lianshan_saas/?pretty" -H "Content-Type: application/json" -d'
{
    "mappings" : {
      "test": {
            "_all":{
              "enabled":false
            },
            "properties": {
                "id": {
                    "type": "integer"
                },
                "ids": {
                    "type": "integer"
                }
            }
        }
    },
    "settings" : {
        "number_of_shards" : "3",
        "number_of_replicas" : "1"
    }
}'

It is best to give the index the same name as the database, which makes things easier to keep track of.
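
If the table has more columns, the request body above can be generated rather than typed by hand. Here is a rough sketch, assuming an Elasticsearch 6.x style mapping with a single type as in the curl call; `build_index_body` and the `MYSQL_TO_ES` type table are my own illustrative helpers, and the type choices in that table are assumptions, not rules taken from canal or Elasticsearch.

```python
import json

# Illustrative MySQL -> Elasticsearch type hints (an assumption, adjust as needed).
MYSQL_TO_ES = {
    "int": "integer",
    "bigint": "long",
    "varchar": "keyword",
    "text": "text",
    "datetime": "date",
}


def build_index_body(doc_type, columns, shards=3, replicas=1):
    """Build a create-index body; columns maps column name -> MySQL base type."""
    properties = {
        name: {"type": MYSQL_TO_ES[mysql_type]}
        for name, mysql_type in columns.items()
    }
    return {
        "mappings": {
            doc_type: {"_all": {"enabled": False}, "properties": properties}
        },
        "settings": {
            "number_of_shards": str(shards),
            "number_of_replicas": str(replicas),
        },
    }


# Same two integer columns as the test table above.
body = build_index_body("test", {"id": "int", "ids": "int"})
print(json.dumps(body, indent=2))
```

The printed JSON can be passed to the same curl -XPUT call in place of the hand-written body.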

V. Configure the canal server

# egrep -v "(^#|^$)" deployer/conf/example/instance.properties
canal.instance.gtidon=false
canal.instance.master.address=10.0.40.200:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
canal.instance.tsdb.enable=true
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal-123
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=example
canal.mq.partition=0

# ./deployer/bin/startup.sh
# tail -f deployer/logs/canal/canal.log

The log matters: if it shows any errors, the sync will not work.
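
The two filter settings in instance.properties decide which tables canal watches. As a rough illustration, the sketch below approximates the matching with Python's `re` module. canal uses its own regex engine, so treat this only as an approximation; `table_matches` and `unescape_properties_value` are hypothetical helpers of mine, and the unescaping is simplified to the double-backslash case.

```python
import re


def unescape_properties_value(raw):
    # In a Java .properties file a backslash is an escape character,
    # so "\\." in the file stands for the regex "\." (a literal dot).
    return raw.replace("\\\\", "\\")


def table_matches(filter_value, schema, table):
    """Check whether schema.table matches any comma-separated pattern."""
    full_name = f"{schema}.{table}"
    patterns = [
        part.strip()
        for part in unescape_properties_value(filter_value).split(",")
        if part.strip()
    ]
    return any(re.fullmatch(pattern, full_name) for pattern in patterns)


# Values written exactly as they appear in instance.properties.
WHITE_LIST = r".*\\..*"
BLACK_LIST = r"mysql\\.slave_.*"

print(table_matches(WHITE_LIST, "result_lianshan_saas", "test"))
print(table_matches(BLACK_LIST, "result_lianshan_saas", "test"))
```

A narrower value such as `result_lianshan_saas\\.test` would limit the instance to the single test table instead of every table in every database.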

VI. Configure canal.adapter

1. Edit application.yml

# egrep -v "(^#|^$)" adapter/conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://10.0.40.200:3306/result_lianshan_saas?useUnicode=true
      username: canal
      password: Canal-123
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es6   # must match the directory name under conf/ (es6 or es7) and the Elasticsearch version, otherwise the adapter reports errors
        hosts: 10.0.10.245:19200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: estestcluster
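
Since the adapter name has to match the Elasticsearch major version, here is a small sketch for picking it. It assumes the canal release ships es6 and es7 adapter directories, as the comment in the config suggests; both function names are mine. `fetch_es_version` reads the version from the cluster's root endpoint and needs network access, so it is defined but not called here.

```python
import json
import urllib.request


def adapter_name_for_version(version_number):
    """Map an Elasticsearch version string such as "6.8.13" to es6 or es7."""
    major = int(version_number.split(".")[0])
    if major not in (6, 7):
        raise ValueError(f"no matching adapter assumed for Elasticsearch {version_number}")
    return f"es{major}"


def fetch_es_version(base_url):
    """Read version.number from the root endpoint, e.g. http://10.0.10.245:19200."""
    with urllib.request.urlopen(base_url, timeout=5) as response:
        return json.load(response)["version"]["number"]


print(adapter_name_for_version("6.8.13"))
```

The returned name is what goes into the `name:` field of outerAdapters, and it is also the directory under adapter/conf that holds the mapping files.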

2. Define the table to sync

# cat adapter/conf/es6/test.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: result_lianshan_saas
  _type: test
  _id: _id
  sql: "select test.id as _id ,test.id,test.ids from test"
  commitBatch: 3000

Delete the default demo files in this directory. The index name and type must match the ones defined in Elasticsearch.
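
The `sql` line in the mapping follows a simple pattern: the primary key aliased as `_id`, followed by every column to index. A minimal sketch that builds it from a column list is below; `build_mapping_sql` is a hypothetical helper, not something canal provides.

```python
def build_mapping_sql(table, columns, pk="id"):
    """Build the esMapping sql: primary key aliased to _id, then each column."""
    selected = [f"{table}.{pk} as _id"] + [f"{table}.{column}" for column in columns]
    return f"select {', '.join(selected)} from {table}"


# Columns of the test table created earlier.
print(build_mapping_sql("test", ["id", "ids"]))
```

This prints a statement equivalent to the one in test.yml, differing only in whitespace around the commas.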

3. Start canal.adapter

# ./adapter/bin/startup.sh
# tail -f adapter/logs/adapter/adapter.log

Again, watch the log: if it shows any errors, the sync will not work.

4. Test by inserting a row in MySQL

mysql> INSERT INTO result_lianshan_saas.test (ids) VALUES (1);

The adapter log (adapter/logs/adapter/adapter.log) then shows an entry like this:

2021-02-20 10:15:47.032 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"ids":1}],"database":"result_lianshan_saas","destination":"example","es":1613787346000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1613787347032,"type":"INSERT"}
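
The part after `DML:` in that log line is plain JSON, so it is easy to pull out when checking many events. Here is a small sketch, assuming the logger adapter keeps the ` - DML: ` prefix shown above; `parse_dml_line` and `event_time` are my own names. The `es` field is taken to be the binlog event time in milliseconds.

```python
import json
from datetime import datetime, timezone

# The log line from above, copied verbatim.
SAMPLE_LINE = (
    '2021-02-20 10:15:47.032 [pool-2-thread-1] INFO  '
    'c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: '
    '{"data":[{"id":1,"ids":1}],"database":"result_lianshan_saas",'
    '"destination":"example","es":1613787346000,"groupId":"g1","isDdl":false,'
    '"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1613787347032,'
    '"type":"INSERT"}'
)


def parse_dml_line(line):
    """Return the DML event as a dict, or None if the line is not a DML entry."""
    _, marker, payload = line.partition(" - DML: ")
    if not marker:
        return None
    return json.loads(payload)


def event_time(event):
    """Convert the millisecond 'es' timestamp to a UTC datetime."""
    return datetime.fromtimestamp(event["es"] / 1000, tz=timezone.utc)


event = parse_dml_line(SAMPLE_LINE)
print(event["type"], event["database"], event["table"], event["data"])
print(event_time(event).isoformat())
```

The UTC time it prints is eight hours behind the GMT+8 timestamp at the start of the log line, which matches the time-zone setting in application.yml.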

Then check the result in Elasticsearch:

# curl -XPOST "http://10.0.40.193:9200/result_lianshan_saas/test/_search?pretty"  -H "Content-Type: application/json"
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "result_lianshan_saas",
        "_type" : "test",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "ids" : 1
        }
      }
    ]
  }
}
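
To compare the search result with the MySQL rows in a script, the documents can be pulled out of the response. Below is a minimal sketch working on the response already parsed into a dict; the helper names are mine. `hits.total` is a plain number in Elasticsearch 6.x, as above, and an object with a `value` field in 7.x, so the sketch accepts both.

```python
# The search response from above as a Python dict.
SAMPLE_RESPONSE = {
    "took": 3,
    "timed_out": False,
    "_shards": {"total": 3, "successful": 3, "skipped": 0, "failed": 0},
    "hits": {
        "total": 1,
        "max_score": 1.0,
        "hits": [
            {
                "_index": "result_lianshan_saas",
                "_type": "test",
                "_id": "1",
                "_score": 1.0,
                "_source": {"id": 1, "ids": 1},
            }
        ],
    },
}


def total_hits(response):
    """Return the hit count for both 6.x (int) and 7.x (object) responses."""
    total = response["hits"]["total"]
    return total["value"] if isinstance(total, dict) else total


def extract_sources(response):
    """Map each document _id to its _source."""
    return {hit["_id"]: hit["_source"] for hit in response["hits"]["hits"]}


print(total_hits(SAMPLE_RESPONSE), extract_sources(SAMPLE_RESPONSE))
```

The `_id` comes back as a string even though the MySQL primary key is an integer, so convert one side before comparing.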

VII. Syncing a column added in MySQL

1. Edit adapter/conf/es6/test.yml

# vim adapter/conf/es6/test.yml
sql: "select test.id as _id ,test.id,test.ids,test.hao from test"  # add the new column

2. Stop the adapter: adapter/bin/stop.sh

3. Delete deployer/conf/example/h2.mv.db (the instance's local cache of table structure)

4. Restart the canal server: deployer/bin/restart.sh

5. Start the adapter: adapter/bin/startup.sh
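
Steps 2 to 5 always run in the same order, so they can be scripted once the mapping file has been edited by hand. The following is only a sketch under assumptions: `/opt/canal` is a hypothetical install directory containing the adapter and deployer folders created earlier, and `schema_change_plan` and `apply_plan` are my own names. `Path.unlink(missing_ok=True)` needs Python 3.8 or later.

```python
import subprocess
from pathlib import Path


def schema_change_plan(canal_home, instance="example"):
    """Return the ordered actions for steps 2-5 as (action, path) pairs."""
    home = Path(canal_home)
    return [
        ("run", home / "adapter" / "bin" / "stop.sh"),
        ("delete", home / "deployer" / "conf" / instance / "h2.mv.db"),
        ("run", home / "deployer" / "bin" / "restart.sh"),
        ("run", home / "adapter" / "bin" / "startup.sh"),
    ]


def apply_plan(plan):
    """Execute the plan in order and stop at the first failing script."""
    for action, path in plan:
        if action == "delete":
            path.unlink(missing_ok=True)  # fine if the file is already gone
        else:
            subprocess.run([str(path)], check=True)


# Hypothetical install location; only the plan is printed, nothing is executed.
plan = schema_change_plan("/opt/canal")
for action, path in plan:
    print(action, path)
```

Calling `apply_plan(plan)` would actually stop and restart the services, so review the printed plan first.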

 



Please credit the source when reposting
Author: 海底苍鹰
URL: http://blog.51yip.com/elasticsearch/2501.html