canal 多个mysql服务器同步到elasticsearch

张映 发表于 2021-03-02

分类目录: elasticsearch

标签:, ,

在看这篇文章前,要看一下这篇,mysql 同步数据到 elasticsearch,这篇文章只讲了,单服务器单库单表的情况。本文主要讲,多服务器,多库,多表的情况。

1,修改canal.properties

# vim /root/canal/deployer/conf/canal.properties
canal.destinations = mysql113,mysql122 //默认是example,这里mysql113,mysql122都是目录

2,创建配置目录和日志目录

# /root/canal/deployer/
# cp -rp conf/example conf/mysql113
# cp -rp conf/example conf/mysql122
# mkdir logs/mysql113 logs/mysql122

3,配置多个mysql数据源

# egrep -v "(^#|^$)" conf/mysql122/instance.properties
canal.instance.mysql.slaveId=122 //不要相同
canal.instance.gtidon=false
canal.instance.master.address=10.0.10.122:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
canal.instance.tsdb.enable=true
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal-123
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=mysql122
canal.mq.partition=0

# egrep -v "(^#|^$)" conf/mysql113/instance.properties
canal.instance.mysql.slaveId=113 //不要相同
canal.instance.gtidon=false
canal.instance.master.address=10.0.40.113:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
canal.instance.tsdb.enable=true
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal-123
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=mysql113
canal.mq.partition=0

4,启动canal deployer

# ./bin/startup.sh
# tail -f logs/canal/canal.log

5,配置adapter application.yml

# /root/canal/adapter/

# cat conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://10.0.40.113:3306/result_lianshan_saas?useUnicode=true
      username: canal
      password: Canal-123
    defaultDS2:
      url: jdbc:mysql://10.0.10.222:3306/result_lianshan?useUnicode=true
      username: canal
      password: Canal-123
  canalAdapters:
  - instance: mysql113
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es6
        hosts: 10.0.10.245:19200
        properties:
          mode: rest
          cluster.name: estestcluster
  - instance: mysql122
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es6
        hosts: 10.0.10.245:19200
        properties:
          mode: rest
          cluster.name: estestcluster

es的配置可以不同。

6,创建es索引

# curl -XPUT "http://10.0.40.200:9200/result_lianshan_user/?pretty" -H "Content-Type: application/json" -d'
{
    "mappings" : {
      "user": {
            "_all":{
              "enabled":false
            },
            "properties": {
                "id": {
                    "type": "integer"
                },
                "sex": {
                    "type": "integer"
                },
                "name" : {
                    "type" : "text",
                    "analyzer": "ik_smart",
                    "fields" : {
                      "keyword" : {
                        "type" : "keyword",
                        "ignore_above" : 256
                      }
                    }
                }
            }
        }
    },
    "settings" : {
        "number_of_shards" : "3",
        "number_of_replicas" : "1"
    }
}'  

# curl -XPUT "http://10.0.40.200:9200/result_lianshan_order/?pretty" -H "Content-Type: application/json" -d'
{
    "mappings" : {
      "myorder": {
            "_all":{
              "enabled":false
            },
            "properties": {
                "id": {
                    "type": "integer"
                },
                "userid": {
                    "type": "integer"
                },
                "username" : {
                    "type" : "text",
                    "analyzer": "ik_smart",
                    "fields" : {
                      "keyword" : {
                        "type" : "keyword",
                        "ignore_above" : 256
                      }
                    }
                }
            }
        }
    },
    "settings" : {
        "number_of_shards" : "3",
        "number_of_replicas" : "1"
    }
}'

建议es索引名是,数据库加表名,如果不嫌索引长,也可以带上ip的最后几位

7,创建同步表

CREATE TABLE `user` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `sex` int(1) NOT NULL DEFAULT '0' COMMENT 'sex',
  `name` varchar(20) COLLATE utf8_bin DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='user';

CREATE TABLE `myorder` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `userid` int(10) NOT NULL DEFAULT '0' COMMENT 'userid',
  `username` varchar(20) COLLATE utf8_bin DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='order';

8,创建adapter同步配置

[root@bigserver2 conf]# cat es6/mysql113_test.yml
dataSourceKey: defaultDS
destination: mysql113
groupId: g1
esMapping:
  _index: result_lianshan_saas
  _type: test
  _id: _id
  sql: "select test.id as _id ,test.id,test.ids,test.hao,test.fa from test"
  commitBatch: 3000

[root@bigserver2 conf]# cat es6/mysql122_order.yml
dataSourceKey: defaultDS2
destination: mysql122
groupId: g1
esMapping:
  _index: result_lianshan_order
  _type: myorder
  _id: _id
  sql: "select myorder.id as _id,myorder.id,myorder.userid,myorder.username from myorder"
  commitBatch: 3000

[root@bigserver2 conf]# cat es6/mysql122_user.yml
dataSourceKey: defaultDS2
destination: mysql122
groupId: g1
esMapping:
  _index: result_lianshan_user
  _type: user
  _id: _id
  sql: "select user.id as _id,user.id,user.sex,user.name from user"
  commitBatch: 3000

9,重启adapter

# ./bin/startup.sh
# tail -f logs/adapter/adapter.log
canal 多数据库 多表

canal 多数据库 多表

如果启动没有报错,基本上就配置成功了,在mysql中测试一下吧。



转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/elasticsearch/2506.html