canal 多个mysql服务器同步到elasticsearch

张映 发表于 2021-03-02

分类目录: elasticsearch

标签:, ,

在看这篇文章前,要看一下这篇,mysql 同步数据到 elasticsearch,这篇文章只讲了,单服务器单库单表的情况。本文主要讲,多服务器,多库,多表的情况。

1,修改canal.properties

  1. # vim /root/canal/deployer/conf/canal.properties  
  2. canal.destinations = mysql113,mysql122 //默认是example,这里mysql113,mysql122都是目录  

2,创建配置目录和日志目录

  1. # /root/canal/deployer/  
  2. # cp -rp conf/example conf/mysql113  
  3. # cp -rp conf/example conf/mysql122  
  4. mkdir logs/mysql113 logs/mysql122  

3,配置多个mysql数据源

  1. # egrep -v "(^#|^$)" conf/mysql122/instance.properties  
  2. canal.instance.mysql.slaveId=122 //不要相同  
  3. canal.instance.gtidon=false  
  4. canal.instance.master.address=10.0.10.122:3306  
  5. canal.instance.master.journal.name=  
  6. canal.instance.master.position=  
  7. canal.instance.master.timestamp=  
  8. canal.instance.master.gtid=  
  9. canal.instance.rds.accesskey=  
  10. canal.instance.rds.secretkey=  
  11. canal.instance.rds.instanceId=  
  12. canal.instance.tsdb.enable=true  
  13. canal.instance.dbUsername=canal  
  14. canal.instance.dbPassword=Canal-123  
  15. canal.instance.connectionCharset = UTF-8  
  16. canal.instance.enableDruid=false  
  17. canal.instance.filter.regex=.*\\..*  
  18. canal.instance.filter.black.regex=mysql\\.slave_.*  
  19. canal.mq.topic=mysql122  
  20. canal.mq.partition=0  
  21.   
  22. # egrep -v "(^#|^$)" conf/mysql113/instance.properties  
  23. canal.instance.mysql.slaveId=113 //不要相同  
  24. canal.instance.gtidon=false  
  25. canal.instance.master.address=10.0.40.113:3306  
  26. canal.instance.master.journal.name=  
  27. canal.instance.master.position=  
  28. canal.instance.master.timestamp=  
  29. canal.instance.master.gtid=  
  30. canal.instance.rds.accesskey=  
  31. canal.instance.rds.secretkey=  
  32. canal.instance.rds.instanceId=  
  33. canal.instance.tsdb.enable=true  
  34. canal.instance.dbUsername=canal  
  35. canal.instance.dbPassword=Canal-123  
  36. canal.instance.connectionCharset = UTF-8  
  37. canal.instance.enableDruid=false  
  38. canal.instance.filter.regex=.*\\..*  
  39. canal.instance.filter.black.regex=mysql\\.slave_.*  
  40. canal.mq.topic=mysql113  
  41. canal.mq.partition=0  

4,启动canal deployer

  1. # ./bin/startup.sh  
  2. # tail -f logs/canal/canal.log  

5,配置adapter application.yml

  1. # /root/canal/adapter/  
  2.   
  3. # cat conf/application.yml  
  4. server:  
  5.   port: 8081  
  6. spring:  
  7.   jackson:  
  8.     date-format: yyyy-MM-dd HH:mm:ss  
  9.     time-zone: GMT+8  
  10.     default-property-inclusion: non_null  
  11. canal.conf:  
  12.   mode: tcp #tcp kafka rocketMQ rabbitMQ  
  13.   flatMessage: true  
  14.   zookeeperHosts:  
  15.   syncBatchSize: 1000  
  16.   retries: 0  
  17.   timeout:  
  18.   accessKey:  
  19.   secretKey:  
  20.   consumerProperties:  
  21.     canal.tcp.server.host: 127.0.0.1:11111  
  22.     canal.tcp.zookeeper.hosts:  
  23.     canal.tcp.batch.size: 500  
  24.     canal.tcp.username:  
  25.     canal.tcp.password:  
  26.   srcDataSources:  
  27.     defaultDS:  
  28.       url: jdbc:mysql://10.0.40.113:3306/result_lianshan_saas?useUnicode=true  
  29.       username: canal  
  30.       password: Canal-123  
  31.     defaultDS2:  
  32.       url: jdbc:mysql://10.0.10.222:3306/result_lianshan?useUnicode=true  
  33.       username: canal  
  34.       password: Canal-123  
  35.   canalAdapters:  
  36.   - instance: mysql113  
  37.     groups:  
  38.     - groupId: g1  
  39.       outerAdapters:  
  40.       - name: logger  
  41.       - name: es6  
  42.         hosts: 10.0.10.245:19200  
  43.         properties:  
  44.           mode: rest  
  45.           cluster.name: estestcluster  
  46.   - instance: mysql122  
  47.     groups:  
  48.     - groupId: g1  
  49.       outerAdapters:  
  50.       - name: logger  
  51.       - name: es6  
  52.         hosts: 10.0.10.245:19200  
  53.         properties:  
  54.           mode: rest  
  55.           cluster.name: estestcluster  

