Syncing MySQL data to ClickHouse with canal

Posted by 张映 on 2021-09-07

Category: clickhouse


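ClickHouse has no auto-increment IDs, and its primary key does not enforce uniqueness, so duplicate keys are allowed. For anyone used to MySQL this is frustrating. Writing to MySQL, syncing MySQL into ClickHouse, and reading from ClickHouse would give the best of both.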

The MaterializeMySQL engine is quite limited: the synced tables can only live on one machine of the ClickHouse cluster, so the cluster's resources are not fully used.

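What I want instead: build distributed databases and tables in ClickHouse with Distributed + ReplicatedMergeTree + ZooKeeper, then sync MySQL data into them with canal. That makes full use of the cluster's resources and also works around ClickHouse's weaknesses.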

I. Install canal

Not covered in detail here; see: Syncing MySQL data to Elasticsearch

When configuring the MySQL binlog, also enable GTID mode by adding the following:

gtid_mode = on
enforce_gtid_consistency = on
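Before starting canal it can help to confirm that the MySQL config file really contains these settings. A minimal sketch, assuming a simple my.cnf with a [mysqld] section and no !include directives. Besides the two GTID options from this post it checks log-bin and binlog_format=ROW, which canal's quick-start guide asks for; the helper name missing_binlog_settings is hypothetical.

```python
import configparser

# Expected [mysqld] values. gtid_mode / enforce_gtid_consistency come from
# this post; binlog_format=ROW is an assumption based on canal's quick start.
REQUIRED = {
    "gtid_mode": "on",
    "enforce_gtid_consistency": "on",
    "binlog_format": "row",
}


def missing_binlog_settings(mycnf_text):
    """Return the [mysqld] options that are absent or have the wrong value."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(mycnf_text)
    if not parser.has_section("mysqld"):
        return ["[mysqld]"]
    # MySQL treats dashes and underscores in option names as equivalent;
    # configparser already lower-cases the names. "log-bin" with no value
    # is stored as None, so map it to an empty string.
    options = {
        name.replace("-", "_"): (value or "").strip().lower()
        for name, value in parser.items("mysqld")
    }
    missing = [] if "log_bin" in options else ["log_bin"]
    for name, expected in REQUIRED.items():
        if options.get(name) != expected:
            missing.append(name)
    return missing
```

For example, missing_binlog_settings(open("/etc/my.cnf").read()) returns an empty list when everything is in place (the path is an assumption; use wherever your my.cnf lives).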

II. Configure nginx as a reverse proxy for ClickHouse

See: Configuring nginx as a reverse proxy for ClickHouse

The point is to spread writes evenly across the shards.
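nginx's default upstream method is (weighted) round robin, so consecutive requests from the adapter are handed to different ClickHouse nodes. A toy sketch of that rotation; the node names are hypothetical placeholders, one HTTP endpoint per shard.

```python
from itertools import cycle

# Hypothetical ClickHouse HTTP endpoints (one per shard) behind nginx.
UPSTREAMS = ["ch-shard1:8123", "ch-shard2:8123", "ch-shard3:8123"]


def round_robin(backends):
    """Endlessly yield backends in order, like an unweighted nginx upstream."""
    return cycle(backends)


picker = round_robin(UPSTREAMS)
# Each request from the adapter would be forwarded to the next node in turn.
targets = [next(picker) for _ in range(6)]
```

nginx balances per HTTP request, not per row, so how evenly rows end up spread presumably depends on how the adapter groups its writes into requests.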

III. Create the database in MySQL and in ClickHouse

CREATE DATABASE `tanktest` DEFAULT CHARACTER SET utf8;  -- MySQL
CREATE DATABASE tanktest ON CLUSTER clickhouse_test_netjoy;  -- ClickHouse

IV. Configure the canal deployer

# vim /root/canal/deployer/conf/canal.properties
canal.destinations = tanktest

The default value is example. Here tanktest is the instance (directory) name; separate multiple instances with commas.

# cd /root/canal/deployer/
# cp -rp conf/example conf/tanktest

# egrep -v "(^#|^$)" conf/tanktest/instance.properties
canal.instance.gtidon=false
canal.instance.master.address=10.0.40.200:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
canal.instance.tsdb.enable=true
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal-123
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=tanktest
canal.mq.partition=0

# ./bin/startup.sh
# tail -f logs/canal/canal.log

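If the log shows no errors, the deployer is configured correctly. tanktest is the database name; naming the instance directory after the database makes it easy to find.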
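canal.instance.filter.regex and canal.instance.filter.black.regex are comma-separated regular expressions matched against schema.table names. The sketch below approximates that check with Python's re module to preview which tables the instance will pick up. It is not canal's own matcher (canal uses Perl-style regexes), and it assumes the doubled backslashes in the .properties file have already been unescaped to single ones.

```python
import re


def compile_filter(value):
    """Split a canal filter value on commas and compile each pattern.

    Note: a pattern that itself contains a comma (e.g. {1,3}) would be split.
    """
    return [re.compile(part.strip()) for part in value.split(",") if part.strip()]


def is_synced(schema, table, white, black):
    """True if schema.table matches the whitelist and not the blacklist."""
    name = f"{schema}.{table}"
    if not any(p.fullmatch(name) for p in white):
        return False
    return not any(p.fullmatch(name) for p in black)


# Values from instance.properties above, after .properties unescaping.
WHITE = compile_filter(r".*\..*")
BLACK = compile_filter(r"mysql\.slave_.*")
```

With these values every table in every schema passes the whitelist, including the mysql system schema apart from the slave_ tables; a narrower value such as tanktest\\..* in the properties file would limit the instance to one database.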

V. Configure the canal adapter

1. Download clickhouse-jdbc, httpclient, httpcore, lz4 and lz4-java

Download from: https://mvnrepository.com/

Put the downloaded jars in /root/canal/adapter/lib. The versions I used:

lz4-java-1.8.0.jar
lz4-1.5.0.jar
httpclient-4.5.13.jar
httpcore-4.4.13.jar
clickhouse-jdbc-0.2.6.jar
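The jars can also be fetched straight from Maven Central, whose files follow the layout groupId (dots as slashes)/artifactId/version/artifactId-version.jar. A small sketch that builds those URLs; the groupIds are the usual coordinates for these artifacts, and lz4-1.5.0.jar is left out because this post does not give its groupId (look it up on mvnrepository.com).

```python
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"

# (groupId, artifactId, version) for the jars listed above, minus lz4-1.5.0.
JARS = [
    ("ru.yandex.clickhouse", "clickhouse-jdbc", "0.2.6"),
    ("org.apache.httpcomponents", "httpclient", "4.5.13"),
    ("org.apache.httpcomponents", "httpcore", "4.4.13"),
    ("org.lz4", "lz4-java", "1.8.0"),
]


def jar_url(group_id, artifact_id, version):
    """Build the Maven Central download URL for one artifact."""
    group_path = group_id.replace(".", "/")
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


urls = [jar_url(*coords) for coords in JARS]
```

Each URL can then be downloaded into /root/canal/adapter/lib with wget or curl -O.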

Without httpclient-4.5.13.jar and httpcore-4.4.13.jar the adapter fails with:

java.lang.NoClassDefFoundError: org/apache/http/conn/HttpClientConnectionManager

Without lz4-1.5.0.jar and lz4-java-1.8.0.jar it fails with:

java.lang.NoClassDefFoundError: net/jpountz/lz4/LZ4Factory
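To catch these before restarting, one could compare the contents of adapter/lib with the jars listed above. A hypothetical helper (missing_jars is not part of canal) that reports which artifacts are absent and the NoClassDefFoundError this post associates with each gap:

```python
import re

# artifactId -> class reported missing in this post when that jar is absent
REQUIRED_JARS = {
    "httpclient": "org/apache/http/conn/HttpClientConnectionManager",
    "httpcore": "org/apache/http/conn/HttpClientConnectionManager",
    "lz4": "net/jpountz/lz4/LZ4Factory",
    "lz4-java": "net/jpountz/lz4/LZ4Factory",
}


def missing_jars(filenames):
    """Return {artifactId: missing class} for required jars not in filenames."""
    present = set()
    for filename in filenames:
        for artifact in REQUIRED_JARS:
            # artifactId followed by "-<version>.jar"; the digit after the dash
            # keeps "lz4" from matching "lz4-java-1.8.0.jar".
            if re.fullmatch(re.escape(artifact) + r"-\d[\w.-]*\.jar", filename):
                present.add(artifact)
    return {a: cls for a, cls in REQUIRED_JARS.items() if a not in present}
```

Usage: missing_jars(os.listdir("/root/canal/adapter/lib")) after import os; an empty dict means all four artifacts are there.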

2. Configure adapter/conf/application.yml

# egrep -v "(^#|^$)" adapter/conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://10.0.40.200:3306/tanktest?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
      username: canal
      password: Canal-123
  canalAdapters:
  - instance: tanktest
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql
        properties:
          jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
          jdbc.url: jdbc:clickhouse://127.0.0.1:8123/tanktest
          jdbc.username: default
          jdbc.password: 123456

Note that 127.0.0.1:8123 is the nginx reverse proxy, not a single ClickHouse node.
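A quick way to check the proxy before starting the adapter is to send a query to ClickHouse's HTTP interface through 127.0.0.1:8123. The sketch builds such a request with the standard library, passing credentials in the X-ClickHouse-User / X-ClickHouse-Key headers; the helper names are hypothetical and the credentials are the ones from the config above. Running SELECT hostName() several times should return different node names if nginx is spreading requests.

```python
from urllib.parse import quote, urlencode
from urllib.request import Request, urlopen


def clickhouse_request(query, host="127.0.0.1", port=8123,
                       user="default", password="", database=None):
    """Build a GET request for ClickHouse's HTTP interface (not sent here)."""
    params = {"query": query}
    if database:
        params["database"] = database
    # quote_via=quote encodes spaces as %20 rather than "+".
    url = f"http://{host}:{port}/?{urlencode(params, quote_via=quote)}"
    request = Request(url)
    request.add_header("X-ClickHouse-User", user)
    request.add_header("X-ClickHouse-Key", password)
    return request


def run(request, timeout=5):
    """Send the request and return the response body as text."""
    with urlopen(request, timeout=timeout) as response:
        return response.read().decode()
```

Usage: print(run(clickhouse_request("SELECT hostName()", password="123456"))), repeated a few times.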

3. Configure the databases to sync

# cat /root/canal/adapter/conf/rdb/tanktest.yml
dataSourceKey: defaultDS
destination: tanktest
groupId: g1
outerAdapterKey: mysql
concurrent: true
dbMapping:
  mirrorDb: true
  database: tanktest

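This syncs the whole database: with mirrorDb: true, database and table names stay the same on the ClickHouse side, and DDL statements are forwarded as well.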

dataSourceKey: defaultDS
destination: tanktest
groupId: g1
outerAdapterKey: mysql
concurrent: true
dbMapping:
  database: tanktest
  table: user
  targetTable: mytest2.user
  targetPk:
    id: id
  targetColumns:
    id:
    name:
    role_id:
    c_time:
    test1:
  etlCondition: "where c_time>={}"
  commitBatch: 3000

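This variant syncs one table into a database and table with different names (tanktest.user to mytest2.user), with explicit primary-key and column mappings. Pick whichever fits your needs; canal is quite flexible here.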
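In the rdb mapping, targetColumns entries are written as target column: source column, and an empty source means the name is unchanged. A small sketch of that rule applied to one source row; map_row is a hypothetical illustration, not canal's code, and the row values are made up.

```python
def map_row(row, target_columns):
    """Apply a targetColumns mapping ({target: source or None}) to one source row."""
    return {target: row[source or target] for target, source in target_columns.items()}


# Mirrors targetColumns in the mapping above; YAML "id:" loads as None.
TARGET_COLUMNS = {"id": None, "name": None, "role_id": None, "c_time": None, "test1": None}

# Hypothetical source row from tanktest.user (the "extra" column is made up).
row = {"id": 7, "name": "tank", "role_id": 2,
       "c_time": "2021-09-07 12:00:00", "test1": "x", "extra": 1}
```

In this sketch, source columns that are not listed (like extra) are simply not written, and an entry such as user_name: name would write the source column name into a target column user_name.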

4. Start the adapter

# ./bin/restart.sh
# tail -f logs/adapter/adapter.log 

2021-09-07 11:22:01.729 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2021-09-07 11:22:01.730 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2021-09-07 11:22:01.731 [main] INFO c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /root/canal/adapter/plugin
2021-09-07 11:22:01.760 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2021-09-07 11:22:01.764 [main] INFO c.a.otter.canal.client.adapter.rdb.config.ConfigLoader - ## Start loading rdb mapping config ...
2021-09-07 11:22:01.810 [main] INFO c.a.otter.canal.client.adapter.rdb.config.ConfigLoader - ## Rdb mapping config loaded
2021-09-07 11:22:01.812 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2021-09-07 11:22:02.066 [main] INFO com.alibaba.druid.pool.DruidDataSource - {dataSource-2} inited
2021-09-07 11:22:02.072 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: rdb succeed
2021-09-07 11:22:02.080 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /root/canal/adapter/plugin
2021-09-07 11:22:02.101 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: tanktest-g1 succeed
2021-09-07 11:22:02.101 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2021-09-07 11:22:02.108 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: tanktest <=============
2021-09-07 11:22:02.109 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2021-09-07 11:22:02.113 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2021-09-07 11:22:02.195 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2021-09-07 11:22:02.199 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 3.642 seconds (JVM running for 4.137)
2021-09-07 11:22:02.220 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: tanktest succeed <=============

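Output like the above means the adapter is configured correctly. The Druid line "testWhileIdle is true, validationQuery not set" is logged at ERROR level but did not stop the adapter from starting.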
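When restarting the adapter often, the same check can be scripted by scanning logs/adapter/adapter.log for the success lines above. A sketch, assuming the log format shown here; check_adapter_log is a hypothetical name. It also reports the Druid ERROR line, so the returned error list is for reading, not an automatic failure signal.

```python
def check_adapter_log(lines, destination):
    """Return (started_ok, error_lines) for adapter log lines."""
    rdb_loaded = any("Load canal adapter: rdb succeed" in line for line in lines)
    subscribed = any(
        f"Subscribe destination: {destination} succeed" in line for line in lines
    )
    errors = [line for line in lines if " ERROR " in line]
    return rdb_loaded and subscribed, errors
```

Usage: with open("logs/adapter/adapter.log") as f: ok, errors = check_adapter_log(f.read().splitlines(), "tanktest").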

VI. Testing

1. Create the databases and tables in both MySQL and ClickHouse in advance

-- MySQL
CREATE TABLE `tank` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(30) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '姓名',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='测试';

-- ClickHouse
CREATE TABLE tanktest.tank ON CLUSTER clickhouse_test_netjoy
(
    `id` UInt32 DEFAULT 0 COMMENT 'id',
    `name` Nullable(String) COMMENT '名字',
    `create_time` DateTime DEFAULT toDateTime(now()) COMMENT '创建时间'
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/tanktest/tank', '{replica}')
PARTITION BY toYYYYMM(create_time)
PRIMARY KEY id
ORDER BY (id, create_time); 

CREATE TABLE tanktest.tank_all ON CLUSTER clickhouse_test_netjoy AS tanktest.tank
ENGINE = Distributed(clickhouse_test_netjoy, tanktest, tank, rand());
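For rows inserted through tank_all, ClickHouse's documentation describes the Distributed engine's routing this way: take the sharding expression (rand() here), divide by the total shard weight, and send the row to the shard whose weight range contains the remainder. A sketch of that rule; pick_shard is a hypothetical name, and weights come from the cluster config (1 per shard by default). As far as I can tell, canal with mirrorDb writes to tanktest.tank rather than tank_all, so its rows skip this step and rely on the nginx round robin instead.

```python
import random


def pick_shard(sharding_value, weights):
    """Return the 0-based shard index for one row, given per-shard weights."""
    total = sum(weights)
    remainder = sharding_value % total
    upper = 0
    for index, weight in enumerate(weights):
        upper += weight  # shard `index` owns remainders [upper - weight, upper)
        if remainder < upper:
            return index
    raise ValueError("weights must be positive")


# Simulate rand() (a random UInt32) for 3 equally weighted shards.
counts = [0, 0, 0]
for _ in range(30000):
    counts[pick_shard(random.getrandbits(32), [1, 1, 1])] += 1
```

With equal weights, counts comes out at roughly 10000 per shard; uneven weights such as [9, 10] send about 9/19 of rows to the first shard.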

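When canal syncs the table structure itself, it fails: the MySQL CREATE TABLE statement reaches ClickHouse unchanged, and ClickHouse cannot parse MySQL column definitions such as int(10) unsigned NOT NULL AUTO_INCREMENT. That is why the tables above are created by hand first.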

java.lang.RuntimeException: java.lang.RuntimeException: ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 62, host: 127.0.0.1, port: 8123; Code: 62, e.displayText() = DB::Exception: Syntax error: failed at position 38 ('unsigned') (line 2, col 16): unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(30) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '姓名',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIME. Expected one of: NULL, DEFAULT, CODEC, ALIAS, NOT, MATERIALIZED, COMMENT, TTL, token, ClosingRoundBracket, Comma (version 20.8.3.18)

at com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter.sync(RdbAdapter.java:171) ~[na:na]
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139) ~[client-adapter.launcher-1.1.5.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97) ~[client-adapter.launcher-1.1.5.jar:na]
at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:895) ~[na:1.8.0_302]
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94) ~[client-adapter.launcher-1.1.5.jar:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_302]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_302]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_302]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_302]
Caused by: java.lang.RuntimeException: ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 62, host: 127.0.0.1, port: 8123; Code: 62, e.displayText() = DB::Exception: Syntax error: failed at position 38 ('unsigned') (line 2, col 16): unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(30) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '姓名',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIME. Expected one of: NULL, DEFAULT, CODEC, ALIAS, NOT, MATERIALIZED, COMMENT, TTL, token, ClosingRoundBracket, Comma (version 20.8.3.18)
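Since ClickHouse cannot parse MySQL column definitions, each column type has to be translated by hand. A partial sketch of that translation covering the types used in this post; to_clickhouse_type and its tables are hypothetical and should be extended for other types. It also shows why an int(10) unsigned id needs UInt32: UInt8 only holds 0 to 255.

```python
import re

# MySQL integer type -> (signed, unsigned) ClickHouse type
INT_TYPES = {
    "tinyint": ("Int8", "UInt8"),
    "smallint": ("Int16", "UInt16"),
    "mediumint": ("Int32", "UInt32"),
    "int": ("Int32", "UInt32"),
    "bigint": ("Int64", "UInt64"),
}
OTHER_TYPES = {
    "char": "String",
    "varchar": "String",
    "text": "String",
    "date": "Date",
    "datetime": "DateTime",  # does not cover MySQL's full DATETIME range
}


def to_clickhouse_type(mysql_type, nullable=False):
    """Translate a MySQL column type such as 'int(10) unsigned'.

    Pass nullable=True for MySQL columns that allow NULL.
    """
    text = mysql_type.strip().lower()
    match = re.match(r"[a-z]+", text)
    if not match:
        raise ValueError(f"cannot parse MySQL type {mysql_type!r}")
    base = match.group(0)
    if base in INT_TYPES:
        signed, unsigned = INT_TYPES[base]
        ch_type = unsigned if "unsigned" in text else signed
    elif base in OTHER_TYPES:
        ch_type = OTHER_TYPES[base]
    else:
        raise ValueError(f"no mapping for MySQL type {mysql_type!r}")
    return f"Nullable({ch_type})" if nullable else ch_type
```

For the tank table this gives UInt32 for id, Nullable(String) for name (DEFAULT NULL) and DateTime for create_time.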

2. Insert data into MySQL

insert into tank (name,create_time) values ('tank1', '2021-08-21 14:21:30');
insert into tank (name,create_time) values ('tank2', '2021-08-22 14:21:30');
insert into tank (name,create_time) values ('tank3', '2021-08-23 14:21:30');
insert into tank (name,create_time) values ('tank4', '2021-08-24 14:21:30');

2021-09-07 12:06:25.747 [pool-7-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"name":"tank1","create_time":1629526890000}],"database":"tanktest","destination":"tanktest","es":1630987585000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"tank","ts":1630987585688,"type":"INSERT"}
2021-09-07 12:06:25.747 [pool-7-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":2,"name":"tank2","create_time":1629613290000}],"database":"tanktest","destination":"tanktest","es":1630987585000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"tank","ts":1630987585744,"type":"INSERT"}
2021-09-07 12:06:25.748 [pool-7-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":3,"name":"tank3","create_time":1629699690000}],"database":"tanktest","destination":"tanktest","es":1630987585000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"tank","ts":1630987585745,"type":"INSERT"}
2021-09-07 12:06:25.878 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":1,"name":"tank1","create_time":1629526890000},"database":"tanktest","destination":"tanktest","old":null,"table":"tank","type":"INSERT"}
2021-09-07 12:06:25.884 [pool-4-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":3,"name":"tank3","create_time":1629699690000},"database":"tanktest","destination":"tanktest","old":null,"table":"tank","type":"INSERT"}
2021-09-07 12:06:25.906 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":2,"name":"tank2","create_time":1629613290000},"database":"tanktest","destination":"tanktest","old":null,"table":"tank","type":"INSERT"}
2021-09-07 12:06:26.408 [pool-7-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":4,"name":"tank4","create_time":1629786090000}],"database":"tanktest","destination":"tanktest","es":1630987585000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"tank","ts":1630987586407,"type":"INSERT"}
2021-09-07 12:06:26.442 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":4,"name":"tank4","create_time":1629786090000},"database":"tanktest","destination":"tanktest","old":null,"table":"tank","type":"INSERT"}
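The log prints datetime values as epoch milliseconds (create_time 1629526890000 for tank1). Rendering them in GMT+8, the time zone set in application.yml and presumably the MySQL server's zone, gives back exactly the inserted values. A small sketch; canal_ms_to_local is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone

# GMT+8, the time zone configured in application.yml (and, presumably, MySQL's).
GMT8 = timezone(timedelta(hours=8))
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def canal_ms_to_local(ms, tz=GMT8):
    """Render a canal epoch-millisecond value as 'YYYY-MM-DD HH:MM:SS' in tz."""
    return (EPOCH + timedelta(milliseconds=ms)).astimezone(tz).strftime("%Y-%m-%d %H:%M:%S")


print(canal_ms_to_local(1629526890000))  # 2021-08-21 14:21:30
```

ClickHouse DateTime values are displayed in the server's time zone, so if the ClickHouse nodes are not on GMT+8 it is worth comparing a few synced rows against MySQL.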

Deletes and updates, however, fail:

2021-09-07 17:16:46.853 [pool-7-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":2,"name":"tank2","create_time":1629613290000}],"database":"tanktest","destination":"tanktest","es":1631006206000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"tank","ts":1631006206761,"type":"DELETE"}

2021-09-07 17:16:46.883 [pool-7-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 62, host: 127.0.0.1, port: 8123; Code: 62, e.displayText() = DB::Exception: Syntax error: failed at position 1 ('DELETE'): DELETE FROM `tanktest`.`tank` WHERE `id`=2 . Expected one of: Query, SHOW, SELECT, CREATE, DROP, TRUNCATE, OPTIMIZE TABLE, CREATE POLICY, ALTER POLICY, CREATE PROFILE, ALTER PROFILE, SYSTEM, DESCRIBE, DETACH, ATTACH, ALTER QUOTA, CREATE QUOTA, ALTER ROLE, CREATE ROLE, ALTER ROW POLICY, CREATE ROW POLICY, ALTER SETTINGS PROFILE, CREATE SETTINGS PROFILE, ALTER USER, CREATE USER, GRANT, REVOKE, SET ROLE, SET ROLE DEFAULT, SET DEFAULT ROLE, SHOW GRANTS, KILL, SELECT subquery, DESC, SET, WITH, list of elements, ALTER query, ALTER TABLE, ALTER LIVE VIEW, CREATE LIVE VIEW query, CREATE DATABASE query, CREATE VIEW query, CREATE DICTIONARY, GRANT or REVOKE query, Query with output, SHOW PROCESSLIST query, SHOW ACCESS query, SHOW PROCESSLIST, SHOW ACCESS, RENAME query, RENAME TABLE, EXCHANGE TABLES, RENAME DICTIONARY, RENAME DATABASE, SET query, ShowAccessEntitiesQuery, SHOW CREATE QUOTA query, SHOW CREATE, SHOW GRANTS query, SHOW PRIVILEGES query, SHOW PRIVILEGES, SYSTEM query, EXISTS or SHOW CREATE query, EXISTS, USE query, USE, WATCH query, WATCH, CHECK TABLE, DESCRIBE query, DROP access entity query, DROP query, EXPLAIN, EXTERNAL DDL query, EXTERNAL DDL FROM, INSERT query, INSERT INTO, KILL QUERY query, OPTIMIZE query, SELECT query, CREATE TABLE or ATTACH TABLE query, CREATE QUOTA or ALTER QUOTA query, CREATE ROLE or ALTER ROLE query, CREATE ROW POLICY or ALTER ROW POLICY query, CREATE SETTINGS PROFILE or ALTER SETTINGS PROFILE query, CREATE USER or ALTER USER query, SELECT query, possibly with UNION, SET ROLE or SET DEFAULT ROLE query, SHOW 
[TEMPORARY] TABLES|DATABASES|CLUSTERS|CLUSTER 'name' [[NOT] [I]LIKE 'str'] [LIMIT expr], SELECT query, subquery, possibly with UNION (version 20.8.3.18)

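So syncing MySQL to ClickHouse with canal is not perfect either: inserts replicate fine, but ClickHouse (20.8.3.18 here) rejects the DELETE FROM statement the rdb adapter generates, and updates fail as well.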
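ClickHouse 20.8 does accept row deletes, but only as an asynchronous mutation, ALTER TABLE ... DELETE WHERE ..., not as a DELETE FROM statement. canal's rdb adapter does not generate that form. Purely as a hypothetical illustration, a custom consumer could turn a DELETE message like the one logged above into a mutation as sketched here; the helper names are made up, and ON CLUSTER is used so every shard's local table is covered. Mutations rewrite whole data parts, so they are expensive for frequent row-level changes, and the analogous ALTER TABLE ... UPDATE cannot modify key columns such as id or create_time in this table.

```python
def sql_literal(value):
    """Format a Python value as a ClickHouse SQL literal (minimal)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        return "1" if value else "0"
    if isinstance(value, (int, float)):
        return repr(value)
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def delete_mutations(message, cluster=None):
    """Turn a canal flat DELETE message into ALTER TABLE ... DELETE statements."""
    if message["type"] != "DELETE":
        raise ValueError("only DELETE messages are handled in this sketch")
    table = f"`{message['database']}`.`{message['table']}`"
    on_cluster = f" ON CLUSTER {cluster}" if cluster else ""
    statements = []
    for row in message["data"]:
        # Match the deleted row by its primary key column(s).
        where = " AND ".join(
            f"`{pk}` = {sql_literal(row[pk])}" for pk in message["pkNames"]
        )
        statements.append(f"ALTER TABLE {table}{on_cluster} DELETE WHERE {where}")
    return statements
```

For the logged delete of id 2, delete_mutations(msg, "clickhouse_test_netjoy") yields ALTER TABLE `tanktest`.`tank` ON CLUSTER clickhouse_test_netjoy DELETE WHERE `id` = 2.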



Please credit the source when reposting.
Author: 海底苍鹰
URL: http://blog.51yip.com/clickhouse/2531.html