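With large data volumes, single-table operations in Elasticsearch are much faster than in MySQL, and so is full-text search. I tried out Alibaba's canal to sync MySQL into Elasticsearch and found that it works quite well.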
I. Install Java
# yum install java-1.8.0-openjdk-devel.x86_64
II. Download canal
Download URL: https://github.com/alibaba/canal/releases/tag/canal-1.1.5-alpha-2
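# mkdir adapter deployer
# tar zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C adapter
# tar zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C deployer
Do not extract the archives in place; extract each one into its own directory so that the adapter and deployer files do not get mixed together.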
III. Create the test MySQL database and table
1. Create the database and table
CREATE DATABASE `result_lianshan_saas` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE `result_lianshan_saas`;
CREATE TABLE `test` (
  `id` int(4) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `ids` int(4) NOT NULL DEFAULT 0 COMMENT 'ID',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=1 COMMENT='tank test';
2. Create the user and grant privileges
mysql> CREATE USER canal IDENTIFIED BY 'Canal-123';
Query OK, 0 rows affected (0.06 sec)
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.04 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.10 sec)
3. Enable binlog
# vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
# systemctl restart mysqld
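Before restarting MySQL it can help to sanity-check the edited file. The following is a minimal Python sketch, not part of canal or MySQL: the function name `check_binlog_settings` and the `REQUIRED` table are made up for illustration, and it assumes the file is simple enough for `configparser` to read (for example, no `!includedir` line before the first section).

```python
import configparser

# Settings the binlog step above relies on; None means "any value is fine".
REQUIRED = {"log_bin": None, "binlog_format": "ROW", "server_id": None}


def check_binlog_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section of cnf_text."""
    parser = configparser.ConfigParser(
        allow_no_value=True,
        inline_comment_prefixes=("#",),
        interpolation=None,
        strict=False,
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent.
    opts = {k.replace("-", "_"): v for k, v in parser.items("mysqld")}
    problems = []
    for name, expected in REQUIRED.items():
        if name not in opts:
            problems.append(f"{name} is not set")
        elif expected is not None and (opts[name] or "").upper() != expected:
            problems.append(f"{name} should be {expected}, found {opts[name]}")
    return problems


sample = "[mysqld]\nlog-bin=mysql-bin # enable binlog\nbinlog-format=ROW\nserver_id=1\n"
print(check_binlog_settings(sample))
```

An empty list means the three settings are present; to check the real file, pass in the contents of /etc/my.cnf instead of `sample`.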
IV. Create the index
# curl -XPUT "http://10.0.40.200:9200/result_lianshan_saas/?pretty" -H "Content-Type: application/json" -d'
{
  "mappings" : {
    "test": {
      "_all":{
        "enabled":false
      },
      "properties": {
        "id": {
          "type": "integer"
        },
        "ids": {
          "type": "integer"
        }
      }
    }
  },
  "settings" : {
    "number_of_shards" : "3",
    "number_of_replicas" : "1"
  }
}'
It is best to give the index the same name as the database; it makes the two easier to keep track of.
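The same index creation can be scripted. The sketch below builds the PUT request with Python's standard library and only sends it if you uncomment the last line. The function name `build_create_index_request` and its parameters are illustrative, and the body follows the typed (ES 6 style) mapping used above, so it is not expected to work unchanged on ES 7.

```python
import json
import urllib.request


def build_create_index_request(es_url, index, doc_type, fields, shards=3, replicas=1):
    """Build (but do not send) the PUT request that creates the index."""
    body = {
        "mappings": {
            doc_type: {
                "_all": {"enabled": False},
                "properties": {name: {"type": t} for name, t in fields.items()},
            }
        },
        "settings": {
            "number_of_shards": str(shards),
            "number_of_replicas": str(replicas),
        },
    }
    return urllib.request.Request(
        url=f"{es_url.rstrip('/')}/{index}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


req = build_create_index_request(
    "http://10.0.40.200:9200",
    "result_lianshan_saas",
    "test",
    {"id": "integer", "ids": "integer"},
)
# To actually create the index, uncomment:
# urllib.request.urlopen(req)
```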
V. Configure canal-server
# egrep -v "(^#|^$)" deployer/conf/example/instance.properties
canal.instance.gtidon=false
canal.instance.master.address=10.0.40.200:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
canal.instance.tsdb.enable=true
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal-123
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=example
canal.mq.partition=0
# ./deployer/bin/startup.sh
# tail -f deployer/logs/canal/canal.log
Watch the log closely: if it shows errors, synchronization will not work.
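The two filter settings decide which tables canal-server captures. In the properties file `\\` is an escaped backslash, so the effective patterns are `.*\..*` (any schema.table) and `mysql\.slave_.*` (excluded). The Python sketch below illustrates that logic only. It assumes comma-separated patterns matched against the whole `schema.table` name, and `is_captured` is a hypothetical helper, not canal's own code.

```python
import re


def is_captured(full_name, filter_regex=r".*\..*", black_regex=r"mysql\.slave_.*"):
    """Return True if 'schema.table' passes the filter and is not blacklisted."""

    def matches_any(patterns):
        # Patterns are assumed to be comma-separated and matched in full.
        return any(
            re.fullmatch(p.strip(), full_name)
            for p in patterns.split(",")
            if p.strip()
        )

    return matches_any(filter_regex) and not matches_any(black_regex)


for name in ("result_lianshan_saas.test", "mysql.slave_master_info", "mysql.user"):
    print(name, is_captured(name))
```

To sync only the test database, a narrower filter such as `result_lianshan_saas\..*` could be tried with the same helper before putting it into instance.properties.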
VI. Configure canal.adapter
1. Edit application.yml
# egrep -v "(^#|^$)" adapter/conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://10.0.40.200:3306/result_lianshan_saas?useUnicode=true
      username: canal
      password: Canal-123
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es6 # must match the config directory (conf/es6 or conf/es7) or startup fails; choose es6 or es7 according to your ES version
        hosts: 10.0.10.245:19200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: estestcluster
2. Configure the table to sync
# cat adapter/conf/es6/test.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: result_lianshan_saas
  _type: test
  _id: _id
  sql: "select test.id as _id ,test.id,test.ids from test"
  commitBatch: 3000
Delete the default demo mapping files. The index name and type must match the ones created in ES.
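To make the intent of the mapping concrete: the column aliased as `_id` becomes the document ID, and the remaining selected columns become the document body. The Python sketch below only illustrates that shape. `row_to_es_doc` is a hypothetical helper, not the adapter's actual implementation.

```python
def row_to_es_doc(row, index="result_lianshan_saas", doc_type="test", id_field="_id"):
    """Turn one row of the mapping's SQL result into the ES document it should yield."""
    source = {k: v for k, v in row.items() if k != id_field}
    return {
        "_index": index,
        "_type": doc_type,
        "_id": str(row[id_field]),
        "_source": source,
    }


# One row as returned by: select test.id as _id, test.id, test.ids from test
row = {"_id": 1, "id": 1, "ids": 1}
doc = row_to_es_doc(row)
print(doc)
```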
3. Start canal.adapter
# ./adapter/bin/startup.sh
# tail -f adapter/logs/adapter/adapter.log
Watch this log closely as well: if it shows errors, synchronization will not work.
4. Test by inserting a row in MySQL
mysql> insert into result_lianshan_saas.test(ids)value(1);
The following entry then appears in adapter/logs/adapter/adapter.log:
2021-02-20 10:15:47.032 [pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"ids":1}],"database":"result_lianshan_saas","destination":"example","es":1613787346000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1613787347032,"type":"INSERT"}
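The part after `DML: ` is plain JSON, so it is easy to pull out when you want to verify events in a script. A minimal Python sketch, assuming the logger adapter's line layout stays as shown above (`parse_dml_line` is an illustrative name):

```python
import json


def parse_dml_line(line):
    """Return the DML event as a dict, or None if the line has no DML payload."""
    marker = "DML: "
    pos = line.find(marker)
    if pos == -1:
        return None
    return json.loads(line[pos + len(marker):])


sample = (
    '2021-02-20 10:15:47.032 [pool-2-thread-1] INFO '
    'c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: '
    '{"data":[{"id":1,"ids":1}],"database":"result_lianshan_saas",'
    '"destination":"example","es":1613787346000,"groupId":"g1","isDdl":false,'
    '"old":null,"pkNames":["id"],"sql":"","table":"test",'
    '"ts":1613787347032,"type":"INSERT"}'
)
event = parse_dml_line(sample)
print(event["type"], event["table"], event["data"])
```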
Then check the result in ES:
# curl -XPOST "http://10.0.40.193:9200/result_lianshan_saas/test/_search?pretty" -H "Content-Type: application/json"
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "result_lianshan_saas",
        "_type" : "test",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "ids" : 1
        }
      }
    ]
  }
}
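The same check can be done from a script. The sketch below is a rough illustration using only the Python standard library. `extract_sources` and `search_all` are hypothetical helper names, and the `hits.total` handling assumes ES 6 returns a plain number while ES 7 returns an object with a `value` field.

```python
import json
import urllib.request


def extract_sources(response):
    """Return (total, list of _source dicts) from a search response."""
    hits = response["hits"]
    total = hits["total"]
    if isinstance(total, dict):  # ES 7 style: {"value": n, "relation": "eq"}
        total = total["value"]
    return total, [hit["_source"] for hit in hits["hits"]]


def search_all(es_url, index, doc_type):
    """Run an empty search against the index/type and return extract_sources()."""
    req = urllib.request.Request(
        url=f"{es_url.rstrip('/')}/{index}/{doc_type}/_search",
        data=b"{}",
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return extract_sources(json.load(resp))


# Offline example using the response shown above (trimmed to the relevant keys).
sample = {"hits": {"total": 1, "hits": [{"_id": "1", "_source": {"id": 1, "ids": 1}}]}}
print(extract_sources(sample))
```

Against a live cluster, `search_all("http://10.0.40.193:9200", "result_lianshan_saas", "test")` would issue the same request as the curl command.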
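VII. Syncing a column added in MySQL
1. Edit adapter/conf/es6/test.yml and add the new column (hao in this example) to the sql
# vim adapter/conf/es6/test.yml
sql: "select test.id as _id ,test.id,test.ids,test.hao from test"
2. Stop the adapter: adapter/bin/stop.sh
3. Delete deployer/conf/example/h2.mv.db (the local H2 file in which canal-server caches table structure when canal.instance.tsdb.enable=true)
4. Restart canal-server: deployer/bin/restart.sh
5. Start the adapter: adapter/bin/startup.sh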
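Steps 2 to 5 are easy to get out of order, so they can be wrapped in a small script. The following Python sketch assumes the adapter and deployer directories sit side by side under one base directory, as in section II. The function name, the `dry_run` flag and the `/opt/canal` path are all illustrative. By default it only returns the planned steps without running anything.

```python
import subprocess
from pathlib import Path


def resync_after_schema_change(base_dir, instance="example", dry_run=True):
    """Stop the adapter, drop the cached table metadata, restart server and adapter."""
    base = Path(base_dir)
    steps = [
        ("run", base / "adapter" / "bin" / "stop.sh"),
        ("delete", base / "deployer" / "conf" / instance / "h2.mv.db"),
        ("run", base / "deployer" / "bin" / "restart.sh"),
        ("run", base / "adapter" / "bin" / "startup.sh"),
    ]
    if not dry_run:
        for action, target in steps:
            if action == "delete":
                if target.exists():
                    target.unlink()
            else:
                # check=True stops the sequence if a script fails.
                subprocess.run(["sh", str(target)], check=True)
    return steps


for action, target in resync_after_schema_change("/opt/canal"):
    print(action, target)
```

Calling it with `dry_run=False` performs the steps. The mapping file from step 1 still has to be edited by hand first.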
Please credit the source when reposting.
Author: 海底苍鹰
URL: http://blog.51yip.com/elasticsearch/2501.html