es的配置可以不同。

6,创建es索引

  1. # curl -XPUT "http://10.0.40.200:9200/result_lianshan_user/?pretty" -H "Content-Type: application/json" -d' 
  2. { 
  3.     "mappings" : { 
  4.       "user": { 
  5.             "_all":{ 
  6.               "enabled":false 
  7.             }, 
  8.             "properties": { 
  9.                 "id": { 
  10.                     "type": "integer" 
  11.                 }, 
  12.                 "sex": { 
  13.                     "type": "integer" 
  14.                 }, 
  15.                 "name" : { 
  16.                     "type" : "text", 
  17.                     "analyzer": "ik_smart", 
  18.                     "fields" : { 
  19.                       "keyword" : { 
  20.                         "type" : "keyword", 
  21.                         "ignore_above" : 256 
  22.                       } 
  23.                     } 
  24.                 } 
  25.             } 
  26.         } 
  27.     }, 
  28.     "settings" : { 
  29.         "number_of_shards" : "3", 
  30.         "number_of_replicas" : "1" 
  31.     } 
  32. }'    
  33.   
  34. # curl -XPUT "http://10.0.40.200:9200/result_lianshan_order/?pretty" -H "Content-Type: application/json" -d' 
  35. { 
  36.     "mappings" : { 
  37.       "myorder": { 
  38.             "_all":{ 
  39.               "enabled":false 
  40.             }, 
  41.             "properties": { 
  42.                 "id": { 
  43.                     "type": "integer" 
  44.                 }, 
  45.                 "userid": { 
  46.                     "type": "integer" 
  47.                 }, 
  48.                 "username" : { 
  49.                     "type" : "text", 
  50.                     "analyzer": "ik_smart", 
  51.                     "fields" : { 
  52.                       "keyword" : { 
  53.                         "type" : "keyword", 
  54.                         "ignore_above" : 256 
  55.                       } 
  56.                     } 
  57.                 } 
  58.             } 
  59.         } 
  60.     }, 
  61.     "settings" : { 
  62.         "number_of_shards" : "3", 
  63.         "number_of_replicas" : "1" 
  64.     } 
  65. }'  

建议es索引名是,数据库加表名,如果不嫌索引长,也可以带上ip的最后几位

7,创建同步表

  1. CREATE TABLE `user` (  
  2.   `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',  
  3.   `sex` int(1) NOT NULL DEFAULT '0' COMMENT 'sex',  
  4.   `name` varchar(20) COLLATE utf8_bin DEFAULT NULL,  
  5.   PRIMARY KEY (`id`)  
  6. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='user';  
  7.   
  8. CREATE TABLE `myorder` (  
  9.   `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',  
  10.   `userid` int(10) NOT NULL DEFAULT '0' COMMENT 'userid',  
  11.   `username` varchar(20) COLLATE utf8_bin DEFAULT NULL,  
  12.   PRIMARY KEY (`id`)  
  13. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='order';  

8,创建adapter同步配置

  1. [root@bigserver2 conf]# cat es6/mysql113_test.yml  
  2. dataSourceKey: defaultDS  
  3. destination: mysql113  
  4. groupId: g1  
  5. esMapping:  
  6.   _index: result_lianshan_saas  
  7.   _type: test  
  8.   _id: _id  
  9.   sql: "select test.id as _id ,test.id,test.ids,test.hao,test.fa from test"  
  10.   commitBatch: 3000  
  11.   
  12. [root@bigserver2 conf]# cat es6/mysql122_order.yml  
  13. dataSourceKey: defaultDS2  
  14. destination: mysql122  
  15. groupId: g1  
  16. esMapping:  
  17.   _index: result_lianshan_order  
  18.   _type: myorder  
  19.   _id: _id  
  20.   sql: "select myorder.id as _id,myorder.id,myorder.userid,myorder.username from myorder"  
  21.   commitBatch: 3000  
  22.   
  23. [root@bigserver2 conf]# cat es6/mysql122_user.yml  
  24. dataSourceKey: defaultDS2  
  25. destination: mysql122  
  26. groupId: g1  
  27. esMapping:  
  28.   _index: result_lianshan_user  
  29.   _type: user  
  30.   _id: _id  
  31.   sql: "select user.id as _id,user.id,user.sex,user.name from user"  
  32.   commitBatch: 3000  

9,重启adapter

  1. # ./bin/startup.sh  
  2. # tail -f logs/adapter/adapter.log  
canal 多数据库 多表

canal 多数据库 多表

如果启动没有报错,基本上就配置成功了,在mysql中测试一下吧。



转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/elasticsearch/2506.html