在看这篇文章前,要看一下这篇,mysql 同步数据到 elasticsearch,这篇文章只讲了,单服务器单库单表的情况。本文主要讲,多服务器,多库,多表的情况。
1,修改canal.properties
# vim /root/canal/deployer/conf/canal.properties canal.destinations = mysql113,mysql122 //默认是example,这里mysql113,mysql122都是目录
2,创建配置目录和日志目录
# /root/canal/deployer/ # cp -rp conf/example conf/mysql113 # cp -rp conf/example conf/mysql122 # mkdir logs/mysql113 logs/mysql122
3,配置多个mysql数据源
# egrep -v "(^#|^$)" conf/mysql122/instance.properties canal.instance.mysql.slaveId=122 //不要相同 canal.instance.gtidon=false canal.instance.master.address=10.0.10.122:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= canal.instance.tsdb.enable=true canal.instance.dbUsername=canal canal.instance.dbPassword=Canal-123 canal.instance.connectionCharset = UTF-8 canal.instance.enableDruid=false canal.instance.filter.regex=.*\\..* canal.instance.filter.black.regex=mysql\\.slave_.* canal.mq.topic=mysql122 canal.mq.partition=0 # egrep -v "(^#|^$)" conf/mysql113/instance.properties canal.instance.mysql.slaveId=113 //不要相同 canal.instance.gtidon=false canal.instance.master.address=10.0.40.113:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= canal.instance.tsdb.enable=true canal.instance.dbUsername=canal canal.instance.dbPassword=Canal-123 canal.instance.connectionCharset = UTF-8 canal.instance.enableDruid=false canal.instance.filter.regex=.*\\..* canal.instance.filter.black.regex=mysql\\.slave_.* canal.mq.topic=mysql113 canal.mq.partition=0
4,启动canal deployer
# ./bin/startup.sh # tail -f logs/canal/canal.log
5,配置adapter application.yml
# /root/canal/adapter/
# cat conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources:
defaultDS:
url: jdbc:mysql://10.0.40.113:3306/result_lianshan_saas?useUnicode=true
username: canal
password: Canal-123
defaultDS2:
url: jdbc:mysql://10.0.10.222:3306/result_lianshan?useUnicode=true
username: canal
password: Canal-123
canalAdapters:
- instance: mysql113
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es6
hosts: 10.0.10.245:19200
properties:
mode: rest
cluster.name: estestcluster
- instance: mysql122
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es6
hosts: 10.0.10.245:19200
properties:
mode: rest
cluster.name: estestcluster
es的配置可以不同。
6,创建es索引
# curl -XPUT "http://10.0.40.200:9200/result_lianshan_user/?pretty" -H "Content-Type: application/json" -d'
{
"mappings" : {
"user": {
"_all":{
"enabled":false
},
"properties": {
"id": {
"type": "integer"
},
"sex": {
"type": "integer"
},
"name" : {
"type" : "text",
"analyzer": "ik_smart",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
},
"settings" : {
"number_of_shards" : "3",
"number_of_replicas" : "1"
}
}'
# curl -XPUT "http://10.0.40.200:9200/result_lianshan_order/?pretty" -H "Content-Type: application/json" -d'
{
"mappings" : {
"myorder": {
"_all":{
"enabled":false
},
"properties": {
"id": {
"type": "integer"
},
"userid": {
"type": "integer"
},
"username" : {
"type" : "text",
"analyzer": "ik_smart",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
},
"settings" : {
"number_of_shards" : "3",
"number_of_replicas" : "1"
}
}'
建议es索引名是,数据库加表名,如果不嫌索引长,也可以带上ip的最后几位
7,创建同步表
CREATE TABLE `user` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID', `sex` int(1) NOT NULL DEFAULT '0' COMMENT 'sex', `name` varchar(20) COLLATE utf8_bin DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='user'; CREATE TABLE `myorder` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID', `userid` int(10) NOT NULL DEFAULT '0' COMMENT 'userid', `username` varchar(20) COLLATE utf8_bin DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='order';
8,创建adapter同步配置
[root@bigserver2 conf]# cat es6/mysql113_test.yml dataSourceKey: defaultDS destination: mysql113 groupId: g1 esMapping: _index: result_lianshan_saas _type: test _id: _id sql: "select test.id as _id ,test.id,test.ids,test.hao,test.fa from test" commitBatch: 3000 [root@bigserver2 conf]# cat es6/mysql122_order.yml dataSourceKey: defaultDS2 destination: mysql122 groupId: g1 esMapping: _index: result_lianshan_order _type: myorder _id: _id sql: "select myorder.id as _id,myorder.id,myorder.userid,myorder.username from myorder" commitBatch: 3000 [root@bigserver2 conf]# cat es6/mysql122_user.yml dataSourceKey: defaultDS2 destination: mysql122 groupId: g1 esMapping: _index: result_lianshan_user _type: user _id: _id sql: "select user.id as _id,user.id,user.sex,user.name from user" commitBatch: 3000
9,重启adapter
# ./bin/startup.sh # tail -f logs/adapter/adapter.log
如果启动没有报错,基本上就配置成功了,在mysql中测试一下吧。
转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/elasticsearch/2506.html
