数据量比较大的情况下,elasticsearch单表操作要比mysql快很多,全文检索也比mysql快很多。试用了一下阿里的canal,感觉还不错。
一,安装java
# yum install java-1.8.0-openjdk-devel.x86_64
二,下载canal
下载地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.5-alpha-2
# mkdir adapter deployer # tar zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C adapter # tar zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C deployer
不要直接解压,指定目录解压
三,创建测试mysql数据库和表
1,创建数据库和表
CREATE DATABASE `result_lianshan_saas` default CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; CREATE TABLE `test` ( `id` int(4) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID', `ids` int(4) NOT NULL DEFAULT 0 COMMENT 'ID', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=1 COMMENT='tank test';
2,创建用户和分配权限
mysql> CREATE USER canal IDENTIFIED BY 'Canal-123'; Query OK, 0 rows affected (0.06 sec) mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; Query OK, 0 rows affected (0.04 sec) mysql> FLUSH PRIVILEGES; Query OK, 0 rows affected (0.10 sec)
3,开启binlog
# vim /etc/my.cnf [mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 # systemctl restart mysqld
四,创建索引
# curl -XPUT "http://10.0.40.200:9200/result_lianshan_saas/?pretty" -H "Content-Type: application/json" -d' { "mappings" : { "test": { "_all":{ "enabled":false }, "properties": { "id": { "type": "integer" }, "ids": { "type": "integer" } } } }, "settings" : { "number_of_shards" : "3", "number_of_replicas" : "1" } }'
索引名最好和数据库名一样,方便自己
五,配置Canal-server
# egrep -v "(^#|^$)" deployer/conf/example/instance.properties canal.instance.gtidon=false canal.instance.master.address=10.0.40.200:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= canal.instance.tsdb.enable=true canal.instance.dbUsername=canal canal.instance.dbPassword=Canal-123 canal.instance.connectionCharset = UTF-8 canal.instance.enableDruid=false canal.instance.filter.regex=.*\\..* canal.instance.filter.black.regex=mysql\\.slave_.* canal.mq.topic=example canal.mq.partition=0 # ./deployer/bin/startup.sh # tail -f logs/canal/canal.log
日志很重要,如果有报错肯定是同步不了的
六,配置canal.adapter
1,修改application.yml
# egrep -v "(^#|^$)" adapter/conf/application.yml server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: consumerProperties: # canal tcp consumer canal.tcp.server.host: 127.0.0.1:11111 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: # kafka consumer srcDataSources: defaultDS: url: jdbc:mysql://10.0.40.200:3306/result_lianshan_saas?useUnicode=true username: canal password: Canal-123 canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: es6 //根目录要对上,不然报错的,es6还是7,要根es的版本对上 hosts: 10.0.10.245:19200 # 127.0.0.1:9200 for rest mode properties: mode: rest # or rest # security.auth: test:123456 # only used for rest mode cluster.name: estestcluster
2,设置同步的表
# cat adapter/conf/es6/test.yml dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: result_lianshan_saas _type: test _id: _id sql: "select test.id as _id ,test.id,test.ids from test" commitBatch: 3000
删除默认的demo,索引名和type类型要和es的对上
3,启动canal.adapter
# ./bin/startup.sh # tail -f logs/adapter/adapter.log
日志很重要,如果有报错肯定是同步不了的
4,测试在mysql中插入
mysql> insert into result_lianshan_saas.test(ids)value(1);
在logs/adapter/adapter.log 日志会有以下内容
2021-02-20 10:15:47.032 [pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"ids":1}],"database":"result_lianshan_saas","destination":"example","es":1613787346000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1613787347032,"type":"INSERT"}
在es中就查看
# curl -XPOST "http://10.0.40.193:9200/result_lianshan_saas/test/_search?pretty" -H "Content-Type: application/json" { "took" : 3, "timed_out" : false, "_shards" : { "total" : 3, "successful" : 3, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 1, "max_score" : 1.0, "hits" : [ { "_index" : "result_lianshan_saas", "_type" : "test", "_id" : "1", "_score" : 1.0, "_source" : { "id" : 1, "ids" : 1 } } ] } }
七,mysql添加字段同步
1,修改adapter/conf/es6/test.yml
# vim adapter/conf/es6/test.yml sql: "select test.id as _id ,test.id,test.ids,test.hao from test" //加字段
2,停止adapter/bin/stop.sh
3,删除deployer/conf/example/h2.mv.db
4,重启deployer/bin/restart.sh
5,启动adapter/bin/startup.sh
转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/elasticsearch/2501.html