在看这篇文章前,要看一下这篇,mysql 同步数据到 elasticsearch,这篇文章只讲了,单服务器单库单表的情况。本文主要讲,多服务器,多库,多表的情况。
1,修改canal.properties
# vim /root/canal/deployer/conf/canal.properties canal.destinations = mysql113,mysql122 //默认是example,这里mysql113,mysql122都是目录
2,创建配置目录和日志目录
# /root/canal/deployer/ # cp -rp conf/example conf/mysql113 # cp -rp conf/example conf/mysql122 # mkdir logs/mysql113 logs/mysql122
3,配置多个mysql数据源
# egrep -v "(^#|^$)" conf/mysql122/instance.properties canal.instance.mysql.slaveId=122 //不要相同 canal.instance.gtidon=false canal.instance.master.address=10.0.10.122:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= canal.instance.tsdb.enable=true canal.instance.dbUsername=canal canal.instance.dbPassword=Canal-123 canal.instance.connectionCharset = UTF-8 canal.instance.enableDruid=false canal.instance.filter.regex=.*\\..* canal.instance.filter.black.regex=mysql\\.slave_.* canal.mq.topic=mysql122 canal.mq.partition=0 # egrep -v "(^#|^$)" conf/mysql113/instance.properties canal.instance.mysql.slaveId=113 //不要相同 canal.instance.gtidon=false canal.instance.master.address=10.0.40.113:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= canal.instance.tsdb.enable=true canal.instance.dbUsername=canal canal.instance.dbPassword=Canal-123 canal.instance.connectionCharset = UTF-8 canal.instance.enableDruid=false canal.instance.filter.regex=.*\\..* canal.instance.filter.black.regex=mysql\\.slave_.* canal.mq.topic=mysql113 canal.mq.partition=0
4,启动canal deployer
# ./bin/startup.sh # tail -f logs/canal/canal.log
5,配置adapter application.yml
# /root/canal/adapter/ # cat conf/application.yml server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: consumerProperties: canal.tcp.server.host: 127.0.0.1:11111 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: srcDataSources: defaultDS: url: jdbc:mysql://10.0.40.113:3306/result_lianshan_saas?useUnicode=true username: canal password: Canal-123 defaultDS2: url: jdbc:mysql://10.0.10.222:3306/result_lianshan?useUnicode=true username: canal password: Canal-123 canalAdapters: - instance: mysql113 groups: - groupId: g1 outerAdapters: - name: logger - name: es6 hosts: 10.0.10.245:19200 properties: mode: rest cluster.name: estestcluster - instance: mysql122 groups: - groupId: g1 outerAdapters: - name: logger - name: es6 hosts: 10.0.10.245:19200 properties: mode: rest cluster.name: estestcluster
es的配置可以不同。
6,创建es索引
# curl -XPUT "http://10.0.40.200:9200/result_lianshan_user/?pretty" -H "Content-Type: application/json" -d' { "mappings" : { "user": { "_all":{ "enabled":false }, "properties": { "id": { "type": "integer" }, "sex": { "type": "integer" }, "name" : { "type" : "text", "analyzer": "ik_smart", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } } }, "settings" : { "number_of_shards" : "3", "number_of_replicas" : "1" } }' # curl -XPUT "http://10.0.40.200:9200/result_lianshan_order/?pretty" -H "Content-Type: application/json" -d' { "mappings" : { "myorder": { "_all":{ "enabled":false }, "properties": { "id": { "type": "integer" }, "userid": { "type": "integer" }, "username" : { "type" : "text", "analyzer": "ik_smart", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } } }, "settings" : { "number_of_shards" : "3", "number_of_replicas" : "1" } }'
建议es索引名是,数据库加表名,如果不嫌索引长,也可以带上ip的最后几位
7,创建同步表
CREATE TABLE `user` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID', `sex` int(1) NOT NULL DEFAULT '0' COMMENT 'sex', `name` varchar(20) COLLATE utf8_bin DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='user'; CREATE TABLE `myorder` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID', `userid` int(10) NOT NULL DEFAULT '0' COMMENT 'userid', `username` varchar(20) COLLATE utf8_bin DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='order';
8,创建adapter同步配置
[root@bigserver2 conf]# cat es6/mysql113_test.yml dataSourceKey: defaultDS destination: mysql113 groupId: g1 esMapping: _index: result_lianshan_saas _type: test _id: _id sql: "select test.id as _id ,test.id,test.ids,test.hao,test.fa from test" commitBatch: 3000 [root@bigserver2 conf]# cat es6/mysql122_order.yml dataSourceKey: defaultDS2 destination: mysql122 groupId: g1 esMapping: _index: result_lianshan_order _type: myorder _id: _id sql: "select myorder.id as _id,myorder.id,myorder.userid,myorder.username from myorder" commitBatch: 3000 [root@bigserver2 conf]# cat es6/mysql122_user.yml dataSourceKey: defaultDS2 destination: mysql122 groupId: g1 esMapping: _index: result_lianshan_user _type: user _id: _id sql: "select user.id as _id,user.id,user.sex,user.name from user" commitBatch: 3000
9,重启adapter
# ./bin/startup.sh # tail -f logs/adapter/adapter.log
如果启动没有报错,基本上就配置成功了,在mysql中测试一下吧。
转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/elasticsearch/2506.html